diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go
index 1333e9ff1a..42c9eee668 100644
--- a/server/jetstream_cluster_2_test.go
+++ b/server/jetstream_cluster_2_test.go
@@ -6089,6 +6089,94 @@ func TestJetStreamClusterLeafNodeSPOFMigrateLeaders(t *testing.T) {
 	})
 }
 
+// https://github.com/nats-io/nats-server/issues/3178
+func TestJetStreamClusterLeafNodeSPOFMigrateLeadersWithMigrateDelay(t *testing.T) {
+	tmpl := strings.Replace(jsClusterTempl, "store_dir:", "domain: REMOTE, store_dir:", 1)
+	c := createJetStreamClusterWithTemplate(t, tmpl, "HUB", 2)
+	defer c.shutdown()
+
+	tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: CORE, store_dir:", 1)
+	lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "LNC", 2, 22110)
+	defer lnc.shutdown()
+
+	lnc.waitOnClusterReady()
+
+	// Place JS assets in LN, and we will do a pull consumer from the HUB.
+	nc, js := jsClientConnect(t, lnc.randomServer())
+	defer nc.Close()
+
+	si, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 2,
+	})
+	require_NoError(t, err)
+	require_True(t, si.Cluster.Name == "LNC")
+
+	for i := 0; i < 100; i++ {
+		js.PublishAsync("foo", []byte("HELLO"))
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Create the consumer.
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "d", AckPolicy: nats.AckExplicitPolicy})
+	require_NoError(t, err)
+
+	nc, _ = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	dsubj := "$JS.CORE.API.CONSUMER.MSG.NEXT.TEST.d"
+	// Grab directly using domain based subject but from the HUB cluster.
+	_, err = nc.Request(dsubj, nil, time.Second)
+	require_NoError(t, err)
+
+	// Now we will force the consumer leader's server to drop and stall leafnode connections.
+	cl := lnc.consumerLeader("$G", "TEST", "d")
+	cl.setJetStreamMigrateOnRemoteLeafWithDelay(5 * time.Second)
+	cl.closeAndDisableLeafnodes()
+
+	// Now make sure we can eventually get a message again.
+	checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
+		_, err = nc.Request(dsubj, nil, 500*time.Millisecond)
+		return err
+	})
+
+	nc, _ = jsClientConnect(t, lnc.randomServer())
+	defer nc.Close()
+
+	// Now make sure the consumer, or any other asset, can not become a leader on this node while the leafnode
+	// is disconnected.
+	csd := fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "d")
+	for i := 0; i < 10; i++ {
+		nc.Request(csd, nil, time.Second)
+		lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d")
+		if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl {
+			t.Fatalf("Consumer leader should not migrate to server without a leafnode connection")
+		}
+	}
+
+	// Now make sure once leafnode is back we can have leaders on this server.
+	cl.reEnableLeafnodes()
+	checkLeafNodeConnectedCount(t, cl, 2)
+	for _, ln := range cl.leafRemoteCfgs {
+		require_True(t, ln.jsMigrateTimer == nil)
+	}
+
+	// Make sure we can migrate back to this server now that we are connected.
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		nc.Request(csd, nil, time.Second)
+		lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d")
+		if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl {
+			return nil
+		}
+		return fmt.Errorf("Not this server yet")
+	})
+}
+
 func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3F", 3)
 	defer c.shutdown()
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 719ea06f92..7d1cd2015f 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -1135,6 +1135,16 @@ func (s *Server) setJetStreamMigrateOnRemoteLeaf() {
 	s.mu.Unlock()
 }
 
