Skip to content

Commit

Permalink
Optimise memberlist kv store access by storing data unencoded. (corte…
Browse files Browse the repository at this point in the history
…xproject#4345)

* Optimise memberlist kv store access by storing data unencoded.

The following profile data was taken from running 50 idle ingesters with
memberlist, with almost everything at default values (5s heartbeats):

```
52.16% mergeBytesValueForKey
+- 52.16% mergeValueForKey
   +- 47.84% computeNewValue
      +- 27.24% codec Proto Decode
      +- 26.25% mergeWithTime
```

It is apparent from the this that a lot of time is spent on the memberlist
receive path, as might be expected, specifically, the merging of the update
into the current state. The cost however is not in decoding the incoming
states (occurs in `mergeBytesValueForKey` before `mergeValueForKey`), but
in fact decoding _current state_ of the value in the store (as it is stored
encoded). The ring state was measured at 123K (50 ingesters), so it makes
sense that decoding could be costly.

This can be avoided by storing the value in it's decoded `Mergeable` form.
When doing this, care has to be taken to deep copy the value when
accessed, as it is modified in place before being updated in the store,
and accessed outside the store mutex.

Note a side effect of this change is that is no longer straightforward
to expose the `memberlist_kv_store_value_bytes` metric, as this reported
the size of the encoded data, therefore it has been removed.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Typo.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Review comments.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>
Signed-off-by: Alvin Lin <alvinlin@amazon.com>
  • Loading branch information
stevesg authored and alvinlin123 committed Jan 14, 2022
1 parent 9ceb7d6 commit 6418248
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 80 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
Expand All @@ -13,6 +14,7 @@
* `-alertmanager.sharding-ring.heartbeat-timeout`
* `-compactor.ring.heartbeat-timeout`
* `-store-gateway.sharding-ring.heartbeat-timeout`
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4318

Expand Down
50 changes: 28 additions & 22 deletions pkg/ring/kv/memberlist/kv_init_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/hashicorp/memberlist"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)
Expand Down Expand Up @@ -106,12 +105,12 @@ func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) {

if err := req.ParseForm(); err == nil {
if req.Form[downloadKeyParam] != nil {
downloadKey(w, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest.
downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest.
return
}

if req.Form[viewKeyParam] != nil {
viewKey(w, kv, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req))
viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req))
return
}

Expand Down Expand Up @@ -179,30 +178,25 @@ func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) {
return
}

formatValue(w, c, msg.Pair.Value, format)
val, err := c.Decode(msg.Pair.Value)
if err != nil {
http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError)
return
}

formatValue(w, val, format)
}

func viewKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string, format string) {
func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) {
if store[key].value == nil {
http.Error(w, "value not found", http.StatusNotFound)
return
}

c := kv.GetCodec(store[key].codecID)
if c == nil {
http.Error(w, "codec not found", http.StatusNotFound)
return
}

formatValue(w, c, store[key].value, format)
formatValue(w, store[key].value, format)
}

func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format string) {
val, err := codec.Decode(value)
if err != nil {
http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError)
return
}
func formatValue(w http.ResponseWriter, val interface{}, format string) {

w.WriteHeader(200)
w.Header().Add("content-type", "text/plain")
Expand All @@ -214,7 +208,7 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format
enc.SetIndent("", " ")
}

err = enc.Encode(val)
err := enc.Encode(val)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
Expand All @@ -224,22 +218,34 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format
}
}

func downloadKey(w http.ResponseWriter, store map[string]valueDesc, key string) {
func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) {
if store[key].value == nil {
http.Error(w, "value not found", http.StatusNotFound)
return
}

val := store[key]

c := kv.GetCodec(store[key].codecID)
if c == nil {
http.Error(w, "codec not found", http.StatusNotFound)
return
}

encoded, err := c.Encode(val.value)
if err != nil {
http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError)
return
}

w.Header().Add("content-type", "application/octet-stream")
// Set content-length so that client knows whether it has received full response or not.
w.Header().Add("content-length", strconv.Itoa(len(val.value)))
w.Header().Add("content-length", strconv.Itoa(len(encoded)))
w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key))
w.WriteHeader(200)

