Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman committed Jan 7, 2022
1 parent 0cac009 commit 21a4374
Showing 1 changed file with 39 additions and 29 deletions.
68 changes: 39 additions & 29 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ type valueDesc struct {
}

type valueUpdate struct {
kvp KeyValuePair
codec codec.Codec
messageSz int
value []byte
codec codec.Codec
messageSize int
}

func (v valueDesc) Clone() (result valueDesc) {
Expand Down Expand Up @@ -965,56 +965,66 @@ func (m *KV) NotifyMsg(msg []byte) {
level.Error(m.logger).Log("msg", "failed to decode received value, unknown codec", "codec", kvPair.GetCodec())
return
}
var wCh chan valueUpdate

m.workersMu.Lock()
wCh = m.workersChannels[kvPair.Key]
if wCh == nil {
// spawn a key associated worker goroutine to process updates in background
wCh = make(chan valueUpdate, notifyMsgQueueSize)
go m.processValueUpdate(wCh)

m.workersChannels[kvPair.Key] = wCh
}
m.workersMu.Unlock()

ch := m.getKeyWorkerChannel(kvPair.Key)
select {
case wCh <- valueUpdate{kvp: kvPair, codec: codec, messageSz: len(msg)}:
case ch <- valueUpdate{
value: kvPair.Value,
codec: codec,
messageSize: len(msg),
}:
default:
m.numberOfDroppedMessages.Inc()
level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key, "queue_size", notifyMsgQueueSize)
level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key)
}
}

func (m *KV) processValueUpdate(workerCh <-chan valueUpdate) {
func (m *KV) getKeyWorkerChannel(key string) chan<- valueUpdate {
m.workersMu.Lock()
defer m.workersMu.Unlock()

ch := m.workersChannels[key]
if ch == nil {
// spawn a key associated worker goroutine to process updates in background
ch = make(chan valueUpdate, notifyMsgQueueSize)
go m.processValueUpdate(ch, key)

m.workersChannels[key] = ch
}
return ch
}

func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
for {
select {
case wData := <-workerCh:
// we have a ring update! Let's merge it with our version of the ring for given key
kvPair := wData.kvp

mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, wData.codec)
case update := <-workerCh:
// we have a value update! Let's merge it with our current version for given key
mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec)

changes := []string(nil)
if mod != nil {
changes = mod.MergeContent()
}

m.addReceivedMessage(message{
Time: time.Now(),
Size: wData.messageSz,
Pair: kvPair,
Time: time.Now(),
Size: update.messageSize,
Pair: KeyValuePair{
Key: key,
Value: update.value,
Codec: update.codec.CodecID(),
},
Version: version,
Changes: changes,
})

if err != nil {
level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err)
level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err)
} else if version > 0 {
m.notifyWatchers(kvPair.Key)
m.notifyWatchers(key)

// Don't resend original message, but only changes.
m.broadcastNewValue(kvPair.Key, mod, version, wData.codec)
m.broadcastNewValue(key, mod, version, update.codec)
}

case <-m.shutdown:
Expand Down

0 comments on commit 21a4374

Please sign in to comment.