diff --git a/CHANGELOG.md b/CHANGELOG.md index b906b7d35..f356d4caa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,5 +17,6 @@ * [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39 * [ENHANCEMENT] Replace go-kit/kit/log with go-kit/log. #52 * [ENHANCEMENT] Add spanlogger package. #42 -* [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58 +* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 +* [BUGFIX] spanlogger: Support multiple tenant IDs. #59 diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 1b21fa5c4..6ec467313 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -1184,7 +1184,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeMu.Lock() defer m.storeMu.Unlock() - curr := m.store[key].Clone() + // Note that we do not take a deep copy of curr.value here, it is modified in-place. + // This is safe because the entire function runs under the store lock; we do not return + // the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS). + curr := m.store[key] // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. if casVersion > 0 && curr.version != casVersion { return nil, 0, errVersionMismatch @@ -1206,7 +1209,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed)) // Remove tombstones from change too. If change turns out to be empty after this, - // we don't need to change local value either! + // we don't need to gossip the change. However, the local value will be always be updated. // // Note that "result" and "change" may actually be the same Mergeable. That is why we // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling @@ -1224,6 +1227,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui codecID: codec.CodecID(), } + // The "changes" returned by Merge() can contain references to the "result" + // state. Therefore, make sure we clone it before releasing the lock. + change = change.Clone() + return change, newVersion, nil } diff --git a/kv/memberlist/mergeable.go b/kv/memberlist/mergeable.go index a013e3498..e636e8c80 100644 --- a/kv/memberlist/mergeable.go +++ b/kv/memberlist/mergeable.go @@ -13,6 +13,9 @@ type Mergeable interface { // result of the merge in the following text, we don't mean the return value ("change"), but the // end-state of receiver. That means Result of A.Merge(B) is end-state of A. // + // Memberlist-based KV store will keep the result even if Merge returned no change. Implementations should + // be careful about not changing logical value when returning empty change. + // // Idempotency: // Result of applying the same state "B" to state "A" (A.Merge(B)) multiple times has the same effect as // applying it only once. Only first Merge will return non-empty change. diff --git a/ring/bench/ring_memberlist_test.go b/ring/bench/ring_memberlist_test.go new file mode 100644 index 000000000..395ff0496 --- /dev/null +++ b/ring/bench/ring_memberlist_test.go @@ -0,0 +1,107 @@ +package bench + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/kv/codec" + "github.com/grafana/dskit/kv/memberlist" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" +) + +type dnsProviderMock struct { + resolved []string +} + +func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error { + p.resolved = addrs + return nil +} + +func (p dnsProviderMock) Addresses() []string { + return p.resolved +} + +func encodeMessage(b *testing.B, key string, d *ring.Desc) []byte { + c := ring.GetCodec() + val, err := c.Encode(d) + require.NoError(b, err) + + kvPair := memberlist.KeyValuePair{ + Key: key, + Value: val, + Codec: c.CodecID(), + } + + ser, err := kvPair.Marshal() + require.NoError(b, err) + return ser +} + +func generateUniqueTokens(ingester, numTokens int) []uint32 { + // Generate unique tokens without using ring.GenerateTokens in order to not + // rely on random number generation. Also, because generating unique tokens + // with GenerateTokens can be quite expensive, it pollutes the CPU profile + // to the point of being useless. + tokens := make([]uint32, numTokens) + for i := range tokens { + tokens[i] = uint32((ingester * 100000) + (i * 10)) + } + return tokens +} + +// Benchmark the memberlist receive path when it is being used as the ring backing store. +func BenchmarkMemberlistReceiveWithRingDesc(b *testing.B) { + c := ring.GetCodec() + + var cfg memberlist.KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = memberlist.TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + } + cfg.Codecs = []codec.Codec{c} + + mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + // Build the initial ring state: + // - The ring isn't actually in use, so the fields such as address/zone are not important. + // - The number of keys in the store has no impact for this test, so simulate a single ring. + // - The number of instances in the ring does have a big impact. + const numInstances = 600 + const numTokens = 128 + { + initialDesc := ring.NewDesc() + for i := 0; i < numInstances; i++ { + tokens := generateUniqueTokens(i, numTokens) + initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", tokens, ring.ACTIVE, time.Now()) + } + // Send a single update to populate the store. + msg := encodeMessage(b, "ring", initialDesc) + mkv.NotifyMsg(msg) + } + + // Pre-encode some payloads. It's not significant what the payloads actually + // update in the ring, though it may be important for future optimisations. + testMsgs := make([][]byte, 100) + for i := range testMsgs { + testDesc := ring.NewDesc() + testDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", nil, ring.ACTIVE, time.Now()) + testMsgs[i] = encodeMessage(b, "ring", testDesc) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mkv.NotifyMsg(testMsgs[i%len(testMsgs)]) + } +}