Skip to content

Commit

Permalink
Memberlist: Optimise receive path by not cloning current ring state. (#…
Browse files Browse the repository at this point in the history
…76)

When memberlist is used as the backing store for the ring, processing updates
can be costly if the ring state is very large. This is partly due to the ring
state being cloned upon the arrival of every message. It is believed that this
can be avoided. Additionally, the reduced memory allocations will reduce the
amount of work required by the GC.

The included benchmark shows a good improvement:

Command:
```
dskit/bench$ go test -v -bench BenchmarkMemberlistReceiveWithRingDesc -benchmem -cpu 6
```

```
name                             old time/op    new time/op    delta
MemberlistReceiveWithRingDesc-6    1.76ms ± 5%    1.01ms ± 6%  -42.32%  (p=0.000 n=10+9)

name                             old alloc/op   new alloc/op   delta
MemberlistReceiveWithRingDesc-6     566kB ± 0%     254kB ± 0%  -55.09%  (p=0.000 n=9+10)

name                             old allocs/op  new allocs/op  delta
MemberlistReceiveWithRingDesc-6     1.86k ± 0%     0.64k ± 0%  -65.83%  (p=0.000 n=10+10)
```
  • Loading branch information
stevesg committed Nov 26, 2021
1 parent 08e35ac commit 6ba7c3b
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
* [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39
* [ENHANCEMENT] Replace go-kit/kit/log with go-kit/log. #52
* [ENHANCEMENT] Add spanlogger package. #42
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
11 changes: 9 additions & 2 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeMu.Lock()
defer m.storeMu.Unlock()

curr := m.store[key].Clone()
// Note that we do not take a deep copy of curr.value here, it is modified in-place.
// This is safe because the entire function runs under the store lock; we do not return
// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
curr := m.store[key]
// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.version != casVersion {
return nil, 0, errVersionMismatch
Expand All @@ -1206,7 +1209,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))

// Remove tombstones from change too. If change turns out to be empty after this,
// we don't need to change local value either!
// we don't need to gossip the change. However, the local value will be always be updated.
//
// Note that "result" and "change" may actually be the same Mergeable. That is why we
// call RemoveTombstones on "result" first, so that we get the correct metrics. Calling
Expand All @@ -1224,6 +1227,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
codecID: codec.CodecID(),
}

// The "changes" returned by Merge() can contain references to the "result"
// state. Therefore, make sure we clone it before releasing the lock.
change = change.Clone()

return change, newVersion, nil
}

Expand Down
3 changes: 3 additions & 0 deletions kv/memberlist/mergeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ type Mergeable interface {
// result of the merge in the following text, we don't mean the return value ("change"), but the
// end-state of receiver. That means Result of A.Merge(B) is end-state of A.
//
// Memberlist-based KV store will keep the result even if Merge returned no change. Implementations should
// be careful about not changing logical value when returning empty change.
//
// Idempotency:
// Result of applying the same state "B" to state "A" (A.Merge(B)) multiple times has the same effect as
// applying it only once. Only first Merge will return non-empty change.
Expand Down
107 changes: 107 additions & 0 deletions ring/bench/ring_memberlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package bench

import (
"context"
"fmt"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func encodeMessage(b *testing.B, key string, d *ring.Desc) []byte {
c := ring.GetCodec()
val, err := c.Encode(d)
require.NoError(b, err)

kvPair := memberlist.KeyValuePair{
Key: key,
Value: val,
Codec: c.CodecID(),
}

ser, err := kvPair.Marshal()
require.NoError(b, err)
return ser
}

func generateUniqueTokens(ingester, numTokens int) []uint32 {
// Generate unique tokens without using ring.GenerateTokens in order to not
// rely on random number generation. Also, because generating unique tokens
// with GenerateTokens can be quite expensive, it pollutes the CPU profile
// to the point of being useless.
tokens := make([]uint32, numTokens)
for i := range tokens {
tokens[i] = uint32((ingester * 100000) + (i * 10))
}
return tokens
}

// Benchmark the memberlist receive path when it is being used as the ring backing store.
func BenchmarkMemberlistReceiveWithRingDesc(b *testing.B) {
c := ring.GetCodec()

var cfg memberlist.KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: []string{"localhost"},
}
cfg.Codecs = []codec.Codec{c}

mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

// Build the initial ring state:
// - The ring isn't actually in use, so the fields such as address/zone are not important.
// - The number of keys in the store has no impact for this test, so simulate a single ring.
// - The number of instances in the ring does have a big impact.
const numInstances = 600
const numTokens = 128
{
initialDesc := ring.NewDesc()
for i := 0; i < numInstances; i++ {
tokens := generateUniqueTokens(i, numTokens)
initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", tokens, ring.ACTIVE, time.Now())
}
// Send a single update to populate the store.
msg := encodeMessage(b, "ring", initialDesc)
mkv.NotifyMsg(msg)
}

// Pre-encode some payloads. It's not significant what the payloads actually
// update in the ring, though it may be important for future optimisations.
testMsgs := make([][]byte, 100)
for i := range testMsgs {
testDesc := ring.NewDesc()
testDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", nil, ring.ACTIVE, time.Now())
testMsgs[i] = encodeMessage(b, "ring", testDesc)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
mkv.NotifyMsg(testMsgs[i%len(testMsgs)])
}
}

0 comments on commit 6ba7c3b

Please sign in to comment.