Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memberlist: Optimise receive path by not cloning current ring state. #76

Merged
merged 14 commits into from
Nov 26, 2021
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
* [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39
* [ENHANCEMENT] Replace go-kit/kit/log with go-kit/log. #52
* [ENHANCEMENT] Add spanlogger package. #42
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
11 changes: 9 additions & 2 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeMu.Lock()
defer m.storeMu.Unlock()

curr := m.store[key].Clone()
// Note that we do not take a deep copy of curr.value here, it is modified in-place.
// This is safe because the entire function runs under the store lock; we do not return
// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
curr := m.store[key]
stevesg marked this conversation as resolved.
Show resolved Hide resolved
// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.version != casVersion {
return nil, 0, errVersionMismatch
Expand All @@ -1206,7 +1209,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))

// Remove tombstones from change too. If change turns out to be empty after this,
// we don't need to change local value either!
// we don't need to gossip the change. However, the local value will be always be updated.
//
// Note that "result" and "change" may actually be the same Mergeable. That is why we
// call RemoveTombstones on "result" first, so that we get the correct metrics. Calling
Expand All @@ -1224,6 +1227,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
codecID: codec.CodecID(),
}

// The "changes" returned by Merge() can contain references to the "result"
// state. Therefore, make sure we clone it before releasing the lock.
change = change.Clone()

return change, newVersion, nil
}

Expand Down
3 changes: 3 additions & 0 deletions kv/memberlist/mergeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ type Mergeable interface {
// If merge doesn't result in any change, returns nil.
// Error can be returned if merging with given 'other' value is not possible.
//
// Implementations may modify the internal state even if a change is not returned,
// so long as the logical value does not change, i.e. the properties below are maintained.
//
stevesg marked this conversation as resolved.
Show resolved Hide resolved
// In order for state merging to work correctly, Merge function must have some properties. When talking about the
// result of the merge in the following text, we don't mean the return value ("change"), but the
// end-state of receiver. That means Result of A.Merge(B) is end-state of A.
Expand Down
107 changes: 107 additions & 0 deletions ring/bench/ring_memberlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package bench

import (
"context"
"fmt"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func encodeMessage(b *testing.B, key string, d *ring.Desc) []byte {
c := ring.GetCodec()
val, err := c.Encode(d)
require.NoError(b, err)

kvPair := memberlist.KeyValuePair{
Key: key,
Value: val,
Codec: c.CodecID(),
}

ser, err := kvPair.Marshal()
require.NoError(b, err)
return ser
}

func generateUniqueTokens(ingester, numTokens int) []uint32 {
// Generate unique tokens without using ring.GenerateTokens in order to not
// rely on random number generation. Also, because generating unique tokens
// with GenerateTokens can be quite expensive, it pollutes the CPU profile
// to the point of being useless.
tokens := make([]uint32, numTokens)
for i := range tokens {
tokens[i] = uint32((ingester * 100000) + (i * 10))
}
return tokens
}

// Benchmark the memberlist receive path when it is being used as the ring backing store.
func BenchmarkMemberlistReceiveWithRingDesc(b *testing.B) {
c := ring.GetCodec()

var cfg memberlist.KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: []string{"localhost"},
}
cfg.Codecs = []codec.Codec{c}

mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

// Build the initial ring state:
// - The ring isn't actually in use, so the fields such as address/zone are not important.
// - The number of keys in the store has no impact for this test, so simulate a single ring.
// - The number of instances in the ring does have a big impact.
const numInstances = 600
const numTokens = 128
{
initialDesc := ring.NewDesc()
for i := 0; i < numInstances; i++ {
tokens := generateUniqueTokens(i, numTokens)
initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", tokens, ring.ACTIVE, time.Now())
}
// Send a single update to populate the store.
msg := encodeMessage(b, "ring", initialDesc)
mkv.NotifyMsg(msg)
}

// Pre-encode some payloads. It's not significant what the payloads actually
// update in the ring, though it may be important for future optimisations.
testMsgs := make([][]byte, 100)
for i := range testMsgs {
testDesc := ring.NewDesc()
testDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", nil, ring.ACTIVE, time.Now())
testMsgs[i] = encodeMessage(b, "ring", testDesc)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
mkv.NotifyMsg(testMsgs[i%len(testMsgs)])
}
}