diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 5053e680db..e7756cd114 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1,4 +1,4 @@
-// Copyright 2020-2023 The NATS Authors
+// Copyright 2020-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -432,10 +432,10 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
 }
 
 // Restart the stream in question.
-// Should only be called when the stream is know in a bad state.
+// Should only be called when the stream is known to be in a bad state.
 func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
 	js.mu.Lock()
-	cc := js.cluster
+	s, cc := js.srv, js.cluster
 	if cc == nil {
 		js.mu.Unlock()
 		return
@@ -458,9 +458,18 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
 		}
 		rg.node = nil
 	}
+	sinceCreation := time.Since(sa.Created)
 	js.mu.Unlock()
 
 	// Process stream assignment to recreate.
+	// Check that we have given the system enough time to start us up.
+	// This will be longer than obvious, and matches the consumer logic in case the system is very busy.
+	if sinceCreation < 10*time.Second {
+		s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
+			acc, csa.Config.Name, sinceCreation)
+		return
+	}
+
 	js.processStreamAssignment(sa)
 
 	// If we had consumers assigned to this server they will be present in the copy, csa.
@@ -569,13 +578,24 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
 	// When we try to restart we nil out the node if applicable
 	// and reprocess the consumer assignment.
 	restartConsumer := func() {
+		mset.mu.RLock()
+		accName, streamName := mset.acc.GetName(), mset.cfg.Name
+		mset.mu.RUnlock()
+
 		js.mu.Lock()
+		deleted := ca.deleted
+		// Check that we have not just been created.
+		if !deleted && time.Since(ca.Created) < 10*time.Second {
+			s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
+				accName, streamName, consumer, time.Since(ca.Created))
+			js.mu.Unlock()
+			return
+		}
 		// Make sure the node is stopped if still running.
 		if node != nil && node.State() != Closed {
 			node.Stop()
 		}
 		ca.Group.node = nil
-		deleted := ca.deleted
 		js.mu.Unlock()
 		if !deleted {
 			js.processConsumerAssignment(ca)
@@ -4112,6 +4132,7 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
 			// Make sure this removal is for what we have, otherwise ignore.
 			if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
 				needDelete = true
+				oca.deleted = true
 				delete(sa.consumers, ca.Name)
 			}
 		}
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index dec767ef7f..a269523c1b 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -3786,7 +3786,7 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
 	// Wait for exit.
 	time.Sleep(100 * time.Millisecond)
 
-	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+	checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
 		hs := s.healthz(nil)
 		if hs.Error != _EMPTY_ {
 			return errors.New(hs.Error)
diff --git a/server/norace_test.go b/server/norace_test.go
index 2866d05484..83c4ea8e07 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -9390,3 +9390,127 @@ func TestNoRaceJetStreamClusterStreamCatchupLargeInteriorDeletes(t *testing.T) {
 		return fmt.Errorf("Msgs not equal %d vs %d", state.Msgs, si.State.Msgs)
 	})
 }
+
+func TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	cfg := &nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo.>"},
+		Replicas: 3,
+	}
+	_, err := js.AddStream(cfg)
+	require_NoError(t, err)
+
+	// We will poll healthz at a decent clip and make sure any restart logic works
+	// correctly with assets coming and going.
+	ch := make(chan struct{})
+	defer close(ch)
+
+	go func() {
+		for {
+			select {
+			case <-ch:
+				return
+			case <-time.After(50 * time.Millisecond):
+				for _, s := range c.servers {
+					s.healthz(nil)
+				}
+			}
+		}
+	}()
+
+	numConsumers := 500
+	consumers := make([]string, 0, numConsumers)
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < numConsumers; i++ {
+		cname := fmt.Sprintf("CONS-%d", i+1)
+		consumers = append(consumers, cname)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err := js.PullSubscribe("foo.>", cname, nats.BindStream("TEST"))
+			require_NoError(t, err)
+		}()
+	}
+	wg.Wait()
+
+	// Make sure all are reported.
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		for _, s := range c.servers {
+			jsz, _ := s.Jsz(nil)
+			if jsz.Consumers != numConsumers {
+				return fmt.Errorf("%v wrong number of consumers: %d vs %d", s, jsz.Consumers, numConsumers)
+			}
+		}
+		return nil
+	})
+
+	// Now do the same for streams.
+	numStreams := 200
+	streams := make([]string, 0, numStreams)
+
+	for i := 0; i < numStreams; i++ {
+		sname := fmt.Sprintf("TEST-%d", i+1)
+		streams = append(streams, sname)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err := js.AddStream(&nats.StreamConfig{Name: sname, Replicas: 3})
+			require_NoError(t, err)
+		}()
+	}
+	wg.Wait()
+
+	// Make sure all are reported.
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		for _, s := range c.servers {
+			jsz, _ := s.Jsz(nil)
+			if jsz.Streams != numStreams+1 {
+				return fmt.Errorf("%v wrong number of streams: %d vs %d", s, jsz.Streams, numStreams+1)
+			}
+		}
+		return nil
+	})
+
+	// Delete consumers.
+	for _, cname := range consumers {
+		err := js.DeleteConsumer("TEST", cname)
+		require_NoError(t, err)
+	}
+	// Make sure reporting goes to zero.
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		for _, s := range c.servers {
+			jsz, _ := s.Jsz(nil)
+			if jsz.Consumers != 0 {
+				return fmt.Errorf("%v still has %d consumers", s, jsz.Consumers)
+			}
+		}
+		return nil
+	})
+
+	// Delete streams.
+	for _, sname := range streams {
+		err := js.DeleteStream(sname)
+		require_NoError(t, err)
+	}
+	err = js.DeleteStream("TEST")
+	require_NoError(t, err)
+
+	// Make sure reporting goes to zero.
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		for _, s := range c.servers {
+			jsz, _ := s.Jsz(nil)
+			if jsz.Streams != 0 {
+				return fmt.Errorf("%v still has %d streams", s, jsz.Streams)
+			}
+		}
+		return nil
+	})
+}