From 6ba7c3b156945be2964ca29e076c91603fe5114d Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Fri, 26 Nov 2021 09:16:43 +0100 Subject: [PATCH] Memberlist: Optimise receive path by not cloning current ring state. (#76) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When memberlist is used as the backing store for the ring, processing updates can be costly if the ring state is very large. This is partly due to the ring state being cloned upon the arrival of every message. It is believed that this can be avoided. Additionally, the reduced memory allocations will reduce the amount of work required by the GC. The included benchmark shows a good improvement: Command: ``` dskit/bench$ go test -v -bench BenchmarkMemberlistReceiveWithRingDesc -benchmem -cpu 6 ``` ``` name old time/op new time/op delta MemberlistReceiveWithRingDesc-6 1.76ms ± 5% 1.01ms ± 6% -42.32% (p=0.000 n=10+9) name old alloc/op new alloc/op delta MemberlistReceiveWithRingDesc-6 566kB ± 0% 254kB ± 0% -55.09% (p=0.000 n=9+10) name old allocs/op new allocs/op delta MemberlistReceiveWithRingDesc-6 1.86k ± 0% 0.64k ± 0% -65.83% (p=0.000 n=10+10) ``` --- CHANGELOG.md | 3 +- kv/memberlist/memberlist_client.go | 11 ++- kv/memberlist/mergeable.go | 3 + ring/bench/ring_memberlist_test.go | 107 +++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 3 deletions(-) create mode 100644 ring/bench/ring_memberlist_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b906b7d35..f356d4caa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,5 +17,6 @@ * [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39 * [ENHANCEMENT] Replace go-kit/kit/log with go-kit/log. #52 * [ENHANCEMENT] Add spanlogger package. #42 -* [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58 +* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 +* [BUGFIX] spanlogger: Support multiple tenant IDs. #59 diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 1b21fa5c4..6ec467313 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -1184,7 +1184,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeMu.Lock() defer m.storeMu.Unlock() - curr := m.store[key].Clone() + // Note that we do not take a deep copy of curr.value here, it is modified in-place. + // This is safe because the entire function runs under the store lock; we do not return + // the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS). + curr := m.store[key] // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. if casVersion > 0 && curr.version != casVersion { return nil, 0, errVersionMismatch @@ -1206,7 +1209,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed)) // Remove tombstones from change too. If change turns out to be empty after this, - // we don't need to change local value either! + // we don't need to gossip the change. However, the local value will be always be updated. // // Note that "result" and "change" may actually be the same Mergeable. That is why we // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling @@ -1224,6 +1227,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui codecID: codec.CodecID(), } + // The "changes" returned by Merge() can contain references to the "result" + // state. Therefore, make sure we clone it before releasing the lock. + change = change.Clone() + return change, newVersion, nil } diff --git a/kv/memberlist/mergeable.go b/kv/memberlist/mergeable.go index a013e3498..e636e8c80 100644 --- a/kv/memberlist/mergeable.go +++ b/kv/memberlist/mergeable.go @@ -13,6 +13,9 @@ type Mergeable interface { // result of the merge in the following text, we don't mean the return value ("change"), but the // end-state of receiver. That means Result of A.Merge(B) is end-state of A. // + // Memberlist-based KV store will keep the result even if Merge returned no change. Implementations should + // be careful about not changing logical value when returning empty change. + // // Idempotency: // Result of applying the same state "B" to state "A" (A.Merge(B)) multiple times has the same effect as // applying it only once. Only first Merge will return non-empty change. diff --git a/ring/bench/ring_memberlist_test.go b/ring/bench/ring_memberlist_test.go new file mode 100644 index 000000000..395ff0496 --- /dev/null +++ b/ring/bench/ring_memberlist_test.go @@ -0,0 +1,107 @@ +package bench + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/kv/codec" + "github.com/grafana/dskit/kv/memberlist" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" +) + +type dnsProviderMock struct { + resolved []string +} + +func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error { + p.resolved = addrs + return nil +} + +func (p dnsProviderMock) Addresses() []string { + return p.resolved +} + +func encodeMessage(b *testing.B, key string, d *ring.Desc) []byte { + c := ring.GetCodec() + val, err := c.Encode(d) + require.NoError(b, err) + + kvPair := memberlist.KeyValuePair{ + Key: key, + Value: val, + Codec: c.CodecID(), + } + + ser, err := kvPair.Marshal() + require.NoError(b, err) + return ser +} + +func generateUniqueTokens(ingester, numTokens int) []uint32 { + // Generate unique tokens without using ring.GenerateTokens in order to not + // rely on random number generation. Also, because generating unique tokens + // with GenerateTokens can be quite expensive, it pollutes the CPU profile + // to the point of being useless. + tokens := make([]uint32, numTokens) + for i := range tokens { + tokens[i] = uint32((ingester * 100000) + (i * 10)) + } + return tokens +} + +// Benchmark the memberlist receive path when it is being used as the ring backing store. +func BenchmarkMemberlistReceiveWithRingDesc(b *testing.B) { + c := ring.GetCodec() + + var cfg memberlist.KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = memberlist.TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + } + cfg.Codecs = []codec.Codec{c} + + mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + // Build the initial ring state: + // - The ring isn't actually in use, so the fields such as address/zone are not important. + // - The number of keys in the store has no impact for this test, so simulate a single ring. + // - The number of instances in the ring does have a big impact. + const numInstances = 600 + const numTokens = 128 + { + initialDesc := ring.NewDesc() + for i := 0; i < numInstances; i++ { + tokens := generateUniqueTokens(i, numTokens) + initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", tokens, ring.ACTIVE, time.Now()) + } + // Send a single update to populate the store. + msg := encodeMessage(b, "ring", initialDesc) + mkv.NotifyMsg(msg) + } + + // Pre-encode some payloads. It's not significant what the payloads actually + // update in the ring, though it may be important for future optimisations. + testMsgs := make([][]byte, 100) + for i := range testMsgs { + testDesc := ring.NewDesc() + testDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", nil, ring.ACTIVE, time.Now()) + testMsgs[i] = encodeMessage(b, "ring", testDesc) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mkv.NotifyMsg(testMsgs[i%len(testMsgs)]) + } +}