// Ignore errors, we cannot do anything about them.
_, _ = w.Write(val.value)
_, _ = w.Write(encoded)
}

type pageData struct {
Expand Down
86 changes: 38 additions & 48 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ type KV struct {
watchPrefixDroppedNotifications *prometheus.CounterVec

storeValuesDesc *prometheus.Desc
storeSizesDesc *prometheus.Desc
storeTombstones *prometheus.GaugeVec
storeRemovedTombstones *prometheus.CounterVec

Expand All @@ -292,9 +291,11 @@ type message struct {
}

type valueDesc struct {
// We store bytes here. Reason is that clients calling CAS function will modify the object in place,
// but unless CAS succeeds, we don't want those modifications to be visible.
value []byte
// We store the decoded value here to prevent decoding the entire state for every
// update we receive. Whilst the updates are small and fast to decode,
// the total state can be quite large.
// The CAS function is passed a deep copy because it modifies in-place.
value Mergeable

// version (local only) is used to keep track of what we're gossiping about, and invalidate old messages
version uint
Expand All @@ -303,8 +304,16 @@ type valueDesc struct {
codecID string
}

func (v valueDesc) Clone() (result valueDesc) {
result = v
if v.value != nil {
result.value = v.value.Clone()
}
return
}

func (v valueDesc) String() string {
return fmt.Sprintf("size: %d, version: %d, codec: %s", len(v.value), v.version, v.codecID)
return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID)
}

var (
Expand Down Expand Up @@ -612,24 +621,16 @@ func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {
// Returns current value with removed tombstones.
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {
m.storeMu.Lock()
v := m.store[key]
v := m.store[key].Clone()
m.storeMu.Unlock()

out = nil
if v.value != nil {
out, err = codec.Decode(v.value)
if err != nil {
return nil, 0, err
}

if mr, ok := out.(Mergeable); ok {
// remove ALL tombstones before returning to client.
// No need for clients to see them.
_, _ = mr.RemoveTombstones(time.Time{})
}
// remove ALL tombstones before returning to client.
// No need for clients to see them.
_, _ = v.value.RemoveTombstones(time.Time{})
}

return out, v.version, nil
return v.value, v.version, nil
}

// WatchKey watches for value changes for given key. When value changes, 'f' function is called with the
Expand Down Expand Up @@ -1043,9 +1044,21 @@ func (m *KV) LocalState(join bool) []byte {
continue
}

codec := m.GetCodec(val.codecID)
if codec == nil {
level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key)
continue
}

encoded, err := codec.Encode(val.value)
if err != nil {
level.Error(m.logger).Log("msg", "failed to encode remote state", "err", err)
continue
}

kvPair.Reset()
kvPair.Key = key
kvPair.Value = val.value
kvPair.Value = encoded
kvPair.Codec = val.codecID

ser, err := kvPair.Marshal()
Expand All @@ -1055,7 +1068,7 @@ func (m *KV) LocalState(join bool) []byte {
}

if uint(len(ser)) > math.MaxUint32 {
level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(val.value))
level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(encoded))
continue
}

Expand Down Expand Up @@ -1177,12 +1190,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeMu.Lock()
defer m.storeMu.Unlock()

curr := m.store[key]
curr := m.store[key].Clone()
// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.version != casVersion {
return nil, 0, errVersionMismatch
}
result, change, err := computeNewValue(incomingValue, curr.value, codec, casVersion > 0)
result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0)
if err != nil {
return nil, 0, err
}
Expand All @@ -1199,14 +1212,9 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))
}

encoded, err := codec.Encode(result)
if err != nil {
return nil, 0, fmt.Errorf("failed to encode merged result: %v", err)
}

newVersion := curr.version + 1
m.store[key] = valueDesc{
value: encoded,
value: result,
version: newVersion,
codecID: codec.CodecID(),
}
Expand All @@ -1215,25 +1223,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
}

