Skip to content

Commit

Permalink
Add tracing index cache (#6954)
Browse files Browse the repository at this point in the history
* add tracing index cache

Signed-off-by: Ben Ye <benye@amazon.com>

* changelog

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Dec 6, 2023
1 parent 36ce448 commit 5cef7d9
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6891](https://github.com/thanos-io/thanos/pull/6891) Objstore: Bump `objstore` which adds support for Azure Workload Identity.
- [#6453](https://github.com/thanos-io/thanos/pull/6453) Sidecar: Added `--reloader.method` to support configuration reloads via SIHUP signal.
- [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram.
- [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs.

### Changed

Expand Down
1 change: 1 addition & 0 deletions pkg/store/cache/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type))
}

cache = NewTracingIndexCache(string(cacheConfig.Type), cache)
if len(cacheConfig.EnabledItems) > 0 {
if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil {
return nil, err
Expand Down
92 changes: 92 additions & 0 deletions pkg/store/cache/tracing_index_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
"context"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/tracing"
)

type TracingIndexCache struct {
name string
cache IndexCache
}

// NewTracingIndexCache creates an index cache wrapper with traces instrumentation.
func NewTracingIndexCache(name string, cache IndexCache) *TracingIndexCache {
return &TracingIndexCache{name: name, cache: cache}
}

// StorePostings stores postings for a single series.
func (c *TracingIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
c.cache.StorePostings(blockID, l, v, tenant)
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *TracingIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
span, newCtx := tracing.StartSpan(ctx, "fetch_multi_postings", tracing.Tags{
"name": c.name,
"block.id": blockID.String(),
"requested": len(keys),
})
defer span.Finish()
hits, misses = c.cache.FetchMultiPostings(newCtx, blockID, keys, tenant)
span.SetTag("hits", len(hits))
dataBytes := 0
for _, v := range hits {
dataBytes += len(v)
}
span.SetTag("bytes", dataBytes)
return hits, misses
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *TracingIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *TracingIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) (data []byte, exists bool) {
span, newCtx := tracing.StartSpan(ctx, "fetch_expanded_postings", tracing.Tags{
"name": c.name,
"block.id": blockID.String(),
})
defer span.Finish()
data, exists = c.cache.FetchExpandedPostings(newCtx, blockID, matchers, tenant)
if exists {
span.SetTag("bytes", len(data))
}
return data, exists
}

// StoreSeries stores a single series. Skip intrumenting this method
// excessive spans as a single request can store millions of serieses.
func (c *TracingIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
c.cache.StoreSeries(blockID, id, v, tenant)
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *TracingIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
span, newCtx := tracing.StartSpan(ctx, "fetch_multi_series", tracing.Tags{
"name": c.name,
"block.id": blockID.String(),
"requested": len(ids),
})
defer span.Finish()
hits, misses = c.cache.FetchMultiSeries(newCtx, blockID, ids, tenant)
span.SetTag("hits", len(hits))
dataBytes := 0
for _, v := range hits {
dataBytes += len(v)
}
span.SetTag("bytes", dataBytes)
return hits, misses
}

0 comments on commit 5cef7d9

Please sign in to comment.