Skip to content

Commit

Permalink
ring/util.go: make WaitRingStability more responsive
Browse files Browse the repository at this point in the history
The current method polls the ring member states every second. However,
given 1) a large number of members and 2) member states changing from one
valid state to another after stability is reached (e.g. JOINING->ACTIVE), some
members will change their state after stability earlier than others. Since polling
takes 1 second, the remaining members may notice this quite late, thus starting
a new 2 second minStability interval. In testing this comes up frequently,
so for 20 members, reaching stability for all members can take over 30 seconds.

This patch changes the logic so that instead of polling, the algorithm notifies
WaitRingStability as soon as possible about changes, thus no time is wasted
by waiting when the stability is _already_ lost. For 20 members it reduced
the time from 30s to ~7s to reach stability.

It is possible to achieve the same result by reducing the polling period to
10ms - at least in the 20 member case, but being notified seems less
 arbitrary.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed Dec 13, 2021
1 parent c026bbd commit 8a0fda0
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [ENHANCEMENT] ring/util.go: make WaitRingStability more responsive #95
7 changes: 7 additions & 0 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ type Ring struct {
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring

statesWatcher watcher

memberOwnershipGaugeVec *prometheus.GaugeVec
numMembersGaugeVec *prometheus.GaugeVec
totalTokensGauge prometheus.Gauge
Expand Down Expand Up @@ -328,6 +330,11 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
}

rc := prevRing.RingCompare(ringDesc)

if rc != Equal {
r.statesWatcher.notify()
}

if rc == Equal || rc == EqualButStatesAndTimestamps {
// No need to update tokens or zones. Only states and timestamps
// have changed. (If Equal, nothing has changed, but that doesn't happen
Expand Down
21 changes: 14 additions & 7 deletions ring/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,34 @@ func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability,

// Get the initial ring state.
ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck
ringLastStateTs := time.Now()

const pollingFrequency = time.Second
pollingTicker := time.NewTicker(pollingFrequency)
defer pollingTicker.Stop()
// This timer will be our state
stabilityTimer := time.NewTimer(minStability)

stateWatch, err := r.statesWatcher.Register()
if err != nil {
return err
}
defer r.statesWatcher.UnRegister()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-pollingTicker.C:
case <-stabilityTimer.C:
// We ignore the error because in case of error it will return an empty
// replication set which we use to compare with the previous state.
currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck

if HasReplicationSetChanged(ringLastState, currRingState) {
ringLastState = currRingState
ringLastStateTs = time.Now()
} else if time.Since(ringLastStateTs) >= minStability {
stabilityTimer = time.NewTimer(minStability)
} else {
return nil
}
case <-stateWatch:
ringLastState, _ = r.GetAllHealthy(op) // nolint:errcheck
stabilityTimer = time.NewTimer(minStability)
}
}
}
Expand Down
48 changes: 48 additions & 0 deletions ring/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ring

import (
"fmt"
"sync"
)

// watcher delivers coalesced "state changed" signals to at most one registered
// listener at a time. The zero value is ready to use.
type watcher struct {
	// mtx guards notifier.
	mtx sync.Mutex
	// notifier is the channel handed to the current listener; it is nil while
	// nobody is registered.
	notifier chan struct{}
}

// Register installs the single listener and returns the channel on which it
// will receive change signals. It returns an error if a listener is already
// registered; the caller must call UnRegister before registering again.
func (s *watcher) Register() (<-chan struct{}, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.notifier != nil {
		return nil, fmt.Errorf("only one watcher allowed at a time")
	}
	// Capacity 1 lets notify leave exactly one pending signal when the listener
	// is busy. With an unbuffered channel the non-blocking send in notify would
	// silently drop the signal unless the listener happened to be parked on the
	// receive at that very moment.
	s.notifier = make(chan struct{}, 1)
	return s.notifier, nil
}

// UnRegister removes the current listener, if any, and closes its channel.
// Calling it with no listener registered is a no-op. A signal that was still
// pending at that point can be read once before the close is observed.
func (s *watcher) UnRegister() {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.notifier == nil {
		return
	}
	close(s.notifier)
	s.notifier = nil
}

