NRG: Wait for goroutines to shutdown when recreating group
This should fix some logical races where multiple sets of goroutines
can fight over the same store directory, i.e. when shutting down and
recreating a group rapidly.

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander committed Sep 18, 2024
1 parent 83c77b4 commit ed7724b
Showing 3 changed files with 40 additions and 14 deletions.
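The idea behind the change, in miniature: each NRG tracks its goroutines in a sync.WaitGroup, and anything that wants to replace a stopped group first waits for that WaitGroup to drain, so the old and new groups never overlap on the same store directory. Below is a minimal, self-contained sketch of that pattern — toy types and a made-up directory name, not the server's real API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// group is a toy stand-in for an NRG: one tracked goroutine that "owns" a
// store directory until it has completely shut down.
type group struct {
	quit chan struct{}
	wg   sync.WaitGroup
}

func newGroup(dir string) *group {
	g := &group{quit: make(chan struct{})}
	g.wg.Add(1) // account for the goroutine before it starts
	go func() {
		defer g.wg.Done()
		<-g.quit
		// Simulate slow cleanup of dir after the quit signal arrives.
		time.Sleep(10 * time.Millisecond)
		_ = dir
	}()
	return g
}

func (g *group) Stop()        { close(g.quit) }
func (g *group) WaitForStop() { g.wg.Wait() } // mirrors the new RaftNode.WaitForStop()

func main() {
	old := newGroup("/tmp/nrg-demo") // hypothetical store directory
	old.Stop()
	// The fix in this commit: block until the old goroutines are gone before
	// a replacement group is allowed to touch the same store directory.
	old.WaitForStop()
	fmt.Println("old group fully stopped; safe to recreate")
	_ = newGroup("/tmp/nrg-demo")
}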
10 changes: 8 additions & 2 deletions server/jetstream_cluster.go
@@ -2049,16 +2049,22 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
 		return nil
 	}
 
+	js.mu.Unlock()
+
 	// Check if we already have this assigned.
+retry:
 	if node := s.lookupRaftNode(rg.Name); node != nil {
+		if node.State() == Closed {
+			// We're waiting for this node to finish shutting down before we replace it.
+			node.WaitForStop()
+			goto retry
+		}
 		s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name)
 		rg.node = node
-		js.mu.Unlock()
 		return nil
 	}
 
 	s.Debugf("JetStream cluster creating raft group:%+v", rg)
-	js.mu.Unlock()
 
 	sysAcc := s.SystemAccount()
 	if sysAcc == nil {
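The retry loop above only terminates because a closed node eventually disappears from the server's node registry once its shutdown finishes. A rough sketch of that lookup/wait/retry shape, using illustrative stand-in types rather than the real RaftNode and the rnMu-guarded map:

package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-ins; the real code uses RaftNode, s.lookupRaftNode and
// s.raftNodes guarded by s.rnMu.
type node struct {
	closed bool
	wg     sync.WaitGroup
}

func (n *node) Closed() bool { return n.closed }
func (n *node) WaitForStop() { n.wg.Wait() }

type registry struct {
	mu    sync.Mutex
	nodes map[string]*node
}

func (r *registry) lookup(name string) *node {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.nodes[name]
}

// getOrCreate mirrors the retry loop in createRaftGroup: a live node is
// reused, but a node that is already Closed is waited on and the lookup is
// repeated. In the real code the shutdown path unregisters the node before
// releasing its WaitGroup token, so the repeated lookup will not see it again.
func (r *registry) getOrCreate(name string) *node {
retry:
	if n := r.lookup(name); n != nil {
		if n.Closed() {
			n.WaitForStop()
			goto retry
		}
		return n
	}
	n := &node{}
	r.mu.Lock()
	r.nodes[name] = n
	r.mu.Unlock()
	return n
}

func main() {
	r := &registry{nodes: map[string]*node{}}
	a := r.getOrCreate("S-R3F-demo")              // hypothetical group name
	fmt.Println(r.getOrCreate("S-R3F-demo") == a) // true: the live node is reused
}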
2 changes: 2 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -5779,6 +5779,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
 
 	// Should only be meta NRG left.
 	require_True(t, s.numRaftNodes() == 1)
+	s.rnMu.RLock()
+	defer s.rnMu.RUnlock()
 	require_True(t, s.lookupRaftNode(sgn) == nil)
 	require_True(t, s.lookupRaftNode(ogn) == nil)
 }
42 changes: 30 additions & 12 deletions server/raft.go
@@ -74,6 +74,7 @@ type RaftNode interface {
 	QuitC() <-chan struct{}
 	Created() time.Time
 	Stop()
+	WaitForStop()
 	Delete()
 	Wipe()
 	RecreateInternalSubs() error
@@ -128,12 +129,13 @@ func (state RaftState) String() string {
 type raft struct {
 	sync.RWMutex
 
-	created time.Time // Time that the group was created
-	accName string    // Account name of the asset this raft group is for
-	acc     *Account  // Account that NRG traffic will be sent/received in
-	group   string    // Raft group
-	sd      string    // Store directory
-	id      string    // Node ID
+	created time.Time      // Time that the group was created
+	accName string         // Account name of the asset this raft group is for
+	acc     *Account       // Account that NRG traffic will be sent/received in
+	group   string         // Raft group
+	sd      string         // Store directory
+	id      string         // Node ID
+	wg      sync.WaitGroup // Wait for running goroutines to exit on shutdown
 
 	wal   WAL         // WAL store (filestore or memstore)
 	wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage
@@ -521,6 +523,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
 	s.registerRaftNode(n.group, n)
 
 	// Start the run goroutine for the Raft state machine.
+	n.wg.Add(1)
 	s.startGoRoutine(n.run, labels)
 
 	return n, nil
@@ -655,7 +658,7 @@ func (s *Server) clusterNameForNode(node string) string {
 }
 
 // Registers the Raft node with the server, as it will track all of the Raft
-// nodes.
+// nodes. rnMu must be held.
 func (s *Server) registerRaftNode(group string, n RaftNode) {
 	s.rnMu.Lock()
 	defer s.rnMu.Unlock()
@@ -676,8 +679,8 @@ func (s *Server) unregisterRaftNode(group string) {
 
 // Returns how many Raft nodes are running in this server instance.
 func (s *Server) numRaftNodes() int {
-	s.rnMu.Lock()
-	defer s.rnMu.Unlock()
+	s.rnMu.RLock()
+	defer s.rnMu.RUnlock()
 	return len(s.raftNodes)
 }
 
@@ -1718,12 +1721,17 @@ func (n *raft) Stop() {
 	n.shutdown(false)
 }
 
+func (n *raft) WaitForStop() {
+	n.wg.Wait()
+}
+
 func (n *raft) Delete() {
 	n.shutdown(true)
 }
 
 func (n *raft) shutdown(shouldDelete bool) {
 	n.Lock()
+	defer n.Unlock()
 
 	// Returned swap value is the previous state. It looks counter-intuitive
 	// to do this atomic operation with the lock held, but we have to do so in
@@ -1734,15 +1742,21 @@ func (n *raft) shutdown(shouldDelete bool) {
 	if n.state.Swap(int32(Closed)) == int32(Closed) {
 		// If we get called again with shouldDelete, in case we were called first with Stop() cleanup
 		if shouldDelete {
+			n.wg.Wait()
 			if wal := n.wal; wal != nil {
 				wal.Delete()
 			}
 			os.RemoveAll(n.sd)
 		}
-		n.Unlock()
 		return
 	}
 
+	// Need to add this here as this shutdown() call is effectively another
+	// goroutine trying to manage the Raft group state. A call to WaitForStop()
+	// will therefore block on this completing too.
+	n.wg.Add(1)
+	defer n.wg.Done()
+
 	close(n.quit)
 	if c := n.c; c != nil {
 		var subs []*subscription
@@ -1771,7 +1785,6 @@ func (n *raft) shutdown(shouldDelete bool) {
 		q.unregister()
 	}
 	sd := n.sd
-	n.Unlock()
 
 	s.unregisterRaftNode(g)
 
@@ -1922,6 +1935,7 @@ func (n *raft) resetElectWithLock(et time.Duration) {
 // the entire life of the Raft node once started.
 func (n *raft) run() {
 	s := n.s
+	defer n.wg.Done()
 	defer s.grWG.Done()
 
 	// We want to wait for some routing to be enabled, so we will wait for
@@ -2799,7 +2813,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
 	n.progress[ar.peer] = indexUpdates
 	n.Unlock()
 
-	n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) })
+	n.wg.Add(1)
+	n.s.startGoRoutine(func() {
+		defer n.wg.Done()
+		n.runCatchup(ar, indexUpdates)
+	})
 }
 
 func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
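The raft.go changes all follow the same accounting rule: wg.Add(1) happens on the spawning side, before the goroutine starts (the run loop, the catchup goroutine, and the shutdown call itself), and the goroutine defers wg.Done(), so WaitForStop() cannot return while any of them is still running. A small sketch of that rule with assumed helper names, not the server's real API:

package main

import (
	"fmt"
	"sync"
	"time"
)

type raftLike struct {
	wg sync.WaitGroup
}

// spawn tracks fn in the WaitGroup. Add must be called here, not inside the
// goroutine, otherwise WaitForStop could run before the goroutine registers
// itself and return too early.
func (n *raftLike) spawn(fn func()) {
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()
		fn()
	}()
}

func (n *raftLike) WaitForStop() { n.wg.Wait() }

func main() {
	n := &raftLike{}
	n.spawn(func() { time.Sleep(20 * time.Millisecond) }) // e.g. the run loop
	n.spawn(func() { time.Sleep(10 * time.Millisecond) }) // e.g. a catchup
	n.WaitForStop()
	fmt.Println("all tracked goroutines have exited")
}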
