[ADDED] Delay option to js_cluster_migrate
This adds a `delay` option to `js_cluster_migrate`, configured via a nested object.
Assets will only be migrated after the specified delay (rather than
immediately, which is the default).

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
piotrpio committed Sep 19, 2024
1 parent 50562c7 commit 2adbc4a
Showing 6 changed files with 300 additions and 9 deletions.
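For context, the option can still be given as a plain boolean, or as a nested object carrying the new `delay` field. A minimal remotes block, reusing the URL, account names, and delay value exercised in the opts_test.go change below, would look roughly like this:

leafnodes {
    remotes = [
        {
            url: nats-leaf://127.0.0.1:2222
            account: foo
            // Migrate R>1 asset leaders immediately when this remote drops (existing behavior).
            js_cluster_migrate: true
        },
        {
            url: nats-leaf://127.0.0.1:2222
            account: bar
            // Only migrate leaders once the remote has been down for at least 30s.
            js_cluster_migrate: {
                delay: 30s
            }
        }
    ]
}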
88 changes: 88 additions & 0 deletions server/jetstream_cluster_2_test.go
@@ -6089,6 +6089,94 @@ func TestJetStreamClusterLeafNodeSPOFMigrateLeaders(t *testing.T) {
})
}

// https://github.com/nats-io/nats-server/issues/3178
func TestJetStreamClusterLeafNodeSPOFMigrateLeadersWithMigrateDelay(t *testing.T) {
tmpl := strings.Replace(jsClusterTempl, "store_dir:", "domain: REMOTE, store_dir:", 1)
c := createJetStreamClusterWithTemplate(t, tmpl, "HUB", 2)
defer c.shutdown()

tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: CORE, store_dir:", 1)
lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "LNC", 2, 22110)
defer lnc.shutdown()

lnc.waitOnClusterReady()

// Place JS assets in LN, and we will do a pull consumer from the HUB.
nc, js := jsClientConnect(t, lnc.randomServer())
defer nc.Close()

si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 2,
})
require_NoError(t, err)
require_True(t, si.Cluster.Name == "LNC")

for i := 0; i < 100; i++ {
js.PublishAsync("foo", []byte("HELLO"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Create the consumer.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "d", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

nc, _ = jsClientConnect(t, c.randomServer())
defer nc.Close()

dsubj := "$JS.CORE.API.CONSUMER.MSG.NEXT.TEST.d"
// Grab directly using domain based subject but from the HUB cluster.
_, err = nc.Request(dsubj, nil, time.Second)
require_NoError(t, err)

// Now we will force the consumer leader's server to drop and stall leafnode connections.
cl := lnc.consumerLeader("$G", "TEST", "d")
cl.setJetStreamMigrateOnRemoteLeafWithDelay(5 * time.Second)
cl.closeAndDisableLeafnodes()

// Now make sure we can eventually get a message again.
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
_, err = nc.Request(dsubj, nil, 500*time.Millisecond)
return err
})

nc, _ = jsClientConnect(t, lnc.randomServer())
defer nc.Close()

// Now make sure the consumer, or any other asset, can not become a leader on this node while the leafnode
// is disconnected.
csd := fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "d")
for i := 0; i < 10; i++ {
nc.Request(csd, nil, time.Second)
lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d")
if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl {
t.Fatalf("Consumer leader should not migrate to server without a leafnode connection")
}
}

// Now make sure once leafnode is back we can have leaders on this server.
cl.reEnableLeafnodes()
checkLeafNodeConnectedCount(t, cl, 2)
for _, ln := range cl.leafRemoteCfgs {
require_True(t, ln.jsMigrateTimer == nil)
}

// Make sure we can migrate back to this server now that we are connected.
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
nc.Request(csd, nil, time.Second)
lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d")
if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl {
return nil
}
return fmt.Errorf("Not this server yet")
})
}

func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()
10 changes: 10 additions & 0 deletions server/jetstream_helpers_test.go
@@ -1135,6 +1135,16 @@ func (s *Server) setJetStreamMigrateOnRemoteLeaf() {
s.mu.Unlock()
}

// Helper to set the remote migrate feature.
func (s *Server) setJetStreamMigrateOnRemoteLeafWithDelay(delay time.Duration) {
s.mu.Lock()
for _, cfg := range s.leafRemoteCfgs {
cfg.JetStreamClusterMigrate = true
cfg.JetStreamClusterMigrateDelay = delay
}
s.mu.Unlock()
}

// Will add in the mapping for the account to each server.
func (c *cluster) addSubjectMapping(account, src, dest string) {
c.t.Helper()
103 changes: 103 additions & 0 deletions server/jetstream_leafnode_test.go
@@ -1324,3 +1324,106 @@ func TestJetStreamLeafNodeJSClusterMigrateRecovery(t *testing.T) {
// long election timer. Now this should work reliably.
lnc.waitOnStreamLeader(globalAccountName, "TEST")
}

func TestJetStreamLeafNodeJSClusterMigrateRecoveryWithDelay(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: hub, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "hub", _EMPTY_, 3, 12232, true)
defer c.shutdown()

tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: leaf, store_dir:", 1)
lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "leaf", 3, 23913)
defer lnc.shutdown()

lnc.waitOnClusterReady()
delay := 5 * time.Second
for _, s := range lnc.servers {
s.setJetStreamMigrateOnRemoteLeafWithDelay(delay)
}

nc, _ := jsClientConnect(t, lnc.randomServer())
defer nc.Close()

ljs, err := nc.JetStream(nats.Domain("leaf"))
require_NoError(t, err)

// Create an asset in the leafnode cluster.
si, err := ljs.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "leaf")
require_NotEqual(t, si.Cluster.Leader, noLeader)
require_Equal(t, len(si.Cluster.Replicas), 2)

// Count how many remotes each server in the leafnode cluster is
// supposed to have and then take them down.
remotes := map[*Server]int{}
for _, s := range lnc.servers {
remotes[s] += len(s.leafRemoteCfgs)
s.closeAndDisableLeafnodes()
checkLeafNodeConnectedCount(t, s, 0)
}

// The Raft nodes in the leafnode cluster now need some time to
// notice that they're no longer receiving AEs from a leader, as
// they should have been forced into observer mode. Check that
// this is the case.
// We expect the nodes to become observers after the delay time.
now := time.Now()
timeout := maxElectionTimeout + delay
success := false
for time.Since(now) <= timeout {
allObservers := true
for _, s := range lnc.servers {
s.rnMu.RLock()
for name, n := range s.raftNodes {
if name == defaultMetaGroupName {
require_False(t, n.IsObserver())
} else if n.IsObserver() {
// Make sure the migration delay is respected: a node should only
// flip to observer after (most of) the configured delay has elapsed.
require_True(t, time.Since(now) > time.Duration(float64(delay)*0.7))
} else {
allObservers = false
}
}
s.rnMu.RUnlock()
}
if allObservers {
success = true
break
}
time.Sleep(100 * time.Millisecond)
}
require_True(t, success)

// Bring the leafnode connections back up.
for _, s := range lnc.servers {
s.reEnableLeafnodes()
checkLeafNodeConnectedCount(t, s, remotes[s])
}

// Wait for the nodes to notice the leafnode connections are back
// and to leave observer mode.
time.Sleep(maxElectionTimeout)
for _, s := range lnc.servers {
s.rnMu.RLock()
for _, n := range s.raftNodes {
require_False(t, n.IsObserver())
}
s.rnMu.RUnlock()
}

// Make sure the migrate delay timers in all remotes have been cleared.
for _, s := range lnc.servers {
for _, r := range s.leafRemoteCfgs {
require_True(t, r.jsMigrateTimer == nil)
}
}

// Previously nodes would have left observer mode but then would
// have failed to elect a stream leader as they were stuck on a
// long election timer. Now this should work reliably.
lnc.waitOnStreamLeader(globalAccountName, "TEST")
}
43 changes: 34 additions & 9 deletions server/leafnode.go
@@ -105,13 +105,14 @@ type leaf struct {
type leafNodeCfg struct {
sync.RWMutex
*RemoteLeafOpts
urls []*url.URL
curURL *url.URL
tlsName string
username string
password string
perms *Permissions
connDelay time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
urls []*url.URL
curURL *url.URL
tlsName string
username string
password string
perms *Permissions
connDelay time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
jsMigrateTimer *time.Timer
}

// Check to see if this is a solicited leafnode. We do special processing for solicited.
@@ -500,6 +501,7 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
if s.eventsEnabled() {
isSysAcc = remote.LocalAccount == s.sys.account.Name
}
jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
s.mu.Unlock()

// If we are sharing a system account and we are not standalone delay to gather some info prior.
@@ -522,6 +524,7 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

attempts := 0

for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
rURL := remote.pickNextURL()
url, err := s.getRandomIP(resolver, rURL.Host, nil)
@@ -548,15 +551,28 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
} else {
s.Debugf(connErrFmt, rURL.Host, attempts, err)
}
remote.Lock()
// If we are using a delay before migrating assets, kick off a migrate timer.
if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
s.checkJetStreamMigrate(remote)
})
}
remote.Unlock()
select {
case <-s.quitCh:
remote.cancelMigrateTimer()
return
case <-time.After(delay):
// Check if we should migrate any JetStream assets while this remote is down.
s.checkJetStreamMigrate(remote)
// Check if we should migrate any JetStream assets immediately while this remote is down.
// This path is only taken when JetStreamClusterMigrateDelay is not set.
if jetstreamMigrateDelay == 0 {
s.checkJetStreamMigrate(remote)
}
continue
}
}
remote.cancelMigrateTimer()
if !s.remoteLeafNodeStillValid(remote) {
conn.Close()
return
@@ -573,6 +589,15 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
}
}

func (cfg *leafNodeCfg) cancelMigrateTimer() {
cfg.Lock()
if cfg.jsMigrateTimer != nil {
cfg.jsMigrateTimer.Stop()
cfg.jsMigrateTimer = nil
}
cfg.Unlock()
}

// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
s.mu.RLock()
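The connect-retry path above arms a one-shot timer the first time a failed attempt happens with a configured delay, and cancels it once the remote reconnects or the server shuts down. Below is a minimal standalone sketch of that time.AfterFunc/Stop pattern; the remoteState type is illustrative only (not the server's leafNodeCfg) and only the standard library is assumed:

package main

import (
	"fmt"
	"sync"
	"time"
)

// remoteState is a stand-in for the per-remote config; only the timer handling matters here.
type remoteState struct {
	mu           sync.Mutex
	migrateTimer *time.Timer
}

// onConnectFailure arms the timer once; repeated failed attempts do not reset it.
func (r *remoteState) onConnectFailure(delay time.Duration, migrate func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.migrateTimer == nil && delay > 0 {
		r.migrateTimer = time.AfterFunc(delay, migrate)
	}
}

// onReconnect stops and clears any pending migration so it cannot fire late.
func (r *remoteState) onReconnect() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.migrateTimer != nil {
		r.migrateTimer.Stop()
		r.migrateTimer = nil
	}
}

func main() {
	r := &remoteState{}
	r.onConnectFailure(100*time.Millisecond, func() { fmt.Println("delay elapsed: start migrating assets") })
	time.Sleep(150 * time.Millisecond) // remote stays down past the delay, so the callback fires
	r.onReconnect()                    // timer already fired; this just clears the handle
}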
14 changes: 14 additions & 0 deletions server/opts.go
@@ -241,6 +241,10 @@ type RemoteLeafOpts struct {
// not be able to work. This tells the system to migrate the leaders away from this server.
// This only changes leader for R>1 assets.
JetStreamClusterMigrate bool `json:"jetstream_cluster_migrate,omitempty"`

// If JetStreamClusterMigrate is set to true, this is the time after which the leader
// will be migrated away from this server if still disconnected.
JetStreamClusterMigrateDelay time.Duration `json:"jetstream_cluster_migrate_delay,omitempty"`
}

type JSLimitOpts struct {
@@ -2746,6 +2750,16 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL
remote.Websocket.NoMasking = v.(bool)
case "jetstream_cluster_migrate", "js_cluster_migrate":
remote.JetStreamClusterMigrate = true
migrateConfig, ok := v.(map[string]any)
if !ok {
continue
}
// Only unwrap and parse the delay if it was actually provided.
if val, ok := migrateConfig["delay"]; ok {
tk, delay := unwrapValue(val, &tk)
remote.JetStreamClusterMigrateDelay = parseDuration("delay", tk, delay, errors, warnings)
}

case "compression":
if err := parseCompression(&remote.Compression, CompressionS2Auto, tk, k, v); err != nil {
*errors = append(*errors, err)
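For embedders that configure the server from Go rather than from a config file, the same pair of fields can be set directly on RemoteLeafOpts. A sketch, assuming the nats-server module is imported as github.com/nats-io/nats-server/v2/server and reusing the values asserted in the test change below:

package main

import (
	"log"
	"net/url"
	"time"

	"github.com/nats-io/nats-server/v2/server"
)

func main() {
	u, err := url.Parse("nats-leaf://127.0.0.1:2222")
	if err != nil {
		log.Fatal(err)
	}
	opts := &server.Options{}
	opts.LeafNode.Remotes = []*server.RemoteLeafOpts{{
		URLs:                         []*url.URL{u},
		LocalAccount:                 "bar",
		JetStreamClusterMigrate:      true,
		JetStreamClusterMigrateDelay: 30 * time.Second,
	}}
	// Pass opts to server.NewServer(opts) when embedding; omitted here.
}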
51 changes: 51 additions & 0 deletions server/opts_test.go
@@ -2540,6 +2540,57 @@ func TestParsingLeafNodeRemotes(t *testing.T) {
t.Fatal("Expected urls to be random")
}
})

t.Run("parse config file js_cluster_migrate", func(t *testing.T) {
content := `
leafnodes {
remotes = [
{
url: nats-leaf://127.0.0.1:2222
account: foo // Local Account to bind to..
credentials: "./my.creds"
js_cluster_migrate: true
},
{
url: nats-leaf://127.0.0.1:2222
account: bar // Local Account to bind to..
credentials: "./my.creds"
js_cluster_migrate: {
delay: 30s
}
}
]
}
`
conf := createConfFile(t, []byte(content))
opts, err := ProcessConfigFile(conf)
if err != nil {
t.Fatalf("Error processing file: %v", err)
}
if len(opts.LeafNode.Remotes) != 2 {
t.Fatalf("Expected 2 remote, got %d", len(opts.LeafNode.Remotes))
}
u, _ := url.Parse("nats-leaf://127.0.0.1:2222")
expected := []*RemoteLeafOpts{
{
URLs: []*url.URL{u},
LocalAccount: "foo",
Credentials: "./my.creds",
JetStreamClusterMigrate: true,
},
{
URLs: []*url.URL{u},
LocalAccount: "bar",
Credentials: "./my.creds",
JetStreamClusterMigrate: true,
JetStreamClusterMigrateDelay: 30 * time.Second,
},
}
if !reflect.DeepEqual(opts.LeafNode.Remotes, expected) {
t.Fatalf("Expected %v, got %v", expected, opts.LeafNode.Remotes)
}
})

}

func TestLargeMaxControlLine(t *testing.T) {