// notify signals the registered listener, if any, that the state has changed.
// It never blocks: if a signal is already pending, the new one is coalesced
// into it. The caller is expected to have checked that the change is
// meaningful.
func (s *watcher) notify() {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.notifier == nil {
		return
	}

	select {
	case s.notifier <- struct{}{}: // signal queued (or handed over directly)
	default: // a signal is already pending - throttle
	}
}
104 changes: 104 additions & 0 deletions ring/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package ring

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

// TestRegister checks that registering on a fresh watcher succeeds and hands
// out a usable notification channel.
func TestRegister(t *testing.T) {
	w := watcher{}

	ch, err := w.Register()
	defer w.UnRegister()

	assert.NoError(t, err, "should not have error")
	// ch is statically typed as <-chan struct{}, so a type check could never
	// fail; what matters is that a channel was actually returned.
	assert.NotNil(t, ch, "should return a channel")
}

// TestDoubleRegister checks that a second Register call is rejected while the
// first registration is still active.
func TestDoubleRegister(t *testing.T) {
	var w watcher

	_, firstErr := w.Register()
	defer w.UnRegister()
	secondCh, secondErr := w.Register()

	assert.NoError(t, firstErr, "should be able to register")
	assert.Nil(t, secondCh, "should not return channel")
	assert.Error(t, secondErr, "should have an error")
}

// TestRegisterAfterUnregister checks that the watcher can be registered again
// once the previous registration has been released.
func TestRegisterAfterUnregister(t *testing.T) {
	w := watcher{}
	_, regErr := w.Register()
	w.UnRegister()
	ch, err := w.Register()
	defer w.UnRegister()

	assert.NoError(t, regErr, "should be able to register")
	assert.NoError(t, err, "should not have error")
	// ch is statically typed as <-chan struct{}, so a type check could never
	// fail; assert that a channel was actually returned instead.
	assert.NotNil(t, ch, "should return a channel")
}

// TestUnRegisterNone checks that UnRegister on a watcher with no listener
// returns normally: it must neither block nor panic.
func TestUnRegisterNone(t *testing.T) {
	var w watcher
	w.UnRegister()
}

// TestNotifyNoRegister checks that notify on a watcher with no listener
// returns normally: it must neither block nor panic.
func TestNotifyNoRegister(t *testing.T) {
	var w watcher
	w.notify()
}

// TestNotify checks that a registered listener eventually receives a signal
// sent through notify.
func TestNotify(t *testing.T) {
	w := watcher{}
	ch, regErr := w.Register()
	defer w.UnRegister()
	// Fail fast: without a channel the listener goroutine below would block
	// forever and the loop would spin until the test times out.
	if !assert.NoError(t, regErr, "should be able to register") {
		return
	}

	received := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ch
		received <- struct{}{}
	}()

	// notify does not block, so keep signalling until the listener confirms
	// that one signal got through.
loop:
	for {
		select {
		case <-received:
			break loop
		default:
			w.notify()
		}
	}
	wg.Wait()
}

// TestNotifyNeverBlocks checks that repeated notify calls return even though
// nobody ever reads from the registered channel; a blocking notify would make
// this test hang.
func TestNotifyNeverBlocks(t *testing.T) {
	var w watcher
	_, regErr := w.Register()
	defer w.UnRegister()

	var done sync.WaitGroup
	done.Add(1)
	go func() {
		defer done.Done()
		for i := 0; i < 3; i++ {
			w.notify()
		}
	}()
	done.Wait()

	assert.NoError(t, regErr, "should be able to register")
}

// TestUnRegisterStopsNotify checks that UnRegister closes the channel that was
// handed out by Register.
func TestUnRegisterStopsNotify(t *testing.T) {
	var w watcher

	ch, regErr := w.Register()
	w.UnRegister()

	// A receive on a closed channel returns immediately with ok == false.
	_, stillOpen := <-ch

	assert.NoError(t, regErr, "should be able to register")
	assert.False(t, stillOpen, "Channel should be closed")
}

0 comments on commit 8a0fda0

Please sign in to comment.