+// Helper to set the remote migrate feature.
+func (s *Server) setJetStreamMigrateOnRemoteLeafWithDelay(delay time.Duration) {
+	s.mu.Lock()
+	for _, cfg := range s.leafRemoteCfgs {
+		cfg.JetStreamClusterMigrate = true
+		cfg.JetStreamClusterMigrateDelay = delay
+	}
+	s.mu.Unlock()
+}
+
 // Will add in the mapping for the account to each server.
 func (c *cluster) addSubjectMapping(account, src, dest string) {
 	c.t.Helper()
diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go
index f0fa78881f..250ddabf87 100644
--- a/server/jetstream_leafnode_test.go
+++ b/server/jetstream_leafnode_test.go
@@ -1324,3 +1324,106 @@ func TestJetStreamLeafNodeJSClusterMigrateRecovery(t *testing.T) {
 	// long election timer. Now this should work reliably.
 	lnc.waitOnStreamLeader(globalAccountName, "TEST")
 }
+
+func TestJetStreamLeafNodeJSClusterMigrateRecoveryWithDelay(t *testing.T) {
+	tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: hub, store_dir:", 1)
+	c := createJetStreamCluster(t, tmpl, "hub", _EMPTY_, 3, 12232, true)
+	defer c.shutdown()
+
+	tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: leaf, store_dir:", 1)
+	lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "leaf", 3, 23913)
+	defer lnc.shutdown()
+
+	lnc.waitOnClusterReady()
+	delay := 5 * time.Second
+	for _, s := range lnc.servers {
+		s.setJetStreamMigrateOnRemoteLeafWithDelay(delay)
+	}
+
+	nc, _ := jsClientConnect(t, lnc.randomServer())
+	defer nc.Close()
+
+	ljs, err := nc.JetStream(nats.Domain("leaf"))
+	require_NoError(t, err)
+
+	// Create an asset in the leafnode cluster.
+	si, err := ljs.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	require_Equal(t, si.Cluster.Name, "leaf")
+	require_NotEqual(t, si.Cluster.Leader, noLeader)
+	require_Equal(t, len(si.Cluster.Replicas), 2)
+
+	// Count how many remotes each server in the leafnode cluster is
+	// supposed to have and then take them down.
+	remotes := map[*Server]int{}
+	for _, s := range lnc.servers {
+		remotes[s] += len(s.leafRemoteCfgs)
+		s.closeAndDisableLeafnodes()
+		checkLeafNodeConnectedCount(t, s, 0)
+	}
+
+	// The Raft nodes in the leafnode cluster now need some time to
+	// notice that they're no longer receiving AEs from a leader, as
+	// they should have been forced into observer mode. Check that
+	// this is the case.
+	// We expect the nodes to become observers after the delay time.
+	now := time.Now()
+	timeout := maxElectionTimeout + delay
+	success := false
+	for time.Since(now) <= timeout {
+		allObservers := true
+		for _, s := range lnc.servers {
+			s.rnMu.RLock()
+			for name, n := range s.raftNodes {
+				if name == defaultMetaGroupName {
+					require_False(t, n.IsObserver())
+				} else if n.IsObserver() {
+					// Make sure the migration delay is respected: a node may only
+					// enter observer mode once (roughly) the configured delay has
+					// elapsed, so seeing an observer earlier than ~70% of the
+					// delay means the delay was ignored.
+					require_True(t, time.Since(now) > time.Duration(float64(delay)*0.7))
+				} else {
+					allObservers = false
+				}
+			}
+			s.rnMu.RUnlock()
+		}
+		if allObservers {
+			success = true
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	require_True(t, success)
+
+	// Bring the leafnode connections back up.
+	for _, s := range lnc.servers {
+		s.reEnableLeafnodes()
+		checkLeafNodeConnectedCount(t, s, remotes[s])
+	}
+
+	// Wait for nodes to notice they are no longer in observer mode
+	// and to leave observer mode.
+	time.Sleep(maxElectionTimeout)
+	for _, s := range lnc.servers {
+		s.rnMu.RLock()
+		for _, n := range s.raftNodes {
+			require_False(t, n.IsObserver())
+		}
+		s.rnMu.RUnlock()
+	}
+
+	// Make sure all delay timers in remotes are disabled.
+	for _, s := range lnc.servers {
+		for _, r := range s.leafRemoteCfgs {
+			require_True(t, r.jsMigrateTimer == nil)
+		}
+	}
+
+	// Previously nodes would have left observer mode but then would
+	// have failed to elect a stream leader as they were stuck on a
+	// long election timer. Now this should work reliably.
+	lnc.waitOnStreamLeader(globalAccountName, "TEST")
+}
diff --git a/server/leafnode.go b/server/leafnode.go
index 5f0834da86..801a9c1b85 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -105,13 +105,14 @@ type leaf struct {
 type leafNodeCfg struct {
 	sync.RWMutex
 	*RemoteLeafOpts
-	urls      []*url.URL
-	curURL    *url.URL
-	tlsName   string
-	username  string
-	password  string
-	perms     *Permissions
-	connDelay time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
+	urls           []*url.URL
+	curURL         *url.URL
+	tlsName        string
+	username       string
+	password       string
+	perms          *Permissions
+	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
+	jsMigrateTimer *time.Timer
 }
 
 // Check to see if this is a solicited leafnode. We do special processing for solicited.
@@ -500,6 +501,7 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
 	if s.eventsEnabled() {
 		isSysAcc = remote.LocalAccount == s.sys.account.Name
 	}
+	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
 	s.mu.Unlock()
 
 	// If we are sharing a system account and we are not standalone delay to gather some info prior.
@@ -522,6 +524,7 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
 	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
 
 	attempts := 0
+
 	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
 		rURL := remote.pickNextURL()
 		url, err := s.getRandomIP(resolver, rURL.Host, nil)
@@ -548,15 +551,28 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
 		} else {
 			s.Debugf(connErrFmt, rURL.Host, attempts, err)
 		}
+		remote.Lock()
+		// if we are using a delay to start migrating assets, kick off a migrate timer.
+		if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
+			remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
+				s.checkJetStreamMigrate(remote)
+			})
+		}
+		remote.Unlock()
 		select {
 		case <-s.quitCh:
+			remote.cancelMigrateTimer()
 			return
 		case <-time.After(delay):
-			// Check if we should migrate any JetStream assets while this remote is down.
-			s.checkJetStreamMigrate(remote)
+			// Check if we should migrate any JetStream assets immediately while this remote is down.
+			// This will be used if JetStreamClusterMigrateDelay was not set
+			if jetstreamMigrateDelay == 0 {
+				s.checkJetStreamMigrate(remote)
+			}
 			continue
 		}
 	}
