diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cba7fa8d..79521c7f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58 * [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91 #93 * [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89 +* [ENHANCEMENT] Memberlist: parallelize processing of messages received by memberlist. #110 * [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80 * [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97 * [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95 diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 23c40ac76..57c08926f 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -27,6 +27,7 @@ import ( const ( maxCasRetries = 10 // max retries in CAS operation noChangeDetectedRetrySleep = time.Second // how long to sleep after no change was detected in CAS + notifyMsgQueueSize = 1024 // size of buffered channels to handle memberlist messages ) // Client implements kv.Client interface, by using memberlist.KV @@ -251,6 +252,10 @@ type KV struct { receivedMessagesSize int messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI. + // Per-key value update workers + workersMu sync.Mutex + workersChannels map[string]chan valueUpdate + // closed on shutdown shutdown chan struct{} @@ -258,6 +263,7 @@ type KV struct { numberOfReceivedMessages prometheus.Counter totalSizeOfReceivedMessages prometheus.Counter numberOfInvalidReceivedMessages prometheus.Counter + numberOfDroppedMessages prometheus.Counter numberOfPulls prometheus.Counter numberOfPushes prometheus.Counter totalSizeOfPulls prometheus.Counter @@ -319,6 +325,12 @@ func (v ValueDesc) Clone() (result ValueDesc) { return } +type valueUpdate struct { + value []byte + codec codec.Codec + messageSize int +} + func (v ValueDesc) String() string { return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID) } @@ -339,17 +351,17 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - - store: make(map[string]ValueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -429,7 +441,6 @@ func (m *KV) starting(_ context.Context) error { if err != nil { return fmt.Errorf("failed to create memberlist: %v", err) } - // Finish delegate initialization. m.memberlist = list m.broadcasts = &memberlist.TransmitLimitedQueue{ @@ -931,8 +942,6 @@ func (m *KV) NodeMeta(limit int) []byte { // NotifyMsg is method from Memberlist Delegate interface // Called when single message is received, i.e. what our broadcastNewValue has sent. func (m *KV) NotifyMsg(msg []byte) { - m.initWG.Wait() - m.numberOfReceivedMessages.Inc() m.totalSizeOfReceivedMessages.Add(float64(len(msg))) @@ -957,29 +966,67 @@ func (m *KV) NotifyMsg(msg []byte) { return } - // we have a ring update! Let's merge it with our version of the ring for given key - mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + ch := m.getKeyWorkerChannel(kvPair.Key) + select { + case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}: + default: + m.numberOfDroppedMessages.Inc() + level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key) + } +} + +func (m *KV) getKeyWorkerChannel(key string) chan<- valueUpdate { + m.workersMu.Lock() + defer m.workersMu.Unlock() + + ch := m.workersChannels[key] + if ch == nil { + // spawn a key associated worker goroutine to process updates in background + ch = make(chan valueUpdate, notifyMsgQueueSize) + go m.processValueUpdate(ch, key) - changes := []string(nil) - if mod != nil { - changes = mod.MergeContent() + m.workersChannels[key] = ch } + return ch +} - m.addReceivedMessage(Message{ - Time: time.Now(), - Size: len(msg), - Pair: kvPair, - Version: version, - Changes: changes, - }) +func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { + for { + select { + case update := <-workerCh: + // we have a value update! Let's merge it with our current version for given key + mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec) - if err != nil { - level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) - } else if version > 0 { - m.notifyWatchers(kvPair.Key) + changes := []string(nil) + if mod != nil { + changes = mod.MergeContent() + } + + m.addReceivedMessage(Message{ + Time: time.Now(), + Size: update.messageSize, + Pair: KeyValuePair{ + Key: key, + Value: update.value, + Codec: update.codec.CodecID(), + }, + Version: version, + Changes: changes, + }) - // Don't resend original message, but only changes. - m.broadcastNewValue(kvPair.Key, mod, version, codec) + if err != nil { + level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err) + } else if version > 0 { + m.notifyWatchers(key) + + // Don't resend original message, but only changes. + m.broadcastNewValue(key, mod, version, update.codec) + } + + case <-m.shutdown: + // stop running on shutdown + return + } } } diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 28fe80b36..2daf89dee 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1125,6 +1125,9 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { "c": {Timestamp: now.Unix(), State: ACTIVE}, }})) + // Wait until KV update has been processed. + time.Sleep(time.Millisecond * 100) + // Check two things here: // 1) state of value in KV store // 2) broadcast message only has changed members @@ -1220,6 +1223,9 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend)) + // Wait until KV update has been processed. + time.Sleep(time.Millisecond * 100) + bs := kv.GetBroadcasts(0, math.MaxInt32) if tc.broadcastMessage == nil { require.Equal(t, 0, len(bs), "expected no broadcast message") diff --git a/kv/memberlist/metrics.go b/kv/memberlist/metrics.go index c7d3f01c2..9ab56a662 100644 --- a/kv/memberlist/metrics.go +++ b/kv/memberlist/metrics.go @@ -36,6 +36,13 @@ func (m *KV) createAndRegisterMetrics() { Help: "Number of received broadcast user messages that were invalid. Hopefully 0.", }) + m.numberOfDroppedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: "received_broadcasts_dropped_total", + Help: "Number of received broadcast user messages that were dropped. Hopefully 0.", + }) + m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem,