From 791e224c6f6af7d8ea10dae04c021d806da39b79 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 18 May 2023 00:17:18 -0700 Subject: [PATCH] Cache: various index cache client improvements (#6374) --- pkg/store/bucket.go | 2 +- pkg/store/cache/cache.go | 6 ++--- pkg/store/cache/cache_test.go | 16 ++++++------ pkg/store/cache/inmemory.go | 10 +++++--- pkg/store/cache/memcached.go | 46 +++++++++-------------------------- 5 files changed, 31 insertions(+), 49 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b448768c96c..c2648ed4001 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2345,7 +2345,7 @@ type postingPtr struct { } // fetchPostings fill postings requested by posting groups. -// It returns one postings for each key, in the same order. +// It returns one posting for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) { var closeFns []func() diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index 0c3b3021810..aac90fc148d 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -47,7 +47,7 @@ type IndexCache interface { } type cacheKey struct { - block ulid.ULID + block string key interface{} } @@ -79,9 +79,9 @@ func (c cacheKey) string() string { // which would end up in wrong query results. lbl := c.key.(cacheKeyPostings) lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value)) - return "P:" + c.block.String() + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) case cacheKeySeries: - return "S:" + c.block.String() + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) + return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) default: return "" } diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go index 3de763958ce..889009259bb 100644 --- a/pkg/store/cache/cache_test.go +++ b/pkg/store/cache/cache_test.go @@ -26,13 +26,14 @@ func TestCacheKey_string(t *testing.T) { t.Parallel() uid := ulid.MustNew(1, nil) + ulidString := uid.String() tests := map[string]struct { key cacheKey expected string }{ "should stringify postings cache key": { - key: cacheKey{uid, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})}, + key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})}, expected: func() string { hash := blake2b.Sum256([]byte("foo:bar")) encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) @@ -41,7 +42,7 @@ func TestCacheKey_string(t *testing.T) { }(), }, "should stringify series cache key": { - key: cacheKey{uid, cacheKeySeries(12345)}, + key: cacheKey{ulidString, cacheKeySeries(12345)}, expected: fmt.Sprintf("S:%s:12345", uid.String()), }, } @@ -58,6 +59,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { t.Parallel() uid := ulid.MustNew(1, nil) + ulidString := uid.String() tests := map[string]struct { keys []cacheKey @@ -66,14 +68,14 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { "should guarantee reasonably short key length for postings": { expectedLen: 72, keys: []cacheKey{ - {uid, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})}, - {uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}, + {ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})}, + {ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}, }, }, "should guarantee reasonably short key length for series": { expectedLen: 49, keys: []cacheKey{ - {uid, cacheKeySeries(math.MaxUint64)}, + {ulidString, cacheKeySeries(math.MaxUint64)}, }, }, } @@ -89,7 +91,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { func BenchmarkCacheKey_string_Postings(b *testing.B) { uid := ulid.MustNew(1, nil) - key := cacheKey{uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})} + key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})} b.ResetTimer() for i := 0; i < b.N; i++ { @@ -99,7 +101,7 @@ func BenchmarkCacheKey_string_Postings(b *testing.B) { func BenchmarkCacheKey_string_Series(b *testing.B) { uid := ulid.MustNew(1, nil) - key := cacheKey{uid, cacheKeySeries(math.MaxUint64)} + key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64)} b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index 295bb5ed969..f0e121c2651 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -290,7 +290,7 @@ func copyToKey(l labels.Label) cacheKeyPostings { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.set(cacheTypePostings, cacheKey{block: blockID, key: copyToKey(l)}, v) + c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) } // FetchMultiPostings fetches multiple postings - each identified by a label - @@ -298,8 +298,9 @@ func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { hits = map[labels.Label][]byte{} + blockIDKey := blockID.String() for _, key := range keys { - if b, ok := c.get(cacheTypePostings, cacheKey{blockID, cacheKeyPostings(key)}); ok { + if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key)}); ok { hits[key] = b continue } @@ -313,7 +314,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.set(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}, v) + c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id)}, v) } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache @@ -321,8 +322,9 @@ func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { hits = map[storage.SeriesRef][]byte{} + blockIDKey := blockID.String() for _, id := range ids { - if b, ok := c.get(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}); ok { + if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id)}); ok { hits[id] = b continue } diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index e1c5d689a8d..8bdd0ca2710 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -64,7 +64,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - key := cacheKey{blockID, cacheKeyPostings(l)}.string() + key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -75,16 +75,12 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { - // Build the cache keys, while keeping a map between input label and the cache key - // so that we can easily reverse it back after the GetMulti(). keys := make([]string, 0, len(lbls)) - keysMapping := map[labels.Label]string{} + blockIDKey := blockID.String() for _, lbl := range lbls { - key := cacheKey{blockID, cacheKeyPostings(lbl)}.string() - + key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() keys = append(keys, key) - keysMapping[lbl] = key } // Fetch the keys from memcached in a single request. @@ -96,18 +92,11 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // Construct the resulting hits map and list of missing keys. We iterate on the input // list of labels to be able to easily create the list of ones in a single iteration. - hits = map[labels.Label][]byte{} - - for _, lbl := range lbls { - key, ok := keysMapping[lbl] - if !ok { - level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "postings", "label", lbl.Name+":"+lbl.Value) - continue - } - + hits = make(map[labels.Label][]byte, len(results)) + for i, lbl := range lbls { // Check if the key has been found in memcached. If not, we add it to the list // of missing keys. - value, ok := results[key] + value, ok := results[keys[i]] if !ok { misses = append(misses, lbl) continue @@ -124,7 +113,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - key := cacheKey{blockID, cacheKeySeries(id)}.string() + key := cacheKey{blockID.String(), cacheKeySeries(id)}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err) @@ -135,16 +124,12 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, // and returns a map containing cache hits, along with a list of missing IDs. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - // Build the cache keys, while keeping a map between input id and the cache key - // so that we can easily reverse it back after the GetMulti(). keys := make([]string, 0, len(ids)) - keysMapping := map[storage.SeriesRef]string{} + blockIDKey := blockID.String() for _, id := range ids { - key := cacheKey{blockID, cacheKeySeries(id)}.string() - + key := cacheKey{blockIDKey, cacheKeySeries(id)}.string() keys = append(keys, key) - keysMapping[id] = key } // Fetch the keys from memcached in a single request. @@ -156,18 +141,11 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL // Construct the resulting hits map and list of missing keys. We iterate on the input // list of ids to be able to easily create the list of ones in a single iteration. - hits = map[storage.SeriesRef][]byte{} - - for _, id := range ids { - key, ok := keysMapping[id] - if !ok { - level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "series", "id", id) - continue - } - + hits = make(map[storage.SeriesRef][]byte, len(results)) + for i, id := range ids { // Check if the key has been found in memcached. If not, we add it to the list // of missing keys. - value, ok := results[key] + value, ok := results[keys[i]] if !ok { misses = append(misses, id) continue