Skip to content

Commit

Permalink
parallelize memberlist notified message processing
Browse files Browse the repository at this point in the history
Notified messages in KV memberlist are now processed asynchronously by a worker pool.
This will facilitate vertical scaling in conditions where UDP packet pressure is high due to a high number of instances of the memberlist.

Some unit test had been additionally tweaked to support the new async model.
  • Loading branch information
ortuman committed May 6, 2022
1 parent 4d72380 commit c9521c3
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91 #93
* [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89
* [ENHANCEMENT] Memberlist: parallelize processing of messages received by memberlist. #110
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
Expand Down
111 changes: 79 additions & 32 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
const (
maxCasRetries = 10 // max retries in CAS operation
noChangeDetectedRetrySleep = time.Second // how long to sleep after no change was detected in CAS
notifyMsgQueueSize = 1024 // size of buffered channels to handle memberlist messages
)

// Client implements kv.Client interface, by using memberlist.KV
Expand Down Expand Up @@ -251,13 +252,18 @@ type KV struct {
receivedMessagesSize int
messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI.

// Per-key value update workers
workersMu sync.Mutex
workersChannels map[string]chan valueUpdate

// closed on shutdown
shutdown chan struct{}

// metrics
numberOfReceivedMessages prometheus.Counter
totalSizeOfReceivedMessages prometheus.Counter
numberOfInvalidReceivedMessages prometheus.Counter
numberOfDroppedMessages prometheus.Counter
numberOfPulls prometheus.Counter
numberOfPushes prometheus.Counter
totalSizeOfPulls prometheus.Counter
Expand Down Expand Up @@ -319,6 +325,12 @@ func (v ValueDesc) Clone() (result ValueDesc) {
return
}

type valueUpdate struct {
value []byte
codec codec.Codec
messageSize int
}

func (v ValueDesc) String() string {
return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID)
}
Expand All @@ -339,17 +351,17 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace

mlkv := &KV{
cfg: cfg,
logger: logger,
registerer: registerer,
provider: dnsProvider,

store: make(map[string]ValueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
cfg: cfg,
logger: logger,
registerer: registerer,
provider: dnsProvider,
store: make(map[string]ValueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
workersChannels: make(map[string]chan valueUpdate),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
}

mlkv.createAndRegisterMetrics()
Expand Down Expand Up @@ -429,7 +441,6 @@ func (m *KV) starting(_ context.Context) error {
if err != nil {
return fmt.Errorf("failed to create memberlist: %v", err)
}

// Finish delegate initialization.
m.memberlist = list
m.broadcasts = &memberlist.TransmitLimitedQueue{
Expand Down Expand Up @@ -931,8 +942,6 @@ func (m *KV) NodeMeta(limit int) []byte {
// NotifyMsg is method from Memberlist Delegate interface
// Called when single message is received, i.e. what our broadcastNewValue has sent.
func (m *KV) NotifyMsg(msg []byte) {
m.initWG.Wait()

m.numberOfReceivedMessages.Inc()
m.totalSizeOfReceivedMessages.Add(float64(len(msg)))

Expand All @@ -957,29 +966,67 @@ func (m *KV) NotifyMsg(msg []byte) {
return
}

// we have a ring update! Let's merge it with our version of the ring for given key
mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec)
ch := m.getKeyWorkerChannel(kvPair.Key)
select {
case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}:
default:
m.numberOfDroppedMessages.Inc()
level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key)
}
}

func (m *KV) getKeyWorkerChannel(key string) chan<- valueUpdate {
m.workersMu.Lock()
defer m.workersMu.Unlock()

ch := m.workersChannels[key]
if ch == nil {
// spawn a key associated worker goroutine to process updates in background
ch = make(chan valueUpdate, notifyMsgQueueSize)
go m.processValueUpdate(ch, key)

changes := []string(nil)
if mod != nil {
changes = mod.MergeContent()
m.workersChannels[key] = ch
}
return ch
}

m.addReceivedMessage(Message{
Time: time.Now(),
Size: len(msg),
Pair: kvPair,
Version: version,
Changes: changes,
})
func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
for {
select {
case update := <-workerCh:
// we have a value update! Let's merge it with our current version for given key
mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec)

if err != nil {
level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err)
} else if version > 0 {
m.notifyWatchers(kvPair.Key)
changes := []string(nil)
if mod != nil {
changes = mod.MergeContent()
}

m.addReceivedMessage(Message{
Time: time.Now(),
Size: update.messageSize,
Pair: KeyValuePair{
Key: key,
Value: update.value,
Codec: update.codec.CodecID(),
},
Version: version,
Changes: changes,
})

// Don't resend original message, but only changes.
m.broadcastNewValue(kvPair.Key, mod, version, codec)
if err != nil {
level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err)
} else if version > 0 {
m.notifyWatchers(key)

// Don't resend original message, but only changes.
m.broadcastNewValue(key, mod, version, update.codec)
}

case <-m.shutdown:
// stop running on shutdown
return
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,9 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
"c": {Timestamp: now.Unix(), State: ACTIVE},
}}))

// Wait until KV update has been processed.
time.Sleep(time.Millisecond * 100)

// Check two things here:
// 1) state of value in KV store
// 2) broadcast message only has changed members
Expand Down Expand Up @@ -1220,6 +1223,9 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) {

kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend))

// Wait until KV update has been processed.
time.Sleep(time.Millisecond * 100)

bs := kv.GetBroadcasts(0, math.MaxInt32)
if tc.broadcastMessage == nil {
require.Equal(t, 0, len(bs), "expected no broadcast message")
Expand Down
7 changes: 7 additions & 0 deletions kv/memberlist/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Number of received broadcast user messages that were invalid. Hopefully 0.",
})

m.numberOfDroppedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "received_broadcasts_dropped_total",
Help: "Number of received broadcast user messages that were dropped. Hopefully 0.",
})

m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Expand Down

0 comments on commit c9521c3

Please sign in to comment.