From d7f01498fca98ca61c1721b86d8a9ec6f6495792 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Wed, 31 Jan 2024 15:48:12 -0800 Subject: [PATCH 1/8] [FIXED] RAFT node responds to VoteRequest with outdated Term A node's Term may be increased while handling a VoteRequest; when that happened, the node responded with a stale Term. --- server/raft.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/raft.go b/server/raft.go index 6d5a51b735..f041fb0898 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3953,6 +3953,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { n.resetElect(randCampaignTimeout()) } } + + // Term might have changed, make sure the response carries the most current term. + vresp.term = n.term + n.Unlock() n.sendReply(vr.reply, vresp.encode()) From d0ee01342cefd4549c093998944e0c5c8c9e00b8 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 1 Feb 2024 17:10:49 +0000 Subject: [PATCH 2/8] Fix `TestNRGSimpleElection` to track the changed term after granted vote Signed-off-by: Neil Twigg --- server/raft_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/raft_test.go b/server/raft_test.go index a2cafa61cc..5a8d0b861f 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -273,15 +273,13 @@ func TestNRGSimpleElection(t *testing.T) { re := decodeVoteResponse(msg.Data) require_True(t, re != nil) - // The new term hasn't started yet, so the vote responses - // should contain the term from before the election. It is - // possible that candidates are listening to this to work - // out if they are in previous terms. - require_Equal(t, re.term, vr.lastTerm) - require_Equal(t, re.term, startTerm) - // The vote should have been granted. require_Equal(t, re.granted, true) + + // The node granted the vote, therefore the term in the vote + // response should have advanced as well. + require_Equal(t, re.term, vr.term) + require_Equal(t, re.term, startTerm+1) } // Everyone in the group should have voted for our candidate From eb2a53f90903f6dc9e15a089b5e18db9663dbf5b Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 1 Feb 2024 16:54:21 +0000 Subject: [PATCH 3/8] NRG: Don't send metaleader snapshot as normal entry on leader change if apply queue is not empty Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 146a3d3592..04f49fb4c6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1358,7 +1358,7 @@ func (js *jetStream) monitorCluster() { case isLeader = <-lch: // For meta layer synchronize everyone to our state on becoming leader. - if isLeader { + if isLeader && n.ApplyQ().len() == 0 { n.SendSnapshot(js.metaSnapshot()) } // Process the change. 
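The race PATCH 1/8 closes is easiest to see in a reduced model. Below is a minimal, self-contained Go sketch under simplified assumptions: the node, voteRequest, and voteResponse types are illustrative stand-ins rather than the actual nats-server raft types, and locking is omitted. It shows the point the patch makes: handling a VoteRequest from a newer candidate advances the local term, so the response must be built from the term as it stands after that handling, not from a value captured earlier.

    package main

    import "fmt"

    // Illustrative stand-ins, not the actual nats-server raft types.
    type voteRequest struct{ term uint64 }

    type voteResponse struct {
        term    uint64
        granted bool
    }

    type node struct{ term uint64 }

    func (n *node) processVoteRequest(vr *voteRequest) voteResponse {
        granted := false
        if vr.term > n.term {
            // A newer candidate advances our own term. This mutation is why
            // the response term must be read after this point: encoding a
            // term captured beforehand would advertise a stale value.
            n.term = vr.term
            granted = true
        }
        return voteResponse{term: n.term, granted: granted}
    }

    func main() {
        n := &node{term: 1}
        resp := n.processVoteRequest(&voteRequest{term: 2})
        fmt.Printf("granted=%v term=%d\n", resp.granted, resp.term) // granted=true term=2
    }

The same observation drives the test change in PATCH 2/8: once the vote is granted, the responder's term has already advanced to the candidate's term, so the response carries startTerm+1 rather than the pre-election term.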
From 20028dd7b5af1a967048ec8abf2a4c59f9ccab87 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 1 Feb 2024 12:26:13 -0700 Subject: [PATCH 4/8] Fix flapping test TestRouteCompressionAuto Signed-off-by: Ivan Kozlovic --- server/routes_test.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/server/routes_test.go b/server/routes_test.go index e376d3beb7..5ddc77da89 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -3939,8 +3939,8 @@ func TestRouteCompressionAuto(t *testing.T) { np := createNetProxy(0, 1024*1024*1024, 1024*1024*1024, fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port), true) routes := fmt.Sprintf("routes: [\"%s\"]", np.routeURL()) - rtts := "{mode: s2_auto, rtt_thresholds: [10ms, 20ms, 30ms]}" - conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", "100ms", rtts, routes))) + rtts := "{mode: s2_auto, rtt_thresholds: [100ms, 200ms, 300ms]}" + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", "500ms", rtts, routes))) s2, _ := RunServerWithConfig(conf2) defer s2.Shutdown() defer np.stop() @@ -3949,7 +3949,7 @@ func TestRouteCompressionAuto(t *testing.T) { checkComp := func(expected string) { t.Helper() - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + checkFor(t, 4*time.Second, 50*time.Millisecond, func() error { s2.mu.RLock() defer s2.mu.RUnlock() if n := s2.numRoutes(); n != 4 { @@ -3973,15 +3973,15 @@ func TestRouteCompressionAuto(t *testing.T) { checkComp(CompressionS2Uncompressed) // Change the proxy RTT and we should get compression "fast" - np.updateRTT(15 * time.Millisecond) + np.updateRTT(150 * time.Millisecond) checkComp(CompressionS2Fast) - // Now 25ms, and get "better" - np.updateRTT(25 * time.Millisecond) + // Now 250ms, and get "better" + np.updateRTT(250 * time.Millisecond) checkComp(CompressionS2Better) - // Above 35 and we should get "best" - np.updateRTT(35 * time.Millisecond) + // Above 350 and we should get "best" + np.updateRTT(350 * time.Millisecond) checkComp(CompressionS2Best) // Down to 1ms and again should get "uncompressed" @@ -3989,28 +3989,28 @@ func TestRouteCompressionAuto(t *testing.T) { checkComp(CompressionS2Uncompressed) // Do a config reload with disabling uncompressed - reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", "100ms", "{mode: s2_auto, rtt_thresholds: [0ms, 10ms, 0ms, 30ms]}", routes)) + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", "500ms", "{mode: s2_auto, rtt_thresholds: [0ms, 100ms, 0ms, 300ms]}", routes)) // Change the RTT back down to 1ms, but we should not go uncompressed, // we should have "fast" compression. 
np.updateRTT(1 * time.Millisecond) checkComp(CompressionS2Fast) - // Now bump to 15ms and we should be using "best", not the "better" mode - np.updateRTT(15 * time.Millisecond) + // Now bump to 150ms and we should be using "best", not the "better" mode + np.updateRTT(150 * time.Millisecond) checkComp(CompressionS2Best) - // Try 40ms and we should still be using "best" - np.updateRTT(40 * time.Millisecond) + // Try 400ms and we should still be using "best" + np.updateRTT(400 * time.Millisecond) checkComp(CompressionS2Best) // Try other variations - reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", "100ms", "{mode: s2_auto, rtt_thresholds: [5ms, 15ms, 0ms, 0ms]}", routes)) - np.updateRTT(1 * time.Millisecond) + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", "500ms", "{mode: s2_auto, rtt_thresholds: [50ms, 150ms, 0ms, 0ms]}", routes)) + np.updateRTT(0 * time.Millisecond) checkComp(CompressionS2Uncompressed) - np.updateRTT(10 * time.Millisecond) + np.updateRTT(100 * time.Millisecond) checkComp(CompressionS2Fast) // Since we expect the same compression level, just wait before doing // the update and the next check. time.Sleep(100 * time.Millisecond) - np.updateRTT(25 * time.Millisecond) + np.updateRTT(250 * time.Millisecond) checkComp(CompressionS2Fast) // Now disable compression on s1 @@ -4020,7 +4020,7 @@ func TestRouteCompressionAuto(t *testing.T) { time.Sleep(100 * time.Millisecond) checkClusterFormed(t, s1, s2) // Now change the RTT values in the proxy. - np.updateRTT(1 * time.Millisecond) + np.updateRTT(0 * time.Millisecond) // Now check that s2 also shows as "off". Wait for some ping intervals. time.Sleep(200 * time.Millisecond) checkComp(CompressionOff) From fc8434222b7a2b9a170fead9c10542a2c9b2e22a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 1 Feb 2024 13:59:57 -0800 Subject: [PATCH 5/8] When moving to direct addServiceImport, needed to explicitly set "to" to avoid extra work. Caused performance hit on KV Get performance tests. Signed-off-by: Derek Collison --- server/client.go | 9 +++++++-- server/jetstream.go | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/server/client.go b/server/client.go index 4ef424cbb0..795e0b9e3b 100644 --- a/server/client.go +++ b/server/client.go @@ -4089,6 +4089,12 @@ func getHeader(key string, hdr []byte) []byte { return value } +// For bytes.HasPrefix below. +var ( + jsRequestNextPreB = []byte(jsRequestNextPre) + jsDirectGetPreB = []byte(jsDirectGetPre) +) + // processServiceImport is an internal callback when a subscription matches an imported service // from another account. This includes response mappings as well. func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) { @@ -4110,8 +4116,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt var checkJS bool shouldReturn := si.invalid || acc.sl == nil if !shouldReturn && !isResponse && si.to == jsAllAPI { - subj := bytesToString(c.pa.subject) - if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) { + if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) { checkJS = true } } diff --git a/server/jetstream.go b/server/jetstream.go index 679761c15f..cba11d14ef 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -654,7 +654,8 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error { if !a.serviceImportExists(jsAllAPI) { // Capture si so we can turn on implicit sharing with JetStream layer. 
- si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, _EMPTY_, nil) + // Make sure to set "to", otherwise we will incur a performance slowdown. + si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, jsAllAPI, nil) if err != nil { return fmt.Errorf("Error setting up jetstream service imports for account: %v", err) } From 7a8f9d7444544958e9f6ae314e7e1691018378c6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 1 Feb 2024 15:26:49 -0800 Subject: [PATCH 6/8] Use dios gate internally and do not wrap whole function. Signed-off-by: Derek Collison --- server/filestore.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index bcffac9794..861fe4dc9f 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4846,11 +4846,6 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg } func (mb *msgBlock) recompressOnDiskIfNeeded() error { - // Wait for disk I/O slots to become available. This prevents us from - // running away with system resources. - <-dios - defer func() { dios <- struct{}{} }() - alg := mb.fs.fcfg.Compression mb.mu.Lock() defer mb.mu.Unlock() @@ -4864,7 +4859,10 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // header, in which case we do nothing. // 2. The block will be uncompressed, in which case we will compress it // and then write it back out to disk, reencrypting if necessary. + <-dios origBuf, err := os.ReadFile(origFN) + dios <- struct{}{} + if err != nil { return fmt.Errorf("failed to read original block from disk: %w", err) } From a478ee0ec126f44b183b8f81fd0a6befe449ee24 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 1 Feb 2024 17:44:20 -0800 Subject: [PATCH 7/8] Remove fallback cleanup of consumer raft directory. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 04f49fb4c6..5053e680db 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4392,7 +4392,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb recovering := ca.recovering js.mu.RUnlock() - stopped := false var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} var err error var acc *Account @@ -4402,9 +4401,13 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb if mset, _ := acc.lookupStream(ca.Stream); mset != nil { if o := mset.lookupConsumer(ca.Name); o != nil { err = o.stopWithFlags(true, false, true, wasLeader) - stopped = true } } + } else if ca.Group != nil { + // We have a missing account, see if we can clean up. + if sacc := s.SystemAccount(); sacc != nil { + os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) + } } // Always delete the node if present. 
@@ -4412,15 +4415,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb node.Delete() } - // This is a stop gap cleanup in case - // 1) the account does not exist (and mset consumer couldn't be stopped) and/or - // 2) node was nil (and couldn't be deleted) - if !stopped || node == nil { - if sacc := s.SystemAccount(); sacc != nil { - os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) - } - } - if !wasLeader || ca.Reply == _EMPTY_ { if !(offline && isMetaLeader) { return From 79960bbb12406a37b8c581c4014171529f4906e7 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 1 Feb 2024 19:46:53 -0800 Subject: [PATCH 8/8] Add support to customize cluster ping_interval Signed-off-by: Waldemar Quevedo --- server/client.go | 21 ++++++++++++++--- server/config_check_test.go | 38 ++++++++++++++++++++++++++----- server/configs/reload/reload.conf | 2 ++ server/opts.go | 9 ++++++++ server/reload_test.go | 6 +++++ server/route.go | 10 +++++++- server/routes_test.go | 34 +++++++++++++++++++++++++++ 7 files changed, 110 insertions(+), 10 deletions(-) diff --git a/server/client.go b/server/client.go index 795e0b9e3b..348c0478fe 100644 --- a/server/client.go +++ b/server/client.go @@ -4795,7 +4795,11 @@ func (c *client) processPingTimer() { var sendPing bool - pingInterval := c.srv.getOpts().PingInterval + opts := c.srv.getOpts() + pingInterval := opts.PingInterval + if c.kind == ROUTER && opts.Cluster.PingInterval > 0 { + pingInterval = opts.Cluster.PingInterval + } pingInterval = adjustPingInterval(c.kind, pingInterval) now := time.Now() needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL @@ -4815,7 +4819,11 @@ func (c *client) processPingTimer() { if sendPing { // Check for violation - if c.ping.out+1 > c.srv.getOpts().MaxPingsOut { + maxPingsOut := opts.MaxPingsOut + if c.kind == ROUTER && opts.Cluster.MaxPingsOut > 0 { + maxPingsOut = opts.Cluster.MaxPingsOut + } + if c.ping.out+1 > maxPingsOut { c.Debugf("Stale Client Connection - Closing") c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection"))) c.mu.Unlock() @@ -4852,7 +4860,11 @@ func (c *client) setPingTimer() { if c.srv == nil { return } - d := c.srv.getOpts().PingInterval + opts := c.srv.getOpts() + d := opts.PingInterval + if c.kind == ROUTER && opts.Cluster.PingInterval > 0 { + d = opts.Cluster.PingInterval + } d = adjustPingInterval(c.kind, d) c.ping.tmr = time.AfterFunc(d, c.processPingTimer) } @@ -5745,6 +5757,9 @@ func (c *client) setFirstPingTimer() { opts := s.getOpts() d := opts.PingInterval + if c.kind == ROUTER && opts.Cluster.PingInterval > 0 { + d = opts.Cluster.PingInterval + } if !opts.DisableShortFirstPing { if c.kind != CLIENT { if d > firstPingInterval { diff --git a/server/config_check_test.go b/server/config_check_test.go index 5329949226..a9ec00cf1a 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1580,7 +1580,7 @@ func TestConfigCheck(t *testing.T) { errorPos: 6, }, { - name: "wrong type for cluter pool size", + name: "wrong type for cluster pool size", config: ` cluster { port: -1 @@ -1592,7 +1592,7 @@ func TestConfigCheck(t *testing.T) { errorPos: 6, }, { - name: "wrong type for cluter accounts", + name: "wrong type for cluster accounts", config: ` cluster { port: -1 @@ -1604,7 +1604,7 @@ func TestConfigCheck(t *testing.T) { errorPos: 6, }, { - name: "wrong type for cluter compression", + name: "wrong type for cluster compression", config: ` cluster { port: -1 @@ -1616,7 
+1616,7 @@ func TestConfigCheck(t *testing.T) { errorPos: 6, }, { - name: "wrong type for cluter compression mode", + name: "wrong type for cluster compression mode", config: ` cluster { port: -1 @@ -1630,7 +1630,7 @@ func TestConfigCheck(t *testing.T) { errorPos: 7, }, { - name: "wrong type for cluter compression rtt thresholds", + name: "wrong type for cluster compression rtt thresholds", config: ` cluster { port: -1 @@ -1645,7 +1645,7 @@ func TestConfigCheck(t *testing.T) { errorPos: 7, }, { - name: "invalid durations for cluter compression rtt thresholds", + name: "invalid durations for cluster compression rtt thresholds", config: ` cluster { port: -1 @@ -1659,6 +1659,32 @@ func TestConfigCheck(t *testing.T) { errorLine: 6, errorPos: 7, }, + { + name: "invalid duration for cluster ping interval", + config: ` + cluster { + port: -1 + ping_interval: -1 + ping_max: 6 + } + `, + err: fmt.Errorf(`invalid use of field "ping_interval": ping_interval should be converted to a duration`), + errorLine: 4, + errorPos: 6, + }, + { + name: "warning when cluster ping interval exceeds the max for routes", + config: ` + cluster { + port: -1 + ping_interval: '2m' + ping_max: 6 + } + `, + warningErr: fmt.Errorf(`Cluster 'ping_interval' will reset to 30s which is the max for routes`), + errorLine: 4, + errorPos: 6, + }, { name: "wrong type for leafnodes compression", config: ` diff --git a/server/configs/reload/reload.conf b/server/configs/reload/reload.conf index 068500b328..47d517042f 100644 --- a/server/configs/reload/reload.conf +++ b/server/configs/reload/reload.conf @@ -35,4 +35,6 @@ cluster { listen: 127.0.0.1:-1 name: "abc" no_advertise: true # enable on reload + ping_interval: '20s' + ping_max: 8 } diff --git a/server/opts.go b/server/opts.go index 8925ef6df1..721d6be843 100644 --- a/server/opts.go +++ b/server/opts.go @@ -80,6 +80,8 @@ type ClusterOpts struct { PoolSize int `json:"-"` PinnedAccounts []string `json:"-"` Compression CompressionOpts `json:"-"` + PingInterval time.Duration `json:"-"` + MaxPingsOut int `json:"-"` // Not exported (used in tests) resolver netResolver @@ -1755,6 +1757,13 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err *errors = append(*errors, err) continue } + case "ping_interval": + opts.Cluster.PingInterval = parseDuration("ping_interval", tk, mv, errors, warnings) + if opts.Cluster.PingInterval > routeMaxPingInterval { + *warnings = append(*warnings, &configErr{tk, fmt.Sprintf("Cluster 'ping_interval' will reset to %v which is the max for routes", routeMaxPingInterval)}) + } + case "ping_max": + opts.Cluster.MaxPingsOut = int(mv.(int64)) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/reload_test.go b/server/reload_test.go index 5be29aa737..2bc22b6ee5 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -348,6 +348,12 @@ func TestConfigReload(t *testing.T) { if !updated.Cluster.NoAdvertise { t.Fatal("Expected NoAdvertise to be true") } + if updated.Cluster.PingInterval != 20*time.Second { + t.Fatalf("Cluster PingInterval is incorrect.\nexpected: 20s\ngot: %v", updated.Cluster.PingInterval) + } + if updated.Cluster.MaxPingsOut != 8 { + t.Fatalf("Cluster MaxPingsOut is incorrect.\nexpected: 8\ngot: %v", updated.Cluster.MaxPingsOut) + } if updated.PidFile != "nats-server.pid" { t.Fatalf("PidFile is incorrect.\nexpected: nats-server.pid\ngot: %s", updated.PidFile) } diff --git a/server/route.go b/server/route.go index e73f1bdf72..f8a8623d6a 100644 --- a/server/route.go +++ b/server/route.go @@ -1780,7 
+1780,15 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie // the connection as stale based on the ping interval and max out values, // but without actually sending pings. if compressionConfigured { - c.ping.tmr = time.AfterFunc(opts.PingInterval*time.Duration(opts.MaxPingsOut+1), func() { + pingInterval := opts.PingInterval + pingMax := opts.MaxPingsOut + if opts.Cluster.PingInterval > 0 { + pingInterval = opts.Cluster.PingInterval + } + if opts.Cluster.MaxPingsOut > 0 { + pingMax = opts.Cluster.MaxPingsOut + } + c.ping.tmr = time.AfterFunc(pingInterval*time.Duration(pingMax+1), func() { c.mu.Lock() c.Debugf("Stale Client Connection - Closing") c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection"))) diff --git a/server/routes_test.go b/server/routes_test.go index 5ddc77da89..faf0160015 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -4059,6 +4059,40 @@ func TestRoutePings(t *testing.T) { } } +func TestRouteCustomPing(t *testing.T) { + pingInterval := 50 * time.Millisecond + o1 := DefaultOptions() + o1.Cluster.PingInterval = pingInterval + o1.Cluster.MaxPingsOut = 2 + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.Cluster.PingInterval = pingInterval + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + ch := make(chan struct{}, 1) + s1.mu.RLock() + s1.forEachRemote(func(r *client) { + r.mu.Lock() + r.nc = &capturePingConn{r.nc, ch} + r.mu.Unlock() + }) + s1.mu.RUnlock() + + for i := 0; i < 5; i++ { + select { + case <-ch: + case <-time.After(250 * time.Millisecond): + t.Fatalf("Did not send PING") + } + } +} + func TestRouteNoLeakOnSlowConsumer(t *testing.T) { o1 := DefaultOptions() o1.Cluster.PoolSize = -1
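Taken together, the pieces of PATCH 8/8 let the route (cluster) ping cadence be tuned independently of client connections, falling back to the server-wide PingInterval and MaxPingsOut whenever the cluster-level values are unset or non-positive. A minimal configuration sketch follows, reusing the values from the reload.conf fixture above; the listen address and cluster name here are illustrative:

    cluster {
        listen: 127.0.0.1:6222
        name: "example"
        # Route-only override of the server-wide ping interval. Values above
        # 30s reset to 30s, the max for routes, with a warning at parse time.
        ping_interval: '20s'
        # A route is flagged stale after this many unanswered PINGs.
        ping_max: 8
    }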