+	remote.cancelMigrateTimer()
 	if !s.remoteLeafNodeStillValid(remote) {
 		conn.Close()
 		return
@@ -573,6 +589,15 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
 	}
 }
 
+func (cfg *leafNodeCfg) cancelMigrateTimer() {
+	cfg.Lock()
+	if cfg.jsMigrateTimer != nil {
+		cfg.jsMigrateTimer.Stop()
+		cfg.jsMigrateTimer = nil
+	}
+	cfg.Unlock()
+}
+
 // This will clear any observer state such that stream or consumer assets on this server can become leaders again.
 func (s *Server) clearObserverState(remote *leafNodeCfg) {
 	s.mu.RLock()
diff --git a/server/opts.go b/server/opts.go
index f562d925a6..beae40971e 100644
--- a/server/opts.go
+++ b/server/opts.go
@@ -241,6 +241,10 @@ type RemoteLeafOpts struct {
 	// not be able to work. This tells the system to migrate the leaders away from this server.
 	// This only changes leader for R>1 assets.
 	JetStreamClusterMigrate bool `json:"jetstream_cluster_migrate,omitempty"`
+
+	// If JetStreamClusterMigrate is set to true, this is the time after which the leader
+	// will be migrated away from this server if still disconnected.
+	JetStreamClusterMigrateDelay time.Duration `json:"jetstream_cluster_migrate_delay,omitempty"`
 }
 
 type JSLimitOpts struct {
@@ -2746,6 +2750,16 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL
 			remote.Websocket.NoMasking = v.(bool)
 		case "jetstream_cluster_migrate", "js_cluster_migrate":
 			remote.JetStreamClusterMigrate = true
+			migrateConfig, ok := v.(map[string]any)
+			if !ok {
+				continue
+			}
+			// Only unwrap the value when the "delay" key is actually present;
+			// calling unwrapValue on a missing (nil) entry is not valid.
+			if val, ok := migrateConfig["delay"]; ok {
+				tk, delay := unwrapValue(val, &tk)
+				remote.JetStreamClusterMigrateDelay = parseDuration("delay", tk, delay, errors, warnings)
+			}
+
 		case "compression":
 			if err := parseCompression(&remote.Compression, CompressionS2Auto, tk, k, v); err != nil {
 				*errors = append(*errors, err)
diff --git a/server/opts_test.go b/server/opts_test.go
index 7ea6bf4980..cb456edcfb 100644
--- a/server/opts_test.go
+++ b/server/opts_test.go
@@ -2540,6 +2540,57 @@ func TestParsingLeafNodeRemotes(t *testing.T) {
 			t.Fatal("Expected urls to be random")
 		}
 	})
+
+	t.Run("parse config file js_cluster_migrate", func(t *testing.T) {
+		content := `
+		leafnodes {
+			remotes = [
+				{
+					url: nats-leaf://127.0.0.1:2222
+					account: foo // Local Account to bind to..
+					credentials: "./my.creds"
+					js_cluster_migrate: true
+				},
+				{
+					url: nats-leaf://127.0.0.1:2222
+					account: bar // Local Account to bind to..
+					credentials: "./my.creds"
+					js_cluster_migrate: {
+						delay: 30s
+					}
+				}
+			]
+		}
+		`
+		conf := createConfFile(t, []byte(content))
+		opts, err := ProcessConfigFile(conf)
+		if err != nil {
+			t.Fatalf("Error processing file: %v", err)
+		}
+		if len(opts.LeafNode.Remotes) != 2 {
+			t.Fatalf("Expected 2 remote, got %d", len(opts.LeafNode.Remotes))
+		}
+		u, _ := url.Parse("nats-leaf://127.0.0.1:2222")
+		expected := []*RemoteLeafOpts{
+			{
+				URLs:                    []*url.URL{u},
+				LocalAccount:            "foo",
+				Credentials:             "./my.creds",
+				JetStreamClusterMigrate: true,
+			},
+			{
+				URLs:                         []*url.URL{u},
+				LocalAccount:                 "bar",
+				Credentials:                  "./my.creds",
+				JetStreamClusterMigrate:      true,
+				JetStreamClusterMigrateDelay: 30 * time.Second,
+			},
+		}
+		if !reflect.DeepEqual(opts.LeafNode.Remotes, expected) {
+			t.Fatalf("Expected %v, got %v", expected, opts.LeafNode.Remotes)
+		}
+	})
+
 }
 
 func TestLargeMaxControlLine(t *testing.T) {