From 6418248dc1d47fd008f78b8910eb8c24f5fd4703 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Fri, 9 Jul 2021 13:43:02 +0200 Subject: [PATCH] Optimise memberlist kv store access by storing data unencoded. (#4345) * Optimise memberlist kv store access by storing data unencoded. The following profile data was taken from running 50 idle ingesters with memberlist, with almost everything at default values (5s heartbeats): ``` 52.16% mergeBytesValueForKey +- 52.16% mergeValueForKey +- 47.84% computeNewValue +- 27.24% codec Proto Decode +- 26.25% mergeWithTime ``` It is apparent from the this that a lot of time is spent on the memberlist receive path, as might be expected, specifically, the merging of the update into the current state. The cost however is not in decoding the incoming states (occurs in `mergeBytesValueForKey` before `mergeValueForKey`), but in fact decoding _current state_ of the value in the store (as it is stored encoded). The ring state was measured at 123K (50 ingesters), so it makes sense that decoding could be costly. This can be avoided by storing the value in it's decoded `Mergeable` form. When doing this, care has to be taken to deep copy the value when accessed, as it is modified in place before being updated in the store, and accessed outside the store mutex. Note a side effect of this change is that is no longer straightforward to expose the `memberlist_kv_store_value_bytes` metric, as this reported the size of the encoded data, therefore it has been removed. Signed-off-by: Steve Simpson * Typo. Signed-off-by: Steve Simpson * Review comments. Signed-off-by: Steve Simpson Signed-off-by: Alvin Lin --- CHANGELOG.md | 2 + pkg/ring/kv/memberlist/kv_init_service.go | 50 ++++++----- pkg/ring/kv/memberlist/memberlist_client.go | 86 ++++++++----------- .../kv/memberlist/memberlist_client_test.go | 28 ++++++ pkg/ring/kv/memberlist/mergeable.go | 3 + pkg/ring/kv/memberlist/metrics.go | 10 --- pkg/ring/model.go | 5 ++ 7 files changed, 104 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ce523859c..114a40c7da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 +* [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345 * [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262 * [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341 * [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342 @@ -13,6 +14,7 @@ * `-alertmanager.sharding-ring.heartbeat-timeout` * `-compactor.ring.heartbeat-timeout` * `-store-gateway.sharding-ring.heartbeat-timeout` +* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 * [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4318 diff --git a/pkg/ring/kv/memberlist/kv_init_service.go b/pkg/ring/kv/memberlist/kv_init_service.go index da4d965b7c..517c674281 100644 --- a/pkg/ring/kv/memberlist/kv_init_service.go +++ b/pkg/ring/kv/memberlist/kv_init_service.go @@ -16,7 +16,6 @@ import ( "github.com/hashicorp/memberlist" "go.uber.org/atomic" - "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -106,12 +105,12 @@ func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) { if err := req.ParseForm(); err == nil { if req.Form[downloadKeyParam] != nil { - downloadKey(w, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. + downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. return } if req.Form[viewKeyParam] != nil { - viewKey(w, kv, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) + viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) return } @@ -179,30 +178,25 @@ func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) { return } - formatValue(w, c, msg.Pair.Value, format) + val, err := c.Decode(msg.Pair.Value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) + return + } + + formatValue(w, val, format) } -func viewKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string, format string) { +func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) { if store[key].value == nil { http.Error(w, "value not found", http.StatusNotFound) return } - c := kv.GetCodec(store[key].codecID) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - formatValue(w, c, store[key].value, format) + formatValue(w, store[key].value, format) } -func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format string) { - val, err := codec.Decode(value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) - return - } +func formatValue(w http.ResponseWriter, val interface{}, format string) { w.WriteHeader(200) w.Header().Add("content-type", "text/plain") @@ -214,7 +208,7 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format enc.SetIndent("", " ") } - err = enc.Encode(val) + err := enc.Encode(val) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -224,7 +218,7 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format } } -func downloadKey(w http.ResponseWriter, store map[string]valueDesc, key string) { +func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) { if store[key].value == nil { http.Error(w, "value not found", http.StatusNotFound) return @@ -232,14 +226,26 @@ func downloadKey(w http.ResponseWriter, store map[string]valueDesc, key string) val := store[key] + c := kv.GetCodec(store[key].codecID) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + encoded, err := c.Encode(val.value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) + return + } + w.Header().Add("content-type", "application/octet-stream") // Set content-length so that client knows whether it has received full response or not. - w.Header().Add("content-length", strconv.Itoa(len(val.value))) + w.Header().Add("content-length", strconv.Itoa(len(encoded))) w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key)) w.WriteHeader(200) // Ignore errors, we cannot do anything about them. - _, _ = w.Write(val.value) + _, _ = w.Write(encoded) } type pageData struct { diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index c0f962993b..7e45868bd9 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -265,7 +265,6 @@ type KV struct { watchPrefixDroppedNotifications *prometheus.CounterVec storeValuesDesc *prometheus.Desc - storeSizesDesc *prometheus.Desc storeTombstones *prometheus.GaugeVec storeRemovedTombstones *prometheus.CounterVec @@ -292,9 +291,11 @@ type message struct { } type valueDesc struct { - // We store bytes here. Reason is that clients calling CAS function will modify the object in place, - // but unless CAS succeeds, we don't want those modifications to be visible. - value []byte + // We store the decoded value here to prevent decoding the entire state for every + // update we receive. Whilst the updates are small and fast to decode, + // the total state can be quite large. + // The CAS function is passed a deep copy because it modifies in-place. + value Mergeable // version (local only) is used to keep track of what we're gossiping about, and invalidate old messages version uint @@ -303,8 +304,16 @@ type valueDesc struct { codecID string } +func (v valueDesc) Clone() (result valueDesc) { + result = v + if v.value != nil { + result.value = v.value.Clone() + } + return +} + func (v valueDesc) String() string { - return fmt.Sprintf("size: %d, version: %d, codec: %s", len(v.value), v.version, v.codecID) + return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID) } var ( @@ -612,24 +621,16 @@ func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) { // Returns current value with removed tombstones. func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) { m.storeMu.Lock() - v := m.store[key] + v := m.store[key].Clone() m.storeMu.Unlock() - out = nil if v.value != nil { - out, err = codec.Decode(v.value) - if err != nil { - return nil, 0, err - } - - if mr, ok := out.(Mergeable); ok { - // remove ALL tombstones before returning to client. - // No need for clients to see them. - _, _ = mr.RemoveTombstones(time.Time{}) - } + // remove ALL tombstones before returning to client. + // No need for clients to see them. + _, _ = v.value.RemoveTombstones(time.Time{}) } - return out, v.version, nil + return v.value, v.version, nil } // WatchKey watches for value changes for given key. When value changes, 'f' function is called with the @@ -1043,9 +1044,21 @@ func (m *KV) LocalState(join bool) []byte { continue } + codec := m.GetCodec(val.codecID) + if codec == nil { + level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key) + continue + } + + encoded, err := codec.Encode(val.value) + if err != nil { + level.Error(m.logger).Log("msg", "failed to encode remote state", "err", err) + continue + } + kvPair.Reset() kvPair.Key = key - kvPair.Value = val.value + kvPair.Value = encoded kvPair.Codec = val.codecID ser, err := kvPair.Marshal() @@ -1055,7 +1068,7 @@ func (m *KV) LocalState(join bool) []byte { } if uint(len(ser)) > math.MaxUint32 { - level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(val.value)) + level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(encoded)) continue } @@ -1177,12 +1190,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeMu.Lock() defer m.storeMu.Unlock() - curr := m.store[key] + curr := m.store[key].Clone() // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. if casVersion > 0 && curr.version != casVersion { return nil, 0, errVersionMismatch } - result, change, err := computeNewValue(incomingValue, curr.value, codec, casVersion > 0) + result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0) if err != nil { return nil, 0, err } @@ -1199,14 +1212,9 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed)) } - encoded, err := codec.Encode(result) - if err != nil { - return nil, 0, fmt.Errorf("failed to encode merged result: %v", err) - } - newVersion := curr.version + 1 m.store[key] = valueDesc{ - value: encoded, + value: result, version: newVersion, codecID: codec.CodecID(), } @@ -1215,25 +1223,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui } // returns [result, change, error] -func computeNewValue(incoming Mergeable, stored []byte, c codec.Codec, cas bool) (Mergeable, Mergeable, error) { - if len(stored) == 0 { - return incoming, incoming, nil - } - - old, err := c.Decode(stored) - if err != nil { - return incoming, incoming, fmt.Errorf("failed to decode stored value: %v", err) - } - - if old == nil { - return incoming, incoming, nil - } - - oldVal, ok := old.(Mergeable) - if !ok { - return incoming, incoming, fmt.Errorf("stored value is not Mergeable, got %T", old) - } - +func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, Mergeable, error) { if oldVal == nil { return incoming, incoming, nil } @@ -1249,7 +1239,7 @@ func (m *KV) storeCopy() map[string]valueDesc { result := make(map[string]valueDesc, len(m.store)) for k, v := range m.store { - result[k] = v + result[k] = v.Clone() } return result } diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index de337571da..2782c6f931 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -89,6 +89,26 @@ func (d *data) RemoveTombstones(limit time.Time) (_, _ int) { return } +func (m member) clone() member { + out := member{ + Timestamp: m.Timestamp, + Tokens: make([]uint32, len(m.Tokens)), + State: m.State, + } + copy(out.Tokens, m.Tokens) + return out +} + +func (d *data) Clone() Mergeable { + out := &data{ + Members: make(map[string]member, len(d.Members)), + } + for k, v := range d.Members { + out.Members[k] = v.clone() + } + return out +} + func (d *data) getAllTokens() []uint32 { out := []uint32(nil) for _, m := range d.Members { @@ -872,6 +892,14 @@ func (dc distributedCounter) RemoveTombstones(limit time.Time) (_, _ int) { return } +func (dc distributedCounter) Clone() Mergeable { + out := make(distributedCounter, len(dc)) + for k, v := range dc { + out[k] = v + } + return out +} + type distributedCounterCodec struct{} func (d distributedCounterCodec) CodecID() string { diff --git a/pkg/ring/kv/memberlist/mergeable.go b/pkg/ring/kv/memberlist/mergeable.go index 0cb9964f1f..a013e34988 100644 --- a/pkg/ring/kv/memberlist/mergeable.go +++ b/pkg/ring/kv/memberlist/mergeable.go @@ -40,4 +40,7 @@ type Mergeable interface { // time when client is accessing value from the store. It can be used to hide tombstones from the clients. // Returns the total number of tombstones present and the number of removed tombstones by this invocation. RemoveTombstones(limit time.Time) (total, removed int) + + // Clone should return a deep copy of the state. + Clone() Mergeable } diff --git a/pkg/ring/kv/memberlist/metrics.go b/pkg/ring/kv/memberlist/metrics.go index ff729723da..e6e7d9bb4d 100644 --- a/pkg/ring/kv/memberlist/metrics.go +++ b/pkg/ring/kv/memberlist/metrics.go @@ -117,11 +117,6 @@ func (m *KV) createAndRegisterMetrics() { "Number of values in KV Store", nil, nil) - m.storeSizesDesc = prometheus.NewDesc( - prometheus.BuildFQName(m.cfg.MetricsNamespace, subsystem, "kv_store_value_bytes"), // gauge - "Sizes of values in KV Store in bytes", - []string{"key"}, nil) - m.storeTombstones = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, @@ -222,7 +217,6 @@ func (m *KV) createAndRegisterMetrics() { // Describe returns prometheus descriptions via supplied channel func (m *KV) Describe(ch chan<- *prometheus.Desc) { ch <- m.storeValuesDesc - ch <- m.storeSizesDesc } // Collect returns extra metrics via supplied channel @@ -231,8 +225,4 @@ func (m *KV) Collect(ch chan<- prometheus.Metric) { defer m.storeMu.Unlock() ch <- prometheus.MustNewConstMetric(m.storeValuesDesc, prometheus.GaugeValue, float64(len(m.store))) - - for k, v := range m.store { - ch <- prometheus.MustNewConstMetric(m.storeSizesDesc, prometheus.GaugeValue, float64(len(v.value)), k) - } } diff --git a/pkg/ring/model.go b/pkg/ring/model.go index dac6103b52..3a257c3952 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -398,6 +398,11 @@ func (d *Desc) RemoveTombstones(limit time.Time) (total, removed int) { return } +// Clone returns a deep copy of the ring state. +func (d *Desc) Clone() memberlist.Mergeable { + return proto.Clone(d).(*Desc) +} + func (d *Desc) getTokensInfo() map[uint32]instanceInfo { out := map[uint32]instanceInfo{}