diff --git a/CHANGELOG.md b/CHANGELOG.md index 83f7b07f30..48c05c12a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6322](https://github.com/thanos-io/thanos/pull/6322) Logging: Avoid expensive log.Valuer evaluation for disallowed levels. - [#6358](https://github.com/thanos-io/thanos/pull/6358) Query: Add +Inf bucket to query duration metrics - [#6363](https://github.com/thanos-io/thanos/pull/6363) Store: Check context error when expanding postings. +- [#6405](https://github.com/thanos-io/thanos/pull/6405) Index Cache: Change postings cache key to include the encoding format used so that older Thanos versions would not try to decode it during the deployment of a new version. ### Removed diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index aac90fc148..c849073c74 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -49,6 +49,8 @@ type IndexCache interface { type cacheKey struct { block string key interface{} + + compression string } func (c cacheKey) keyType() string { @@ -79,7 +81,11 @@ func (c cacheKey) string() string { // which would end up in wrong query results. lbl := c.key.(cacheKeyPostings) lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value)) - return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + key := "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + if len(c.compression) > 0 { + key += ":" + c.compression + } + return key case cacheKeySeries: return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) default: diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go index 889009259b..542c5084c3 100644 --- a/pkg/store/cache/cache_test.go +++ b/pkg/store/cache/cache_test.go @@ -33,7 +33,7 @@ func TestCacheKey_string(t *testing.T) { expected string }{ "should stringify postings cache key": { - key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})}, + key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"}), ""}, expected: func() string { hash := blake2b.Sum256([]byte("foo:bar")) encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) @@ -42,7 +42,7 @@ func TestCacheKey_string(t *testing.T) { }(), }, "should stringify series cache key": { - key: cacheKey{ulidString, cacheKeySeries(12345)}, + key: cacheKey{ulidString, cacheKeySeries(12345), ""}, expected: fmt.Sprintf("S:%s:12345", uid.String()), }, } @@ -68,14 +68,14 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { "should guarantee reasonably short key length for postings": { expectedLen: 72, keys: []cacheKey{ - {ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})}, - {ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}, + {ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"}), ""}, + {ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""}, }, }, "should guarantee reasonably short key length for series": { expectedLen: 49, keys: []cacheKey{ - {ulidString, cacheKeySeries(math.MaxUint64)}, + {ulidString, cacheKeySeries(math.MaxUint64), ""}, }, }, } @@ -91,7 +91,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { func BenchmarkCacheKey_string_Postings(b *testing.B) { uid := ulid.MustNew(1, nil) - key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})} + key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""} b.ResetTimer() for i := 0; i < b.N; i++ { @@ -101,7 +101,7 @@ func BenchmarkCacheKey_string_Postings(b *testing.B) { func BenchmarkCacheKey_string_Series(b *testing.B) { uid := ulid.MustNew(1, nil) - key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64)} + key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64), ""} b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index f0e121c265..d7ecc60814 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -300,7 +300,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. blockIDKey := blockID.String() for _, key := range keys { - if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key)}); ok { + if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}); ok { hits[key] = b continue } @@ -314,7 +314,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id)}, v) + c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v) } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache @@ -324,7 +324,7 @@ func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.UL blockIDKey := blockID.String() for _, id := range ids { - if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id)}); ok { + if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}); ok { hits[id] = b continue } diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 8bdd0ca271..16a5b92cec 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -22,11 +22,17 @@ const ( memcachedDefaultTTL = 24 * time.Hour ) +const ( + compressionSchemeStreamedSnappy = "dss" +) + // RemoteIndexCache is a memcached-based index cache. type RemoteIndexCache struct { logger log.Logger memcached cacheutil.RemoteCacheClient + compressionScheme string + // Metrics. postingRequests prometheus.Counter seriesRequests prometheus.Counter @@ -37,8 +43,9 @@ type RemoteIndexCache struct { // NewRemoteIndexCache makes a new RemoteIndexCache. func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, reg prometheus.Registerer) (*RemoteIndexCache, error) { c := &RemoteIndexCache{ - logger: logger, - memcached: cacheClient, + logger: logger, + memcached: cacheClient, + compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we support different types of compressions. } requests := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ @@ -64,8 +71,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string() - + key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) } @@ -79,7 +85,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. blockIDKey := blockID.String() for _, lbl := range lbls { - key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + key := cacheKey{blockIDKey, cacheKeyPostings(lbl), c.compressionScheme}.string() keys = append(keys, key) } @@ -113,7 +119,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - key := cacheKey{blockID.String(), cacheKeySeries(id)}.string() + key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err) @@ -128,7 +134,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL blockIDKey := blockID.String() for _, id := range ids { - key := cacheKey{blockIDKey, cacheKeySeries(id)}.string() + key := cacheKey{blockIDKey, cacheKeySeries(id), ""}.string() keys = append(keys, key) }