From 8a0fda0ced28252bcdab5e3296d6dad807c94c83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Mon, 13 Dec 2021 14:21:43 +0100 Subject: [PATCH] ring/util.go: make WaitRingStability more responsive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The current method polls the ring member states at every second. However given: 1) large number of members 2) member state changing after stability reached from one valid state to another e.g. JOINING->ACTIVE. Some members will change their state after stability, earlier then others. Since polling takes 1 second, the remaining members may notice this quite late, thus starting a new 2 second minStability interval. In testing this comes up frequently, so for 20 members , reaching stability for all members can be over 30 seconds. This patch changes the logic so that instead of polling, the algorithm notifies WaitRingStability as soon as possible about changes, thus no time is wasted by waiting when the stability is _already_ lost. For 20 members it reduced the time from 30s to ~7s to reach stability. It is possible to achieve the same result by reducing the polling period to 10ms - at least in the 20 member case, but being notified seems less arbitrary. Signed-off-by: György Krajcsovits --- CHANGELOG.md | 1 + ring/ring.go | 7 +++ ring/util.go | 21 ++++++--- ring/watcher.go | 48 ++++++++++++++++++++ ring/watcher_test.go | 104 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 174 insertions(+), 7 deletions(-) create mode 100644 ring/watcher.go create mode 100644 ring/watcher_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f50d722e..809d0a003 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,3 +24,4 @@ * [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 +* [ENHANCEMENT] ring/util.go: make WaitRingStability more responsive #95 \ No newline at end of file diff --git a/ring/ring.go b/ring/ring.go index 63e3a547c..bb2c0390d 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -193,6 +193,8 @@ type Ring struct { // If set to nil, no caching is done (used by tests, and subrings). shuffledSubringCache map[subringCacheKey]*Ring + statesWatcher watcher + memberOwnershipGaugeVec *prometheus.GaugeVec numMembersGaugeVec *prometheus.GaugeVec totalTokensGauge prometheus.Gauge @@ -328,6 +330,11 @@ func (r *Ring) updateRingState(ringDesc *Desc) { } rc := prevRing.RingCompare(ringDesc) + + if rc != Equal { + r.statesWatcher.notify() + } + if rc == Equal || rc == EqualButStatesAndTimestamps { // No need to update tokens or zones. Only states and timestamps // have changed. (If Equal, nothing has changed, but that doesn't happen diff --git a/ring/util.go b/ring/util.go index a836aa2fc..2eaf3eb1d 100644 --- a/ring/util.go +++ b/ring/util.go @@ -102,27 +102,34 @@ func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, // Get the initial ring state. ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck - ringLastStateTs := time.Now() - const pollingFrequency = time.Second - pollingTicker := time.NewTicker(pollingFrequency) - defer pollingTicker.Stop() + // This timer will be our state + stabilityTimer := time.NewTimer(minStability) + + stateWatch, err := r.statesWatcher.Register() + if err != nil { + return err + } + defer r.statesWatcher.UnRegister() for { select { case <-ctx.Done(): return ctx.Err() - case <-pollingTicker.C: + case <-stabilityTimer.C: // We ignore the error because in case of error it will return an empty // replication set which we use to compare with the previous state. currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck if HasReplicationSetChanged(ringLastState, currRingState) { ringLastState = currRingState - ringLastStateTs = time.Now() - } else if time.Since(ringLastStateTs) >= minStability { + stabilityTimer = time.NewTimer(minStability) + } else { return nil } + case <-stateWatch: + ringLastState, _ = r.GetAllHealthy(op) // nolint:errcheck + stabilityTimer = time.NewTimer(minStability) } } } diff --git a/ring/watcher.go b/ring/watcher.go new file mode 100644 index 000000000..a7ec91510 --- /dev/null +++ b/ring/watcher.go @@ -0,0 +1,48 @@ +package ring + +import ( + "fmt" + "sync" +) + +type watcher struct { + mtx sync.Mutex + notifier chan struct{} +} + +func (s *watcher) Register() (<-chan struct{}, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + if s.notifier != nil { + return nil, fmt.Errorf("only one watcher allowed at a time") + } + s.notifier = make(chan struct{}) + return s.notifier, nil +} + +func (s *watcher) UnRegister() { + s.mtx.Lock() + defer s.mtx.Unlock() + if s.notifier != nil { + close(s.notifier) + } + s.notifier = nil +} + +// Handles notifying listeners about state change in the Ring. +// Assumes that the caller will check that there is meaningful change to the state. +func (s *watcher) notify() { + s.mtx.Lock() + defer s.mtx.Unlock() + if s.notifier == nil { + return + } + + select { + case s.notifier <- struct{}{}: // notify + return + default: // already notified - throttle + return + } +} diff --git a/ring/watcher_test.go b/ring/watcher_test.go new file mode 100644 index 000000000..2b5d67c93 --- /dev/null +++ b/ring/watcher_test.go @@ -0,0 +1,104 @@ +package ring + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRegister(t *testing.T) { + w := watcher{} + + ch, err := w.Register() + defer w.UnRegister() + + assert.IsType(t, make(<-chan struct{}), ch, "should return a channel") + assert.NoError(t, err, "should not have error") +} + +func TestDoubleRegister(t *testing.T) { + w := watcher{} + _, regErr := w.Register() + defer w.UnRegister() + ch, err := w.Register() + + assert.NoError(t, regErr, "should be able to register") + assert.Nil(t, ch, "should not return channel") + assert.Error(t, err, "should have an error") +} + +func TestRegisterAfterUnregister(t *testing.T) { + w := watcher{} + _, regErr := w.Register() + w.UnRegister() + ch, err := w.Register() + defer w.UnRegister() + + assert.NoError(t, regErr, "should be able to register") + assert.IsType(t, make(<-chan struct{}), ch, "should return a channel") + assert.NoError(t, err, "should not have error") +} + +func TestUnRegisterNone(t *testing.T) { + w := watcher{} + w.UnRegister() // does not block, crash +} + +func TestNotifyNoRegister(t *testing.T) { + w := watcher{} + w.notify() // does not block, crash +} + +func TestNotify(t *testing.T) { + w := watcher{} + ch, regErr := w.Register() + defer w.UnRegister() + + received := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + <-ch + received <- struct{}{} + wg.Done() + }() +loop: + for { + select { + case <-received: + break loop + default: + w.notify() + } + } + wg.Wait() + assert.NoError(t, regErr, "should be able to register") +} + +func TestNotifyNeverBlocks(t *testing.T) { + w := watcher{} + _, regErr := w.Register() + defer w.UnRegister() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + w.notify() + w.notify() + w.notify() + wg.Done() + }() + wg.Wait() + assert.NoError(t, regErr, "should be able to register") +} + +func TestUnRegisterStopsNotify(t *testing.T) { + w := watcher{} + ch, regErr := w.Register() + w.UnRegister() + _, ok := <-ch + + assert.NoError(t, regErr, "should be able to register") + assert.False(t, ok, "Channel should be closed") +}