diff --git a/CHANGELOG.md b/CHANGELOG.md index e3ca073320..52c55d8b4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6891](https://github.com/thanos-io/thanos/pull/6891) Objstore: Bump `objstore` which adds support for Azure Workload Identity. - [#6453](https://github.com/thanos-io/thanos/pull/6453) Sidecar: Added `--reloader.method` to support configuration reloads via SIHUP signal. - [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram. +- [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs. ### Changed diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index 80ba850b4c..78279e7b2d 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -78,6 +78,7 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type)) } + cache = NewTracingIndexCache(string(cacheConfig.Type), cache) if len(cacheConfig.EnabledItems) > 0 { if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil { return nil, err diff --git a/pkg/store/cache/tracing_index_cache.go b/pkg/store/cache/tracing_index_cache.go new file mode 100644 index 0000000000..a72ce0d664 --- /dev/null +++ b/pkg/store/cache/tracing_index_cache.go @@ -0,0 +1,92 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "context" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + + "github.com/thanos-io/thanos/pkg/tracing" +) + +type TracingIndexCache struct { + name string + cache IndexCache +} + +// NewTracingIndexCache creates an index cache wrapper with traces instrumentation. 
+func NewTracingIndexCache(name string, cache IndexCache) *TracingIndexCache { + return &TracingIndexCache{name: name, cache: cache} +} + +// StorePostings stores postings for a single label of the given block. It delegates directly to the wrapped cache and starts no span. +func (c *TracingIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + c.cache.StorePostings(blockID, l, v, tenant) +} + +// FetchMultiPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. The lookup is wrapped in a "fetch_multi_postings" span tagged with the cache name, block ID, number of requested keys, number of hits and the summed length in bytes of the hit values. +func (c *TracingIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + span, newCtx := tracing.StartSpan(ctx, "fetch_multi_postings", tracing.Tags{ + "name": c.name, + "block.id": blockID.String(), + "requested": len(keys), + }) + defer span.Finish() + hits, misses = c.cache.FetchMultiPostings(newCtx, blockID, keys, tenant) + span.SetTag("hits", len(hits)) + dataBytes := 0 + for _, v := range hits { + dataBytes += len(v) + } + span.SetTag("bytes", dataBytes) + return hits, misses +} + +// StoreExpandedPostings stores expanded postings for a set of label matchers. It delegates directly to the wrapped cache and starts no span. +func (c *TracingIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + c.cache.StoreExpandedPostings(blockID, matchers, v, tenant) +} + +// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. The lookup is wrapped in a "fetch_expanded_postings" span whose "bytes" tag is set only on a hit.
+func (c *TracingIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) (data []byte, exists bool) { + span, newCtx := tracing.StartSpan(ctx, "fetch_expanded_postings", tracing.Tags{ + "name": c.name, + "block.id": blockID.String(), + }) + defer span.Finish() + data, exists = c.cache.FetchExpandedPostings(newCtx, blockID, matchers, tenant) + if exists { + span.SetTag("bytes", len(data)) + } + return data, exists +} + +// StoreSeries stores a single series. Skip instrumenting this method to avoid +// excessive spans, as a single request can store millions of series. +func (c *TracingIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.cache.StoreSeries(blockID, id, v, tenant) +} + +// FetchMultiSeries fetches multiple series - each identified by ID - from the cache +// and returns a map containing cache hits, along with a list of missing IDs. The lookup is wrapped in a "fetch_multi_series" span tagged with the cache name, block ID, number of requested IDs, number of hits and the summed length in bytes of the hit values. +func (c *TracingIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + span, newCtx := tracing.StartSpan(ctx, "fetch_multi_series", tracing.Tags{ + "name": c.name, + "block.id": blockID.String(), + "requested": len(ids), + }) + defer span.Finish() + hits, misses = c.cache.FetchMultiSeries(newCtx, blockID, ids, tenant) + span.SetTag("hits", len(hits)) + dataBytes := 0 + for _, v := range hits { + dataBytes += len(v) + } + span.SetTag("bytes", dataBytes) + return hits, misses +}