diff --git a/CHANGELOG.md b/CHANGELOG.md index 804b58761c..3d14d59657 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 +* [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` metric has been removed due to values no longer being stored in-memory as encoded bytes. #4345 * [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262 * [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341 * [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342 @@ -13,6 +14,7 @@ * `-alertmanager.sharding-ring.heartbeat-timeout` * `-compactor.ring.heartbeat-timeout` * `-store-gateway.sharding-ring.heartbeat-timeout` +* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. 
#4336 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/pkg/ring/kv/memberlist/kv_init_service.go b/pkg/ring/kv/memberlist/kv_init_service.go index da4d965b7c..517c674281 100644 --- a/pkg/ring/kv/memberlist/kv_init_service.go +++ b/pkg/ring/kv/memberlist/kv_init_service.go @@ -16,7 +16,6 @@ import ( "github.com/hashicorp/memberlist" "go.uber.org/atomic" - "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -106,12 +105,12 @@ func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) { if err := req.ParseForm(); err == nil { if req.Form[downloadKeyParam] != nil { - downloadKey(w, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. + downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. return } if req.Form[viewKeyParam] != nil { - viewKey(w, kv, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) + viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) return } @@ -179,30 +178,25 @@ func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) { return } - formatValue(w, c, msg.Pair.Value, format) + val, err := c.Decode(msg.Pair.Value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) + return + } + + formatValue(w, val, format) } -func viewKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string, format string) { +func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) { if store[key].value == nil { http.Error(w, "value not found", http.StatusNotFound) return } - c := kv.GetCodec(store[key].codecID) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - formatValue(w, c, store[key].value, format) + formatValue(w, store[key].value, format) } -func formatValue(w http.ResponseWriter, codec codec.Codec, 
value []byte, format string) { - val, err := codec.Decode(value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) - return - } +func formatValue(w http.ResponseWriter, val interface{}, format string) { w.WriteHeader(200) w.Header().Add("content-type", "text/plain") @@ -214,7 +208,7 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format enc.SetIndent("", " ") } - err = enc.Encode(val) + err := enc.Encode(val) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -224,7 +218,7 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format } } -func downloadKey(w http.ResponseWriter, store map[string]valueDesc, key string) { +func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) { if store[key].value == nil { http.Error(w, "value not found", http.StatusNotFound) return @@ -232,14 +226,26 @@ func downloadKey(w http.ResponseWriter, store map[string]valueDesc, key string) val := store[key] + c := kv.GetCodec(store[key].codecID) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + encoded, err := c.Encode(val.value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) + return + } + w.Header().Add("content-type", "application/octet-stream") // Set content-length so that client knows whether it has received full response or not. - w.Header().Add("content-length", strconv.Itoa(len(val.value))) + w.Header().Add("content-length", strconv.Itoa(len(encoded))) w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key)) w.WriteHeader(200) // Ignore errors, we cannot do anything about them. 
- _, _ = w.Write(val.value) + _, _ = w.Write(encoded) } type pageData struct { diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index c0f962993b..7e45868bd9 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -265,7 +265,6 @@ type KV struct { watchPrefixDroppedNotifications *prometheus.CounterVec storeValuesDesc *prometheus.Desc - storeSizesDesc *prometheus.Desc storeTombstones *prometheus.GaugeVec storeRemovedTombstones *prometheus.CounterVec @@ -292,9 +291,11 @@ type message struct { } type valueDesc struct { - // We store bytes here. Reason is that clients calling CAS function will modify the object in place, - // but unless CAS succeeds, we don't want those modifications to be visible. - value []byte + // We store the decoded value here to prevent decoding the entire state for every + // update we receive. Whilst the updates are small and fast to decode, + // the total state can be quite large. + // The CAS function is passed a deep copy because it modifies in-place. + value Mergeable // version (local only) is used to keep track of what we're gossiping about, and invalidate old messages version uint @@ -303,8 +304,16 @@ type valueDesc struct { codecID string } +func (v valueDesc) Clone() (result valueDesc) { + result = v + if v.value != nil { + result.value = v.value.Clone() + } + return +} + func (v valueDesc) String() string { - return fmt.Sprintf("size: %d, version: %d, codec: %s", len(v.value), v.version, v.codecID) + return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID) } var ( @@ -612,24 +621,16 @@ func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) { // Returns current value with removed tombstones. 
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) { m.storeMu.Lock() - v := m.store[key] + v := m.store[key].Clone() m.storeMu.Unlock() - out = nil if v.value != nil { - out, err = codec.Decode(v.value) - if err != nil { - return nil, 0, err - } - - if mr, ok := out.(Mergeable); ok { - // remove ALL tombstones before returning to client. - // No need for clients to see them. - _, _ = mr.RemoveTombstones(time.Time{}) - } + // remove ALL tombstones before returning to client. + // No need for clients to see them. + _, _ = v.value.RemoveTombstones(time.Time{}) } - return out, v.version, nil + return v.value, v.version, nil } // WatchKey watches for value changes for given key. When value changes, 'f' function is called with the @@ -1043,9 +1044,21 @@ func (m *KV) LocalState(join bool) []byte { continue } + codec := m.GetCodec(val.codecID) + if codec == nil { + level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key) + continue + } + + encoded, err := codec.Encode(val.value) + if err != nil { + level.Error(m.logger).Log("msg", "failed to encode remote state", "err", err) + continue + } + kvPair.Reset() kvPair.Key = key - kvPair.Value = val.value + kvPair.Value = encoded kvPair.Codec = val.codecID ser, err := kvPair.Marshal() @@ -1055,7 +1068,7 @@ func (m *KV) LocalState(join bool) []byte { } if uint(len(ser)) > math.MaxUint32 { - level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(val.value)) + level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(encoded)) continue } @@ -1177,12 +1190,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeMu.Lock() defer m.storeMu.Unlock() - curr := m.store[key] + curr := m.store[key].Clone() // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. 
if casVersion > 0 && curr.version != casVersion { return nil, 0, errVersionMismatch } - result, change, err := computeNewValue(incomingValue, curr.value, codec, casVersion > 0) + result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0) if err != nil { return nil, 0, err } @@ -1199,14 +1212,9 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed)) } - encoded, err := codec.Encode(result) - if err != nil { - return nil, 0, fmt.Errorf("failed to encode merged result: %v", err) - } - newVersion := curr.version + 1 m.store[key] = valueDesc{ - value: encoded, + value: result, version: newVersion, codecID: codec.CodecID(), } @@ -1215,25 +1223,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui } // returns [result, change, error] -func computeNewValue(incoming Mergeable, stored []byte, c codec.Codec, cas bool) (Mergeable, Mergeable, error) { - if len(stored) == 0 { - return incoming, incoming, nil - } - - old, err := c.Decode(stored) - if err != nil { - return incoming, incoming, fmt.Errorf("failed to decode stored value: %v", err) - } - - if old == nil { - return incoming, incoming, nil - } - - oldVal, ok := old.(Mergeable) - if !ok { - return incoming, incoming, fmt.Errorf("stored value is not Mergeable, got %T", old) - } - +func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, Mergeable, error) { if oldVal == nil { return incoming, incoming, nil } @@ -1249,7 +1239,7 @@ func (m *KV) storeCopy() map[string]valueDesc { result := make(map[string]valueDesc, len(m.store)) for k, v := range m.store { - result[k] = v + result[k] = v.Clone() } return result } diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index de337571da..2782c6f931 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ 
b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -89,6 +89,26 @@ func (d *data) RemoveTombstones(limit time.Time) (_, _ int) { return } +func (m member) clone() member { + out := member{ + Timestamp: m.Timestamp, + Tokens: make([]uint32, len(m.Tokens)), + State: m.State, + } + copy(out.Tokens, m.Tokens) + return out +} + +func (d *data) Clone() Mergeable { + out := &data{ + Members: make(map[string]member, len(d.Members)), + } + for k, v := range d.Members { + out.Members[k] = v.clone() + } + return out +} + func (d *data) getAllTokens() []uint32 { out := []uint32(nil) for _, m := range d.Members { @@ -872,6 +892,14 @@ func (dc distributedCounter) RemoveTombstones(limit time.Time) (_, _ int) { return } +func (dc distributedCounter) Clone() Mergeable { + out := make(distributedCounter, len(dc)) + for k, v := range dc { + out[k] = v + } + return out +} + type distributedCounterCodec struct{} func (d distributedCounterCodec) CodecID() string { diff --git a/pkg/ring/kv/memberlist/mergeable.go b/pkg/ring/kv/memberlist/mergeable.go index 0cb9964f1f..a013e34988 100644 --- a/pkg/ring/kv/memberlist/mergeable.go +++ b/pkg/ring/kv/memberlist/mergeable.go @@ -40,4 +40,7 @@ type Mergeable interface { // time when client is accessing value from the store. It can be used to hide tombstones from the clients. // Returns the total number of tombstones present and the number of removed tombstones by this invocation. RemoveTombstones(limit time.Time) (total, removed int) + + // Clone should return a deep copy of the state. 
+ Clone() Mergeable } diff --git a/pkg/ring/kv/memberlist/metrics.go b/pkg/ring/kv/memberlist/metrics.go index ff729723da..e6e7d9bb4d 100644 --- a/pkg/ring/kv/memberlist/metrics.go +++ b/pkg/ring/kv/memberlist/metrics.go @@ -117,11 +117,6 @@ func (m *KV) createAndRegisterMetrics() { "Number of values in KV Store", nil, nil) - m.storeSizesDesc = prometheus.NewDesc( - prometheus.BuildFQName(m.cfg.MetricsNamespace, subsystem, "kv_store_value_bytes"), // gauge - "Sizes of values in KV Store in bytes", - []string{"key"}, nil) - m.storeTombstones = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, @@ -222,7 +217,6 @@ func (m *KV) createAndRegisterMetrics() { // Describe returns prometheus descriptions via supplied channel func (m *KV) Describe(ch chan<- *prometheus.Desc) { ch <- m.storeValuesDesc - ch <- m.storeSizesDesc } // Collect returns extra metrics via supplied channel @@ -231,8 +225,4 @@ func (m *KV) Collect(ch chan<- prometheus.Metric) { defer m.storeMu.Unlock() ch <- prometheus.MustNewConstMetric(m.storeValuesDesc, prometheus.GaugeValue, float64(len(m.store))) - - for k, v := range m.store { - ch <- prometheus.MustNewConstMetric(m.storeSizesDesc, prometheus.GaugeValue, float64(len(v.value)), k) - } } diff --git a/pkg/ring/model.go b/pkg/ring/model.go index dac6103b52..3a257c3952 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -398,6 +398,11 @@ func (d *Desc) RemoveTombstones(limit time.Time) (total, removed int) { return } +// Clone returns a deep copy of the ring state. +func (d *Desc) Clone() memberlist.Mergeable { + return proto.Clone(d).(*Desc) +} + func (d *Desc) getTokensInfo() map[uint32]instanceInfo { out := map[uint32]instanceInfo{}