// returns [result, change, error]
func computeNewValue(incoming Mergeable, stored []byte, c codec.Codec, cas bool) (Mergeable, Mergeable, error) {
if len(stored) == 0 {
return incoming, incoming, nil
}

old, err := c.Decode(stored)
if err != nil {
return incoming, incoming, fmt.Errorf("failed to decode stored value: %v", err)
}

if old == nil {
return incoming, incoming, nil
}

oldVal, ok := old.(Mergeable)
if !ok {
return incoming, incoming, fmt.Errorf("stored value is not Mergeable, got %T", old)
}

func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, Mergeable, error) {
if oldVal == nil {
return incoming, incoming, nil
}
Expand All @@ -1249,7 +1239,7 @@ func (m *KV) storeCopy() map[string]valueDesc {

result := make(map[string]valueDesc, len(m.store))
for k, v := range m.store {
result[k] = v
result[k] = v.Clone()
}
return result
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/ring/kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ func (d *data) RemoveTombstones(limit time.Time) (_, _ int) {
return
}

func (m member) clone() member {
out := member{
Timestamp: m.Timestamp,
Tokens: make([]uint32, len(m.Tokens)),
State: m.State,
}
copy(out.Tokens, m.Tokens)
return out
}

func (d *data) Clone() Mergeable {
out := &data{
Members: make(map[string]member, len(d.Members)),
}
for k, v := range d.Members {
out.Members[k] = v.clone()
}
return out
}

func (d *data) getAllTokens() []uint32 {
out := []uint32(nil)
for _, m := range d.Members {
Expand Down Expand Up @@ -872,6 +892,14 @@ func (dc distributedCounter) RemoveTombstones(limit time.Time) (_, _ int) {
return
}

func (dc distributedCounter) Clone() Mergeable {
out := make(distributedCounter, len(dc))
for k, v := range dc {
out[k] = v
}
return out
}

type distributedCounterCodec struct{}

func (d distributedCounterCodec) CodecID() string {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ring/kv/memberlist/mergeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ type Mergeable interface {
// time when client is accessing value from the store. It can be used to hide tombstones from the clients.
// Returns the total number of tombstones present and the number of removed tombstones by this invocation.
RemoveTombstones(limit time.Time) (total, removed int)

// Clone should return a deep copy of the state.
Clone() Mergeable
}
10 changes: 0 additions & 10 deletions pkg/ring/kv/memberlist/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ func (m *KV) createAndRegisterMetrics() {
"Number of values in KV Store",
nil, nil)

m.storeSizesDesc = prometheus.NewDesc(
prometheus.BuildFQName(m.cfg.MetricsNamespace, subsystem, "kv_store_value_bytes"), // gauge
"Sizes of values in KV Store in bytes",
[]string{"key"}, nil)

m.storeTombstones = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Expand Down Expand Up @@ -222,7 +217,6 @@ func (m *KV) createAndRegisterMetrics() {
// Describe returns prometheus descriptions via supplied channel
func (m *KV) Describe(ch chan<- *prometheus.Desc) {
ch <- m.storeValuesDesc
ch <- m.storeSizesDesc
}

// Collect returns extra metrics via supplied channel
Expand All @@ -231,8 +225,4 @@ func (m *KV) Collect(ch chan<- prometheus.Metric) {
defer m.storeMu.Unlock()

ch <- prometheus.MustNewConstMetric(m.storeValuesDesc, prometheus.GaugeValue, float64(len(m.store)))

for k, v := range m.store {
ch <- prometheus.MustNewConstMetric(m.storeSizesDesc, prometheus.GaugeValue, float64(len(v.value)), k)
}
}
5 changes: 5 additions & 0 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ func (d *Desc) RemoveTombstones(limit time.Time) (total, removed int) {
return
}

// Clone returns a deep copy of the ring state.
func (d *Desc) Clone() memberlist.Mergeable {
return proto.Clone(d).(*Desc)
}

func (d *Desc) getTokensInfo() map[uint32]instanceInfo {
out := map[uint32]instanceInfo{}

Expand Down

0 comments on commit 6418248

Please sign in to comment.