From 858130196ca4e3dd819f3205d18cfbc5b138aa1f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 31 May 2023 14:06:27 -0700 Subject: [PATCH 1/9] extend postings cache key with codec Signed-off-by: Ben Ye --- pkg/store/bucket.go | 8 +++--- pkg/store/bucket_e2e_test.go | 8 +++--- pkg/store/cache/cache.go | 11 ++++++-- pkg/store/cache/inmemory.go | 4 +-- pkg/store/cache/inmemory_test.go | 44 +++++++++++++++---------------- pkg/store/cache/memcached.go | 13 ++++++--- pkg/store/cache/memcached_test.go | 4 +-- pkg/store/postings_codec.go | 30 +++++++++------------ 8 files changed, 66 insertions(+), 56 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index dafc08bdd9..fe7980cbff 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -369,8 +369,8 @@ func (s *BucketStore) validate() error { type noopCache struct{} -func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte) {} -func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { +func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, storecache.PostingsCodec) {} +func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, _ storecache.PostingsCodec) (map[labels.Label][]byte, []labels.Label) { return map[labels.Label][]byte{}, keys } @@ -2388,7 +2388,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab output := make([]index.Postings, len(keys)) // Fetch postings from the cache with a single call. - fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) + fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, storecache.CodecHeaderStreamedSnappy) for _, dataFromCache := range fromCache { if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) @@ -2528,7 +2528,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab // Truncate first 4 bytes which are length of posting. output[p.keyID] = newBigEndianPostings(pBytes[4:]) - r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache) + r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache, storecache.CodecHeaderStreamedSnappy) // If we just fetched it we still have to update the stats for touched postings. 
r.stats.postingsTouched++ diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 9e1fd17ca4..607f2398df 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -58,12 +58,12 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) { c.ptr = ptr2 } -func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.ptr.StorePostings(blockID, l, v) +func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec storecache.PostingsCodec) { + c.ptr.StorePostings(blockID, l, v, codec) } -func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { - return c.ptr.FetchMultiPostings(ctx, blockID, keys) +func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, codec storecache.PostingsCodec) (map[labels.Label][]byte, []labels.Label) { + return c.ptr.FetchMultiPostings(ctx, blockID, keys, codec) } func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index aac90fc148..d2ef4064a0 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -25,6 +25,13 @@ var ( ulidSize = uint64(len(ulid.ULID{})) ) +type PostingsCodec string + +const ( + CodecHeaderSnappy PostingsCodec = "dvs" // As in "diff+varint+snappy". + CodecHeaderStreamedSnappy PostingsCodec = "dss" // As in "diffvarint+streamed snappy". +) + // IndexCache is the interface exported by index cache backends. // Store operations do not support context.Context, deadlines need to be // supported by the backends themselves. This is because Set operations are @@ -32,11 +39,11 @@ var ( // (potentially with a deadline) as in the original user's request. type IndexCache interface { // StorePostings stores postings for a single series. - StorePostings(blockID ulid.ULID, l labels.Label, v []byte) + StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec PostingsCodec) // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. - FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) + FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, codec PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) // StoreSeries stores a single series. StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index f0e121c265..6b6973c687 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -289,13 +289,13 @@ func copyToKey(l labels.Label) cacheKeyPostings { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { +func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, _ PostingsCodec) { c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) } // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. 
-func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { +func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, _ PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) { hits = map[labels.Label][]byte{} blockIDKey := blockID.String() diff --git a/pkg/store/cache/inmemory_test.go b/pkg/store/cache/inmemory_test.go index d97d125cf8..d891449012 100644 --- a/pkg/store/cache/inmemory_test.go +++ b/pkg/store/cache/inmemory_test.go @@ -79,14 +79,14 @@ func TestInMemoryIndexCache_AvoidsDeadlock(t *testing.T) { testutil.Ok(t, err) cache.lru = l - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}) + cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}, CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) // This triggers deadlock logic. - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}) + cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}, CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(sliceHeaderSize+1), cache.curSize) testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -131,9 +131,9 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { }{ { typ: cacheTypePostings, - set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b) }, + set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, CodecHeaderStreamedSnappy) }, get: func(id storage.SeriesRef) ([]byte, bool) { - hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}) + hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, CodecHeaderStreamedSnappy) b, ok := hits[lbl] return b, ok @@ -210,9 +210,9 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) { id := ulid.MustNew(0, nil) - cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}) - cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}) - cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}) + cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}, CodecHeaderStreamedSnappy) + cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}, CodecHeaderStreamedSnappy) + cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}, CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(2*sliceHeaderSize+4), cache.curSize) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) @@ -243,12 +243,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { emptySeriesHits := map[storage.SeriesRef][]byte{} emptySeriesMisses := []storage.SeriesRef(nil) - pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) + pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, 
[]labels.Label{lbls}, pMisses) // Add sliceHeaderSize + 2 bytes. - cache.StorePostings(id, lbls, []byte{42, 33}) + cache.StorePostings(id, lbls, []byte{42, 33}, CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(sliceHeaderSize+2), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -261,15 +261,15 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy) testutil.Equals(t, map[labels.Label][]byte{lbls: {42, 33}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}, CodecHeaderStreamedSnappy) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}, CodecHeaderStreamedSnappy) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{{Name: "test", Value: "124"}}, pMisses) @@ -298,7 +298,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { for i := 0; i < sliceHeaderSize; i++ { v = append(v, 3) } - cache.StorePostings(id, lbls2, v) + cache.StorePostings(id, lbls2, v, CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -313,7 +313,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) // Eviction. // Evicted. - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) @@ -321,12 +321,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, emptySeriesHits, sHits, "no such key") testutil.Equals(t, []storage.SeriesRef{1234}, sMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, CodecHeaderStreamedSnappy) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add same item again. 
- cache.StorePostings(id, lbls2, v) + cache.StorePostings(id, lbls2, v, CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -340,12 +340,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, CodecHeaderStreamedSnappy) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add too big item. - cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5)) + cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5), CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(2*sliceHeaderSize+5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -378,7 +378,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { lbls3 := labels.Label{Name: "test", Value: "124"} - cache.StorePostings(id, lbls3, []byte{}) + cache.StorePostings(id, lbls3, []byte{}, CodecHeaderStreamedSnappy) testutil.Equals(t, uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -392,13 +392,13 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}, CodecHeaderStreamedSnappy) testutil.Equals(t, map[labels.Label][]byte{lbls3: {}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) // nil works and still allocates empty slice. 
lbls4 := labels.Label{Name: "test", Value: "125"} - cache.StorePostings(id, lbls4, []byte(nil)) + cache.StorePostings(id, lbls4, []byte(nil), CodecHeaderStreamedSnappy) testutil.Equals(t, 2*uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(2), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -412,7 +412,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}, CodecHeaderStreamedSnappy) testutil.Equals(t, map[labels.Label][]byte{lbls4: {}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 8bdd0ca271..9f986cfb62 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -63,8 +63,11 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // StorePostings sets the postings identified by the ulid and label to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. -func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { +func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec PostingsCodec) { key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string() + if codec == CodecHeaderStreamedSnappy { + key += ":" + string(codec) + } if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -74,12 +77,16 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. -func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { +func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label, codec PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) { keys := make([]string, 0, len(lbls)) blockIDKey := blockID.String() + suffix := "" + if codec == CodecHeaderStreamedSnappy { + suffix = ":" + string(codec) + } for _, lbl := range lbls { - key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + suffix keys = append(keys, key) } diff --git a/pkg/store/cache/memcached_test.go b/pkg/store/cache/memcached_test.go index 4911f77845..ae99700a36 100644 --- a/pkg/store/cache/memcached_test.go +++ b/pkg/store/cache/memcached_test.go @@ -92,11 +92,11 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) { // Store the postings expected before running the test. ctx := context.Background() for _, p := range testData.setup { - c.StorePostings(p.block, p.label, p.value) + c.StorePostings(p.block, p.label, p.value, CodecHeaderStreamedSnappy) } // Fetch postings from cached and assert on it. 
- hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels) + hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels, CodecHeaderStreamedSnappy) testutil.Equals(t, testData.expectedHits, hits) testutil.Equals(t, testData.expectedMisses, misses) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 5e2f0a9cc2..97357e7a76 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy" + storecache "github.com/thanos-io/thanos/pkg/store/cache" ) // This file implements encoding and decoding of postings using diff (or delta) + varint @@ -28,11 +29,6 @@ import ( // single byte, values < 16384 are encoded as two bytes). Diff + varint reduces postings size // significantly (to about 20% of original), snappy then halves it to ~10% of the original. -const ( - codecHeaderSnappy = "dvs" // As in "diff+varint+snappy". - codecHeaderStreamedSnappy = "dss" // As in "diffvarint+streamed snappy". -) - func decodePostings(input []byte) (closeablePostings, error) { var df func([]byte) (closeablePostings, error) @@ -50,12 +46,12 @@ func decodePostings(input []byte) (closeablePostings, error) { // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. func isDiffVarintSnappyEncodedPostings(input []byte) bool { - return bytes.HasPrefix(input, []byte(codecHeaderSnappy)) + return bytes.HasPrefix(input, []byte(storecache.CodecHeaderSnappy)) } // isDiffVarintSnappyStreamedEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy streamed codec. func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool { - return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy)) + return bytes.HasPrefix(input, []byte(storecache.CodecHeaderStreamedSnappy)) } // estimateSnappyStreamSize estimates the number of bytes @@ -69,7 +65,7 @@ func estimateSnappyStreamSize(length int) int { // Each encoded (or uncompressed) chunk needs tag (chunk type 1B + chunk len 3B) + checksum 4B. // Mark for encoded data. - ret := len(codecHeaderStreamedSnappy) + ret := len(storecache.CodecHeaderStreamedSnappy) // Magic snappy stream start. 
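 	// (The 10 bytes accounted for below are snappy framing's stream
 	// identifier chunk: 1-byte chunk type 0xff, a 3-byte little-endian
 	// length of 6, and the 6-byte magic "sNaPpY".)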
ret += 10 @@ -90,9 +86,9 @@ func estimateSnappyStreamSize(length int) int { func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) { compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(length))) - if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil { + if n, err := compressedBuf.WriteString(string(storecache.CodecHeaderStreamedSnappy)); err != nil { return nil, fmt.Errorf("writing streamed snappy header") - } else if n != len(codecHeaderStreamedSnappy) { + } else if n != len(storecache.CodecHeaderStreamedSnappy) { return nil, fmt.Errorf("short-write streamed snappy header") } @@ -134,7 +130,7 @@ func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) { return nil, errors.New("header not found") } - return newStreamedDiffVarintPostings(input[len(codecHeaderStreamedSnappy):]) + return newStreamedDiffVarintPostings(input[len(storecache.CodecHeaderStreamedSnappy):]) } type streamedDiffVarintPostings struct { @@ -195,7 +191,7 @@ func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool { // diffVarintSnappyEncode encodes postings into diff+varint representation, // and applies snappy compression on the result. -// Returned byte slice starts with codecHeaderSnappy header. +// Returned byte slice starts with storecache.CodecHeaderSnappy header. // Length argument is expected number of postings, used for preallocating buffer. // TODO(GiedriusS): remove for v1.0. func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) { @@ -205,13 +201,13 @@ func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) { } // Make result buffer large enough to hold our header and compressed block. - result := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(len(buf))) - copy(result, codecHeaderSnappy) + result := make([]byte, len(storecache.CodecHeaderSnappy)+snappy.MaxEncodedLen(len(buf))) + copy(result, storecache.CodecHeaderSnappy) - compressed := snappy.Encode(result[len(codecHeaderSnappy):], buf) + compressed := snappy.Encode(result[len(storecache.CodecHeaderSnappy):], buf) // Slice result buffer based on compressed size. - result = result[:len(codecHeaderSnappy)+len(compressed)] + result = result[:len(storecache.CodecHeaderSnappy)+len(compressed)] return result, nil } @@ -273,7 +269,7 @@ func diffVarintSnappyDecode(input []byte) (closeablePostings, error) { toFree = append(toFree, dstBuf) } - raw, err := s2.Decode(dstBuf, input[len(codecHeaderSnappy):]) + raw, err := s2.Decode(dstBuf, input[len(storecache.CodecHeaderSnappy):]) if err != nil { return nil, errors.Wrap(err, "snappy decode") } From 2151bf6b52a940bd92db79f1de941c632400d4ed Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 31 May 2023 14:56:55 -0700 Subject: [PATCH 2/9] add changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83f7b07f30..416d44b18f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6322](https://github.com/thanos-io/thanos/pull/6322) Logging: Avoid expensive log.Valuer evaluation for disallowed levels. - [#6358](https://github.com/thanos-io/thanos/pull/6358) Query: Add +Inf bucket to query duration metrics - [#6363](https://github.com/thanos-io/thanos/pull/6363) Store: Check context error when expanding postings. 
+- [#6405](https://github.com/thanos-io/thanos/pull/6405) Store/Index Cache: Add codec header as part of the postings cache key. ### Removed From be50786d45c78fdd3a2611a899b02c37a7764714 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 2 Jun 2023 11:29:47 -0700 Subject: [PATCH 3/9] update code back Signed-off-by: Ben Ye --- pkg/store/bucket.go | 8 +++--- pkg/store/bucket_e2e_test.go | 8 +++--- pkg/store/cache/cache.go | 4 +-- pkg/store/cache/inmemory.go | 4 +-- pkg/store/cache/inmemory_test.go | 44 +++++++++++++++---------------- pkg/store/cache/memcached.go | 13 +++------ pkg/store/cache/memcached_test.go | 4 +-- pkg/store/postings_codec.go | 30 ++++++++++++--------- 8 files changed, 56 insertions(+), 59 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index fe7980cbff..dafc08bdd9 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -369,8 +369,8 @@ func (s *BucketStore) validate() error { type noopCache struct{} -func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, storecache.PostingsCodec) {} -func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, _ storecache.PostingsCodec) (map[labels.Label][]byte, []labels.Label) { +func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte) {} +func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { return map[labels.Label][]byte{}, keys } @@ -2388,7 +2388,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab output := make([]index.Postings, len(keys)) // Fetch postings from the cache with a single call. - fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, storecache.CodecHeaderStreamedSnappy) + fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) for _, dataFromCache := range fromCache { if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) @@ -2528,7 +2528,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab // Truncate first 4 bytes which are length of posting. output[p.keyID] = newBigEndianPostings(pBytes[4:]) - r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache, storecache.CodecHeaderStreamedSnappy) + r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache) // If we just fetched it we still have to update the stats for touched postings. 
r.stats.postingsTouched++ diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 607f2398df..9e1fd17ca4 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -58,12 +58,12 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) { c.ptr = ptr2 } -func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec storecache.PostingsCodec) { - c.ptr.StorePostings(blockID, l, v, codec) +func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { + c.ptr.StorePostings(blockID, l, v) } -func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, codec storecache.PostingsCodec) (map[labels.Label][]byte, []labels.Label) { - return c.ptr.FetchMultiPostings(ctx, blockID, keys, codec) +func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { + return c.ptr.FetchMultiPostings(ctx, blockID, keys) } func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index d2ef4064a0..d7fc72f45e 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -39,11 +39,11 @@ const ( // (potentially with a deadline) as in the original user's request. type IndexCache interface { // StorePostings stores postings for a single series. - StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec PostingsCodec) + StorePostings(blockID ulid.ULID, l labels.Label, v []byte) // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. - FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, codec PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) + FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) // StoreSeries stores a single series. StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index 6b6973c687..f0e121c265 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -289,13 +289,13 @@ func copyToKey(l labels.Label) cacheKeyPostings { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, _ PostingsCodec) { +func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) } // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. 
-func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, _ PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) { +func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { hits = map[labels.Label][]byte{} blockIDKey := blockID.String() diff --git a/pkg/store/cache/inmemory_test.go b/pkg/store/cache/inmemory_test.go index d891449012..d97d125cf8 100644 --- a/pkg/store/cache/inmemory_test.go +++ b/pkg/store/cache/inmemory_test.go @@ -79,14 +79,14 @@ func TestInMemoryIndexCache_AvoidsDeadlock(t *testing.T) { testutil.Ok(t, err) cache.lru = l - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}, CodecHeaderStreamedSnappy) + cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}) testutil.Equals(t, uint64(sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) // This triggers deadlock logic. - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}, CodecHeaderStreamedSnappy) + cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}) testutil.Equals(t, uint64(sliceHeaderSize+1), cache.curSize) testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -131,9 +131,9 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { }{ { typ: cacheTypePostings, - set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, CodecHeaderStreamedSnappy) }, + set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b) }, get: func(id storage.SeriesRef) ([]byte, bool) { - hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, CodecHeaderStreamedSnappy) + hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}) b, ok := hits[lbl] return b, ok @@ -210,9 +210,9 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) { id := ulid.MustNew(0, nil) - cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}, CodecHeaderStreamedSnappy) - cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}, CodecHeaderStreamedSnappy) - cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}, CodecHeaderStreamedSnappy) + cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}) + cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}) + cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}) testutil.Equals(t, uint64(2*sliceHeaderSize+4), cache.curSize) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) @@ -243,12 +243,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { emptySeriesHits := map[storage.SeriesRef][]byte{} emptySeriesMisses := []storage.SeriesRef(nil) - pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy) + pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, 
[]labels.Label{lbls}, pMisses) // Add sliceHeaderSize + 2 bytes. - cache.StorePostings(id, lbls, []byte{42, 33}, CodecHeaderStreamedSnappy) + cache.StorePostings(id, lbls, []byte{42, 33}) testutil.Equals(t, uint64(sliceHeaderSize+2), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -261,15 +261,15 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) testutil.Equals(t, map[labels.Label][]byte{lbls: {42, 33}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{{Name: "test", Value: "124"}}, pMisses) @@ -298,7 +298,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { for i := 0; i < sliceHeaderSize; i++ { v = append(v, 3) } - cache.StorePostings(id, lbls2, v, CodecHeaderStreamedSnappy) + cache.StorePostings(id, lbls2, v) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -313,7 +313,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) // Eviction. // Evicted. - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) @@ -321,12 +321,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, emptySeriesHits, sHits, "no such key") testutil.Equals(t, []storage.SeriesRef{1234}, sMisses) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add same item again. 
- cache.StorePostings(id, lbls2, v, CodecHeaderStreamedSnappy) + cache.StorePostings(id, lbls2, v) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -340,12 +340,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add too big item. - cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5), CodecHeaderStreamedSnappy) + cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5)) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(2*sliceHeaderSize+5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -378,7 +378,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { lbls3 := labels.Label{Name: "test", Value: "124"} - cache.StorePostings(id, lbls3, []byte{}, CodecHeaderStreamedSnappy) + cache.StorePostings(id, lbls3, []byte{}) testutil.Equals(t, uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -392,13 +392,13 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}) testutil.Equals(t, map[labels.Label][]byte{lbls3: {}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) // nil works and still allocates empty slice. 
lbls4 := labels.Label{Name: "test", Value: "125"} - cache.StorePostings(id, lbls4, []byte(nil), CodecHeaderStreamedSnappy) + cache.StorePostings(id, lbls4, []byte(nil)) testutil.Equals(t, 2*uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(2), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -412,7 +412,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}, CodecHeaderStreamedSnappy) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}) testutil.Equals(t, map[labels.Label][]byte{lbls4: {}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 9f986cfb62..8bdd0ca271 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -63,11 +63,8 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // StorePostings sets the postings identified by the ulid and label to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. -func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec PostingsCodec) { +func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string() - if codec == CodecHeaderStreamedSnappy { - key += ":" + string(codec) - } if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -77,16 +74,12 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. -func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label, codec PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) { +func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { keys := make([]string, 0, len(lbls)) blockIDKey := blockID.String() - suffix := "" - if codec == CodecHeaderStreamedSnappy { - suffix = ":" + string(codec) - } for _, lbl := range lbls { - key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + suffix + key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() keys = append(keys, key) } diff --git a/pkg/store/cache/memcached_test.go b/pkg/store/cache/memcached_test.go index ae99700a36..4911f77845 100644 --- a/pkg/store/cache/memcached_test.go +++ b/pkg/store/cache/memcached_test.go @@ -92,11 +92,11 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) { // Store the postings expected before running the test. ctx := context.Background() for _, p := range testData.setup { - c.StorePostings(p.block, p.label, p.value, CodecHeaderStreamedSnappy) + c.StorePostings(p.block, p.label, p.value) } // Fetch postings from cached and assert on it. 
- hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels, CodecHeaderStreamedSnappy) + hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels) testutil.Equals(t, testData.expectedHits, hits) testutil.Equals(t, testData.expectedMisses, misses) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 97357e7a76..dfd19b2c60 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -17,7 +17,6 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy" - storecache "github.com/thanos-io/thanos/pkg/store/cache" ) // This file implements encoding and decoding of postings using diff (or delta) + varint @@ -29,6 +28,11 @@ import ( // single byte, values < 16384 are encoded as two bytes). Diff + varint reduces postings size // significantly (to about 20% of original), snappy then halves it to ~10% of the original. +const ( + codecHeaderSnappy = "dvs" // As in "diff+varint+snappy". + codecHeaderStreamedSnappy = "dss" // As in "diffvarint+streamed snappy". +) + func decodePostings(input []byte) (closeablePostings, error) { var df func([]byte) (closeablePostings, error) @@ -46,12 +50,12 @@ func decodePostings(input []byte) (closeablePostings, error) { // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. func isDiffVarintSnappyEncodedPostings(input []byte) bool { - return bytes.HasPrefix(input, []byte(storecache.CodecHeaderSnappy)) + return bytes.HasPrefix(input, []byte(codecHeaderSnappy)) } // isDiffVarintSnappyStreamedEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy streamed codec. func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool { - return bytes.HasPrefix(input, []byte(storecache.CodecHeaderStreamedSnappy)) + return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy)) } // estimateSnappyStreamSize estimates the number of bytes @@ -65,7 +69,7 @@ func estimateSnappyStreamSize(length int) int { // Each encoded (or uncompressed) chunk needs tag (chunk type 1B + chunk len 3B) + checksum 4B. // Mark for encoded data. - ret := len(storecache.CodecHeaderStreamedSnappy) + ret := len(codecHeaderStreamedSnappy) // Magic snappy stream start. 
ret += 10 @@ -86,9 +90,9 @@ func estimateSnappyStreamSize(length int) int { func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) { compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(length))) - if n, err := compressedBuf.WriteString(string(storecache.CodecHeaderStreamedSnappy)); err != nil { + if n, err := compressedBuf.WriteString(string(codecHeaderStreamedSnappy)); err != nil { return nil, fmt.Errorf("writing streamed snappy header") - } else if n != len(storecache.CodecHeaderStreamedSnappy) { + } else if n != len(codecHeaderStreamedSnappy) { return nil, fmt.Errorf("short-write streamed snappy header") } @@ -130,7 +134,7 @@ func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) { return nil, errors.New("header not found") } - return newStreamedDiffVarintPostings(input[len(storecache.CodecHeaderStreamedSnappy):]) + return newStreamedDiffVarintPostings(input[len(codecHeaderStreamedSnappy):]) } type streamedDiffVarintPostings struct { @@ -191,7 +195,7 @@ func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool { // diffVarintSnappyEncode encodes postings into diff+varint representation, // and applies snappy compression on the result. -// Returned byte slice starts with storecache.CodecHeaderSnappy header. +// Returned byte slice starts with codecHeaderSnappy header. // Length argument is expected number of postings, used for preallocating buffer. // TODO(GiedriusS): remove for v1.0. func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) { @@ -201,13 +205,13 @@ func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) { } // Make result buffer large enough to hold our header and compressed block. - result := make([]byte, len(storecache.CodecHeaderSnappy)+snappy.MaxEncodedLen(len(buf))) - copy(result, storecache.CodecHeaderSnappy) + result := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(len(buf))) + copy(result, codecHeaderSnappy) - compressed := snappy.Encode(result[len(storecache.CodecHeaderSnappy):], buf) + compressed := snappy.Encode(result[len(codecHeaderSnappy):], buf) // Slice result buffer based on compressed size. - result = result[:len(storecache.CodecHeaderSnappy)+len(compressed)] + result = result[:len(codecHeaderSnappy)+len(compressed)] return result, nil } @@ -269,7 +273,7 @@ func diffVarintSnappyDecode(input []byte) (closeablePostings, error) { toFree = append(toFree, dstBuf) } - raw, err := s2.Decode(dstBuf, input[len(storecache.CodecHeaderSnappy):]) + raw, err := s2.Decode(dstBuf, input[len(codecHeaderSnappy):]) if err != nil { return nil, errors.Wrap(err, "snappy decode") } From bd8afe58f8fc44ebcb05e374f9eff4053970ab0c Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 2 Jun 2023 12:02:47 -0700 Subject: [PATCH 4/9] add colon Signed-off-by: Ben Ye --- pkg/store/cache/cache.go | 10 ++-------- pkg/store/postings_codec.go | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index d7fc72f45e..fcad461b75 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -25,13 +25,6 @@ var ( ulidSize = uint64(len(ulid.ULID{})) ) -type PostingsCodec string - -const ( - CodecHeaderSnappy PostingsCodec = "dvs" // As in "diff+varint+snappy". - CodecHeaderStreamedSnappy PostingsCodec = "dss" // As in "diffvarint+streamed snappy". -) - // IndexCache is the interface exported by index cache backends. 
// Store operations do not support context.Context, deadlines need to be // supported by the backends themselves. This is because Set operations are @@ -86,7 +79,8 @@ func (c cacheKey) string() string { // which would end up in wrong query results. lbl := c.key.(cacheKeyPostings) lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value)) - return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + // Add : at the end to force using a new cache key for postings. + return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + ":" case cacheKeySeries: return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) default: diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index dfd19b2c60..5e2f0a9cc2 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -90,7 +90,7 @@ func estimateSnappyStreamSize(length int) int { func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) { compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(length))) - if n, err := compressedBuf.WriteString(string(codecHeaderStreamedSnappy)); err != nil { + if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil { return nil, fmt.Errorf("writing streamed snappy header") } else if n != len(codecHeaderStreamedSnappy) { return nil, fmt.Errorf("short-write streamed snappy header") From e7d3a53bf0bda055a8a1de088cdaf89d7d838ded Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 2 Jun 2023 12:04:56 -0700 Subject: [PATCH 5/9] update changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 416d44b18f..e878c80956 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,7 +53,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6322](https://github.com/thanos-io/thanos/pull/6322) Logging: Avoid expensive log.Valuer evaluation for disallowed levels. - [#6358](https://github.com/thanos-io/thanos/pull/6358) Query: Add +Inf bucket to query duration metrics - [#6363](https://github.com/thanos-io/thanos/pull/6363) Store: Check context error when expanding postings. -- [#6405](https://github.com/thanos-io/thanos/pull/6405) Store/Index Cache: Add codec header as part of the postings cache key. +- [#6405](https://github.com/thanos-io/thanos/pull/6405) Index Cache: Change postings cache key due to the streaming snappy encoding change. 
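
For context on why the postings cache key has to change: cached values are self-describing, and `decodePostings` in `pkg/store/postings_codec.go` picks a decoder by sniffing the first bytes of the value. An older Thanos reader that only knows the `dvs` header hits the unknown-codec error when handed a `dss` value during a rolling deployment. Below is a minimal, self-contained sketch of that dispatch; `pickDecoder` is a stand-in name for illustration, while the decoder names it returns (`diffVarintSnappyStreamedDecode`, `diffVarintSnappyDecode`) come from this series.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

const (
	codecHeaderSnappy         = "dvs" // diff+varint+snappy (block format)
	codecHeaderStreamedSnappy = "dss" // diff+varint+streamed snappy
)

// pickDecoder mirrors the header sniffing done by decodePostings; the
// returned name stands in for the decode function that would run.
func pickDecoder(input []byte) (string, error) {
	switch {
	case bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy)):
		return "diffVarintSnappyStreamedDecode", nil
	case bytes.HasPrefix(input, []byte(codecHeaderSnappy)):
		return "diffVarintSnappyDecode", nil
	default:
		return "", errors.New("unknown postings codec")
	}
}

func main() {
	fmt.Println(pickDecoder([]byte("dss\xff\x06\x00\x00sNaPpY"))) // streamed encoding
	fmt.Println(pickDecoder([]byte("dvs")))                       // legacy block encoding
}
```

Keying the cache by encoding format sidesteps the error path entirely: old and new versions read and write disjoint entries rather than tripping over each other's values.
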
### Removed From d1bf9040f0e434be7ef904efb5fe767f0f20d3c6 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 2 Jun 2023 13:14:56 -0700 Subject: [PATCH 6/9] fix another test Signed-off-by: Ben Ye --- pkg/store/cache/cache_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go index 889009259b..7db379af52 100644 --- a/pkg/store/cache/cache_test.go +++ b/pkg/store/cache/cache_test.go @@ -38,7 +38,7 @@ func TestCacheKey_string(t *testing.T) { hash := blake2b.Sum256([]byte("foo:bar")) encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) - return fmt.Sprintf("P:%s:%s", uid.String(), encodedHash) + return fmt.Sprintf("P:%s:%s:", uid.String(), encodedHash) }(), }, "should stringify series cache key": { @@ -66,7 +66,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { expectedLen int }{ "should guarantee reasonably short key length for postings": { - expectedLen: 72, + expectedLen: 73, keys: []cacheKey{ {ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})}, {ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}, From 335377384894f61730960c99d33021360e315c07 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 5 Jun 2023 00:21:03 -0700 Subject: [PATCH 7/9] add compression scheme const to remote index cache Signed-off-by: Ben Ye --- pkg/store/cache/cache.go | 2 +- pkg/store/cache/cache_test.go | 4 ++-- pkg/store/cache/memcached.go | 20 +++++++++++++++++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index fcad461b75..3c25ecc527 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -80,7 +80,7 @@ func (c cacheKey) string() string { lbl := c.key.(cacheKeyPostings) lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value)) // Add : at the end to force using a new cache key for postings. 
- return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + ":" + return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) case cacheKeySeries: return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) default: diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go index 7db379af52..889009259b 100644 --- a/pkg/store/cache/cache_test.go +++ b/pkg/store/cache/cache_test.go @@ -38,7 +38,7 @@ func TestCacheKey_string(t *testing.T) { hash := blake2b.Sum256([]byte("foo:bar")) encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) - return fmt.Sprintf("P:%s:%s:", uid.String(), encodedHash) + return fmt.Sprintf("P:%s:%s", uid.String(), encodedHash) }(), }, "should stringify series cache key": { @@ -66,7 +66,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { expectedLen int }{ "should guarantee reasonably short key length for postings": { - expectedLen: 73, + expectedLen: 72, keys: []cacheKey{ {ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})}, {ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}, diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 8bdd0ca271..c7eeb6dbf3 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -22,11 +22,17 @@ const ( memcachedDefaultTTL = 24 * time.Hour ) +const ( + compressionSchemeStreamedSnappy = "dvs" +) + // RemoteIndexCache is a memcached-based index cache. type RemoteIndexCache struct { logger log.Logger memcached cacheutil.RemoteCacheClient + compressionScheme string + // Metrics. postingRequests prometheus.Counter seriesRequests prometheus.Counter @@ -37,8 +43,9 @@ type RemoteIndexCache struct { // NewRemoteIndexCache makes a new RemoteIndexCache. func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, reg prometheus.Registerer) (*RemoteIndexCache, error) { c := &RemoteIndexCache{ - logger: logger, - memcached: cacheClient, + logger: logger, + memcached: cacheClient, + compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we supporter different types of compressions. } requests := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ @@ -65,6 +72,9 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // asynchronously stored in the cache. func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string() + if len(c.compressionScheme) > 0 { + key += ":" + c.compressionScheme + } if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -78,8 +88,12 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. 
keys := make([]string, 0, len(lbls)) blockIDKey := blockID.String() + suffix := "" + if len(c.compressionScheme) > 0 { + suffix += ":" + c.compressionScheme + } for _, lbl := range lbls { - key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + suffix keys = append(keys, key) } From 29d86cc58c5aaebe01acadd3e214291e41ddd4ad Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 5 Jun 2023 08:59:55 -0700 Subject: [PATCH 8/9] address required comments Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- pkg/store/cache/cache.go | 9 +++++++-- pkg/store/cache/cache_test.go | 14 +++++++------- pkg/store/cache/inmemory.go | 6 +++--- pkg/store/cache/memcached.go | 18 +++++------------- 5 files changed, 23 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e878c80956..48c05c12a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,7 +53,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6322](https://github.com/thanos-io/thanos/pull/6322) Logging: Avoid expensive log.Valuer evaluation for disallowed levels. - [#6358](https://github.com/thanos-io/thanos/pull/6358) Query: Add +Inf bucket to query duration metrics - [#6363](https://github.com/thanos-io/thanos/pull/6363) Store: Check context error when expanding postings. -- [#6405](https://github.com/thanos-io/thanos/pull/6405) Index Cache: Change postings cache key due to the streaming snappy encoding change. +- [#6405](https://github.com/thanos-io/thanos/pull/6405) Index Cache: Change postings cache key to include the encoding format used so that older Thanos versions would not try to decode it during the deployment of a new version. ### Removed diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index 3c25ecc527..c849073c74 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -49,6 +49,8 @@ type IndexCache interface { type cacheKey struct { block string key interface{} + + compression string } func (c cacheKey) keyType() string { @@ -79,8 +81,11 @@ func (c cacheKey) string() string { // which would end up in wrong query results. lbl := c.key.(cacheKeyPostings) lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value)) - // Add : at the end to force using a new cache key for postings. 
- return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + key := "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + if len(c.compression) > 0 { + key += ":" + c.compression + } + return key case cacheKeySeries: return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) default: diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go index 889009259b..542c5084c3 100644 --- a/pkg/store/cache/cache_test.go +++ b/pkg/store/cache/cache_test.go @@ -33,7 +33,7 @@ func TestCacheKey_string(t *testing.T) { expected string }{ "should stringify postings cache key": { - key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})}, + key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"}), ""}, expected: func() string { hash := blake2b.Sum256([]byte("foo:bar")) encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) @@ -42,7 +42,7 @@ func TestCacheKey_string(t *testing.T) { }(), }, "should stringify series cache key": { - key: cacheKey{ulidString, cacheKeySeries(12345)}, + key: cacheKey{ulidString, cacheKeySeries(12345), ""}, expected: fmt.Sprintf("S:%s:12345", uid.String()), }, } @@ -68,14 +68,14 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { "should guarantee reasonably short key length for postings": { expectedLen: 72, keys: []cacheKey{ - {ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})}, - {ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}, + {ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"}), ""}, + {ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""}, }, }, "should guarantee reasonably short key length for series": { expectedLen: 49, keys: []cacheKey{ - {ulidString, cacheKeySeries(math.MaxUint64)}, + {ulidString, cacheKeySeries(math.MaxUint64), ""}, }, }, } @@ -91,7 +91,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { func BenchmarkCacheKey_string_Postings(b *testing.B) { uid := ulid.MustNew(1, nil) - key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})} + key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""} b.ResetTimer() for i := 0; i < b.N; i++ { @@ -101,7 +101,7 @@ func BenchmarkCacheKey_string_Postings(b *testing.B) { func BenchmarkCacheKey_string_Series(b *testing.B) { uid := ulid.MustNew(1, nil) - key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64)} + key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64), ""} b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index f0e121c265..d7ecc60814 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -300,7 +300,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. blockIDKey := blockID.String() for _, key := range keys { - if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key)}); ok { + if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}); ok { hits[key] = b continue } @@ -314,7 +314,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. 
// StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id)}, v) + c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v) } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache @@ -324,7 +324,7 @@ func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.UL blockIDKey := blockID.String() for _, id := range ids { - if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id)}); ok { + if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}); ok { hits[id] = b continue } diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index c7eeb6dbf3..a65cbae8a5 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -45,7 +45,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli c := &RemoteIndexCache{ logger: logger, memcached: cacheClient, - compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we supporter different types of compressions. + compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we support different types of compressions. } requests := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ @@ -71,11 +71,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string() - if len(c.compressionScheme) > 0 { - key += ":" + c.compressionScheme - } - + key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) } @@ -88,12 +84,8 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. keys := make([]string, 0, len(lbls)) blockIDKey := blockID.String() - suffix := "" - if len(c.compressionScheme) > 0 { - suffix += ":" + c.compressionScheme - } for _, lbl := range lbls { - key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + suffix + key := cacheKey{blockIDKey, cacheKeyPostings(lbl), c.compressionScheme}.string() keys = append(keys, key) } @@ -127,7 +119,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. 
func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - key := cacheKey{blockID.String(), cacheKeySeries(id)}.string() + key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err) @@ -142,7 +134,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL blockIDKey := blockID.String() for _, id := range ids { - key := cacheKey{blockIDKey, cacheKeySeries(id)}.string() + key := cacheKey{blockIDKey, cacheKeySeries(id), ""}.string() keys = append(keys, key) } From 7f9b88b6a55c4a8c39abc781e7a1720cf2b131c5 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 6 Jun 2023 16:10:26 -0700 Subject: [PATCH 9/9] fix compression scheme name Signed-off-by: Ben Ye --- pkg/store/cache/memcached.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index a65cbae8a5..16a5b92cec 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -23,7 +23,7 @@ const ( ) const ( - compressionSchemeStreamedSnappy = "dvs" + compressionSchemeStreamedSnappy = "dss" ) // RemoteIndexCache is a memcached-based index cache.
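
For reference, here is a self-contained sketch of the key scheme the series converges on. It is not part of the patches: buildPostingsKey and buildSeriesKey are illustrative stand-ins for cacheKey.string(), and the sample ULID is made up.

package main

import (
	"encoding/base64"
	"fmt"
	"strconv"

	"golang.org/x/crypto/blake2b"
)

// buildPostingsKey mirrors the postings branch of cacheKey.string(): the
// label is hashed so the key stays short regardless of label size, and the
// codec ("dss" for streamed snappy) is appended when set, so entries written
// under a different encoding can never be returned for this one.
func buildPostingsKey(block, name, value, codec string) string {
	hash := blake2b.Sum256([]byte(name + ":" + value))
	key := "P:" + block + ":" + base64.RawURLEncoding.EncodeToString(hash[:])
	if len(codec) > 0 {
		key += ":" + codec
	}
	return key
}

// buildSeriesKey mirrors the series branch, which keeps no codec suffix.
func buildSeriesKey(block string, id uint64) string {
	return "S:" + block + ":" + strconv.FormatUint(id, 10)
}

func main() {
	const block = "01D78XZ44G0000000000000000" // any 26-char ULID string
	// 2 ("P:") + 26 (ULID) + 1 + 43 (base64 of a 32-byte hash) = 72 bytes
	// without a codec, matching the length asserted in
	// TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength; the
	// ":dss" suffix brings it to 76.
	fmt.Println(buildPostingsKey(block, "foo", "bar", "dss"))
	fmt.Println(buildSeriesKey(block, 12345))
}

Keying by codec means a rolling upgrade costs at most one round of postings cache misses, rather than decode failures in store gateways still running the previous version.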