Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memberlist: Optimise receive path by not cloning current ring state. #76

Merged
merged 14 commits into from
Nov 26, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@
* [ENHANCEMENT] Add spanlogger package. #42
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
stevesg marked this conversation as resolved.
Show resolved Hide resolved
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #
stevesg marked this conversation as resolved.
Show resolved Hide resolved
96 changes: 96 additions & 0 deletions bench/ring_memberlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package bench
stevesg marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"fmt"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func encodeMessage(b *testing.B, key string, d *ring.Desc) []byte {
c := ring.GetCodec()
val, err := c.Encode(d)
require.NoError(b, err)

kvPair := memberlist.KeyValuePair{
Key: key,
Value: val,
Codec: c.CodecID(),
}

ser, err := kvPair.Marshal()
require.NoError(b, err)
return ser
}

// Benchmark the memberlist receive path when it us being used as the ring backing store.
func BenchmarkMemberlistReceiveWithRingDesc(b *testing.B) {
c := ring.GetCodec()

var cfg memberlist.KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: []string{"localhost"},
}
cfg.Codecs = []codec.Codec{c}

mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

_, err := memberlist.NewClient(mkv, c)
require.NoError(b, err)

// Build the initial ring state:
// - The ring isn't actually in use, so the fields such as address/zone are not important.
// - The number of keys in the store has no impact for this test, so simulate a single ring.
// - The number of instances in the ring does have a big impact.
const numInstances = 600
{
initialDesc := ring.NewDesc()
for i := 0; i < numInstances; i++ {
initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", nil, ring.ACTIVE, time.Now())
stevesg marked this conversation as resolved.
Show resolved Hide resolved
}
// Send a single update to populate the store.
msg := encodeMessage(b, "ring", initialDesc)
mkv.NotifyMsg(msg)
}

// Pre-encode some payloads. It's not significant what the payloads actually
// update in the ring, though it may be important for future optimisations.
testMsgs := make([][]byte, 100)
for i := range testMsgs {
testDesc := ring.NewDesc()
testDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", nil, ring.ACTIVE, time.Now())
testMsgs[i] = encodeMessage(b, "ring", testDesc)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
mkv.NotifyMsg(testMsgs[i%len(testMsgs)])
}
}
5 changes: 4 additions & 1 deletion kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeMu.Lock()
defer m.storeMu.Unlock()

curr := m.store[key].Clone()
// Note that we do not take a copy of the store value here, it is modified in-place.
stevesg marked this conversation as resolved.
Show resolved Hide resolved
// This is safe because the entire function runs under the store lock; we do not return
// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
curr := m.store[key]
stevesg marked this conversation as resolved.
Show resolved Hide resolved
// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.version != casVersion {
return nil, 0, errVersionMismatch
Expand Down