Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query-frontend: inject query cache keys for LabelValues/Cardinality requests #6849

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions pkg/frontend/querymiddleware/cardinality_query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package querymiddleware

import (
"context"
"errors"
"net/http"
"net/url"
Expand All @@ -22,53 +23,53 @@ const (
cardinalityActiveSeriesQueryCachePrefix = "ca:"
)

func newCardinalityQueryCacheRoundTripper(cache cache.Cache, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper {
delegate := &cardinalityQueryCache{
func newCardinalityQueryCacheRoundTripper(cache cache.Cache, splitter CacheSplitter, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper {
ttl := &cardinalityQueryTTL{
limits: limits,
}

return newGenericQueryCacheRoundTripper(cache, delegate, next, logger, newResultsCacheMetrics("cardinality", reg))
return newGenericQueryCacheRoundTripper(cache, splitter.GenerateLabelValuesCardinalityCacheKey, ttl, next, logger, newResultsCacheMetrics("cardinality", reg))
}

type cardinalityQueryCache struct {
type cardinalityQueryTTL struct {
limits Limits
}

func (c *cardinalityQueryCache) getTTL(userID string) time.Duration {
func (c *cardinalityQueryTTL) ttl(userID string) time.Duration {
return c.limits.ResultsCacheTTLForCardinalityQuery(userID)
}

func (c *cardinalityQueryCache) parseRequest(path string, values url.Values) (*genericQueryRequest, error) {
func (DefaultCacheSplitter) GenerateLabelValuesCardinalityCacheKey(ctx context.Context, userID, path string, values url.Values) (*GenericQueryCacheKey, error) {
switch {
case strings.HasSuffix(path, cardinalityLabelNamesPathSuffix):
parsed, err := cardinality.DecodeLabelNamesRequestFromValues(values)
if err != nil {
return nil, err
}

return &genericQueryRequest{
cacheKey: parsed.String(),
cacheKeyPrefix: cardinalityLabelNamesQueryCachePrefix,
return &GenericQueryCacheKey{
CacheKey: parsed.String(),
CacheKeyPrefix: cardinalityLabelNamesQueryCachePrefix,
}, nil
case strings.HasSuffix(path, cardinalityLabelValuesPathSuffix):
parsed, err := cardinality.DecodeLabelValuesRequestFromValues(values)
if err != nil {
return nil, err
}

return &genericQueryRequest{
cacheKey: parsed.String(),
cacheKeyPrefix: cardinalityLabelValuesQueryCachePrefix,
return &GenericQueryCacheKey{
CacheKey: parsed.String(),
CacheKeyPrefix: cardinalityLabelValuesQueryCachePrefix,
}, nil
case strings.HasSuffix(path, cardinalityActiveSeriesPathSuffix):
parsed, err := cardinality.DecodeActiveSeriesRequestFromValues(values)
if err != nil {
return nil, err
}

return &genericQueryRequest{
cacheKey: parsed.String(),
cacheKeyPrefix: cardinalityActiveSeriesQueryCachePrefix,
return &GenericQueryCacheKey{
CacheKey: parsed.String(),
CacheKeyPrefix: cardinalityActiveSeriesQueryCachePrefix,
}, nil
default:
return nil, errors.New("unknown cardinality API endpoint")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestCardinalityQueryCache_RoundTrip_WithTenantFederation(t *testing.T) {
cacheBackend := cache.NewInstrumentedMockCache()
limits := multiTenantMockLimits{byTenant: testData.limits}

rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t), nil)
rt := newCardinalityQueryCacheRoundTripper(cacheBackend, DefaultCacheSplitter(0), limits, downstream, testutil.NewLogger(t), nil)
res, err := rt.RoundTrip(req)
require.NoError(t, err)

Expand Down
55 changes: 28 additions & 27 deletions pkg/frontend/querymiddleware/generic_query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,40 @@ import (
"github.com/grafana/mimir/pkg/util/validation"
)

type genericQueryRequest struct {
// cacheKey is a full non-hashed representation of the request, used to uniquely identify
type GenericQueryCacheKey struct {
// CacheKey is a full non-hashed representation of the request, used to uniquely identify
// a request in the cache.
cacheKey string
CacheKey string

// cacheKeyPrefix returns the cache key prefix to use for this request.
cacheKeyPrefix string
// CacheKeyPrefix is a cache key prefix to use for this request.
CacheKeyPrefix string
}

type genericQueryDelegate interface {
// parseRequest parses the input request and returns a genericQueryRequest, or an error if parsing fails.
parseRequest(path string, values url.Values) (*genericQueryRequest, error)

// getTTL returns the cache TTL for the input userID.
getTTL(userID string) time.Duration
type tenantCacheTTL interface {
// ttl returns the cache TTL for the input userID.
ttl(userID string) time.Duration
}

type splittingFunc func(ctx context.Context, userID, path string, values url.Values) (*GenericQueryCacheKey, error)

// genericQueryCache is a http.RoundTripped wrapping the downstream with a generic HTTP response cache.
type genericQueryCache struct {
cache cache.Cache
delegate genericQueryDelegate
metrics *resultsCacheMetrics
next http.RoundTripper
logger log.Logger
cache cache.Cache
tenantTTL tenantCacheTTL
splitter splittingFunc
metrics *resultsCacheMetrics
next http.RoundTripper
logger log.Logger
}

func newGenericQueryCacheRoundTripper(cache cache.Cache, delegate genericQueryDelegate, next http.RoundTripper, logger log.Logger, metrics *resultsCacheMetrics) http.RoundTripper {
func newGenericQueryCacheRoundTripper(cache cache.Cache, splitter splittingFunc, tenantTTL tenantCacheTTL, next http.RoundTripper, logger log.Logger, metrics *resultsCacheMetrics) http.RoundTripper {
return &genericQueryCache{
cache: cache,
delegate: delegate,
metrics: metrics,
next: next,
logger: logger,
cache: cache,
tenantTTL: tenantTTL,
metrics: metrics,
splitter: splitter,
next: next,
logger: logger,
}
}

Expand All @@ -75,7 +76,7 @@ func (c *genericQueryCache) RoundTrip(req *http.Request) (*http.Response, error)

// Skip the cache if disabled for the tenant. We look at the minimum TTL so that we skip the cache
// if it's disabled for any of tenants.
cacheTTL := validation.MinDurationPerTenant(tenantIDs, c.delegate.getTTL)
cacheTTL := validation.MinDurationPerTenant(tenantIDs, c.tenantTTL.ttl)
if cacheTTL <= 0 {
spanLog.DebugLog("msg", "cache disabled for the tenant")
return c.next.RoundTrip(req)
Expand All @@ -89,7 +90,7 @@ func (c *genericQueryCache) RoundTrip(req *http.Request) (*http.Response, error)
return nil, apierror.New(apierror.TypeBadData, err.Error())
}

queryReq, err := c.delegate.parseRequest(req.URL.Path, reqValues)
queryReq, err := c.splitter(ctx, tenant.JoinTenantIDs(tenantIDs), req.URL.Path, reqValues)
if err != nil {
// Logging as info because it's not an actionable error here.
// We defer it to the downstream.
Expand Down Expand Up @@ -186,9 +187,9 @@ func (c *genericQueryCache) recordCacheStoreQueryDetails(ctx context.Context, to
}
}

func generateGenericQueryRequestCacheKey(tenantIDs []string, req *genericQueryRequest) (cacheKey, hashedCacheKey string) {
cacheKey = fmt.Sprintf("%s:%s", tenant.JoinTenantIDs(tenantIDs), req.cacheKey)
hashedCacheKey = fmt.Sprintf("%s%s", req.cacheKeyPrefix, cacheHashKey(cacheKey))
func generateGenericQueryRequestCacheKey(tenantIDs []string, req *GenericQueryCacheKey) (cacheKey, hashedCacheKey string) {
cacheKey = fmt.Sprintf("%s:%s", tenant.JoinTenantIDs(tenantIDs), req.CacheKey)
hashedCacheKey = fmt.Sprintf("%s%s", req.CacheKeyPrefix, cacheHashKey(cacheKey))
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/querymiddleware/generic_query_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"
)

type newGenericQueryCacheFunc func(cache cache.Cache, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper
type newGenericQueryCacheFunc func(cache cache.Cache, splitter CacheSplitter, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper

type testGenericQueryCacheRequestType struct {
reqPath string
Expand Down Expand Up @@ -226,7 +226,7 @@ func testGenericQueryCacheRoundTrip(t *testing.T, newRoundTripper newGenericQuer
initialStoreCallsCount := cacheBackend.CountStoreCalls()

reg := prometheus.NewPedanticRegistry()
rt := newRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t), reg)
rt := newRoundTripper(cacheBackend, DefaultCacheSplitter(0), limits, downstream, testutil.NewLogger(t), reg)
res, err := rt.RoundTrip(req)
require.NoError(t, err)

Expand Down
19 changes: 10 additions & 9 deletions pkg/frontend/querymiddleware/labels_query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package querymiddleware

import (
"context"
"fmt"
"net/http"
"net/url"
Expand All @@ -28,23 +29,23 @@ const (
stringParamSeparator = rune(0)
)

func newLabelsQueryCacheRoundTripper(cache cache.Cache, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper {
delegate := &labelsQueryCache{
func newLabelsQueryCacheRoundTripper(cache cache.Cache, cacheSplitter CacheSplitter, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper {
ttl := &labelsQueryTTL{
limits: limits,
}

return newGenericQueryCacheRoundTripper(cache, delegate, next, logger, newResultsCacheMetrics("label_names_and_values", reg))
return newGenericQueryCacheRoundTripper(cache, cacheSplitter.GenerateLabelValuesCacheKey, ttl, next, logger, newResultsCacheMetrics("label_names_and_values", reg))
}

type labelsQueryCache struct {
type labelsQueryTTL struct {
limits Limits
}

func (c *labelsQueryCache) getTTL(userID string) time.Duration {
func (c *labelsQueryTTL) ttl(userID string) time.Duration {
return c.limits.ResultsCacheTTLForLabelsQuery(userID)
}

func (c *labelsQueryCache) parseRequest(path string, values url.Values) (*genericQueryRequest, error) {
func (DefaultCacheSplitter) GenerateLabelValuesCacheKey(ctx context.Context, userID, path string, values url.Values) (*GenericQueryCacheKey, error) {
var (
cacheKeyPrefix string
labelName string
Expand Down Expand Up @@ -78,9 +79,9 @@ func (c *labelsQueryCache) parseRequest(path string, values url.Values) (*generi
return nil, err
}

return &genericQueryRequest{
cacheKey: generateLabelsQueryRequestCacheKey(startTime, endTime, labelName, matcherSets),
cacheKeyPrefix: cacheKeyPrefix,
return &GenericQueryCacheKey{
CacheKey: generateLabelsQueryRequestCacheKey(startTime, endTime, labelName, matcherSets),
CacheKeyPrefix: cacheKeyPrefix,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/frontend/querymiddleware/labels_query_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package querymiddleware

import (
"context"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -33,7 +34,7 @@ func TestLabelsQueryCache_RoundTrip(t *testing.T) {
})
}

func TestLabelsQueryCache_parseRequest(t *testing.T) {
func TestDefaultCacheSplitter_LabelValuesCacheKey(t *testing.T) {
const labelName = "test"

tests := map[string]struct {
Expand Down Expand Up @@ -143,16 +144,16 @@ func TestLabelsQueryCache_parseRequest(t *testing.T) {
t.Run(testName, func(t *testing.T) {
for requestTypeName, requestTypeData := range requestTypes {
t.Run(requestTypeName, func(t *testing.T) {
c := &labelsQueryCache{}
actual, err := c.parseRequest(requestTypeData.requestPath, testData.params)
c := DefaultCacheSplitter(0)
actual, err := c.GenerateLabelValuesCacheKey(context.Background(), "user-1", requestTypeData.requestPath, testData.params)
require.NoError(t, err)

assert.Equal(t, requestTypeData.expectedCacheKeyPrefix, actual.cacheKeyPrefix)
assert.Equal(t, requestTypeData.expectedCacheKeyPrefix, actual.CacheKeyPrefix)

if requestTypeData.expectedCacheKeyWithLabelName {
assert.Equal(t, testData.expectedCacheKeyWithLabelName, actual.cacheKey)
assert.Equal(t, testData.expectedCacheKeyWithLabelName, actual.CacheKey)
} else {
assert.Equal(t, testData.expectedCacheKeyWithoutLabelName, actual.cacheKey)
assert.Equal(t, testData.expectedCacheKeyWithoutLabelName, actual.CacheKey)
}
})
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/frontend/querymiddleware/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"flag"
"fmt"
"hash/fnv"
"net/url"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -183,13 +184,15 @@ func (PrometheusResponseExtractor) ResponseWithoutHeaders(resp Response) Respons
// consumers who wish to implement their own strategies.
type CacheSplitter interface {
GenerateCacheKey(ctx context.Context, userID string, r Request) string
GenerateLabelValuesCacheKey(ctx context.Context, userID, path string, values url.Values) (*GenericQueryCacheKey, error)
GenerateLabelValuesCardinalityCacheKey(ctx context.Context, userID, path string, values url.Values) (*GenericQueryCacheKey, error)
Copy link
Contributor Author

@dimitarvdimitrov dimitarvdimitrov Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the Generate prefix unnecessary and repetitive. I went with it for consistency, but I would rather drop it. If reviewers agree, I can introduce new methods as LabelValuesCacheKey and LabelValuesCardinalityCacheKey and open a follow-up PR to rename GenerateCacheKey to CacheKey

Also: should we still call it CacheSplitter if it generates cache keys, and doesn't actually split anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the Generate prefix unnecessary and repetitive

I don't feel strongly about it if you want to change it.

Also: should we still call it CacheSplitter if it generates cache keys, and doesn't actually split anything?

I definitely think this should change.

}

// ConstSplitter is a utility for using a constant split interval when determining cache keys
type ConstSplitter time.Duration
// DefaultCacheSplitter is a utility for using a constant split interval when determining cache keys
type DefaultCacheSplitter time.Duration

// GenerateCacheKey generates a cache key based on the userID, Request and interval.
func (t ConstSplitter) GenerateCacheKey(_ context.Context, userID string, r Request) string {
func (t DefaultCacheSplitter) GenerateCacheKey(_ context.Context, userID string, r Request) string {
startInterval := r.GetStart() / time.Duration(t).Milliseconds()
stepOffset := r.GetStart() % r.GetStep()

Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func TestConstSplitter_generateCacheKey(t *testing.T) {
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) {
if got := ConstSplitter(tt.interval).GenerateCacheKey(ctx, "fake", tt.r); got != tt.want {
if got := DefaultCacheSplitter(tt.interval).GenerateCacheKey(ctx, "fake", tt.r); got != tt.want {
t.Errorf("generateKey() = %v, want %v", got, tt.want)
}
})
Expand Down
18 changes: 9 additions & 9 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Config struct {
TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`

// CacheSplitter allows to inject a CacheSplitter to use for generating cache keys.
// If nil, the querymiddleware package uses a ConstSplitter with SplitQueriesByInterval.
// If nil, the querymiddleware package uses a DefaultCacheSplitter with SplitQueriesByInterval.
CacheSplitter CacheSplitter `yaml:"-"`

QueryResultResponseFormat string `yaml:"query_result_response_format"`
Expand Down Expand Up @@ -234,25 +234,25 @@ func newQueryTripperware(
c = cache.NewCompression(cfg.ResultsCacheConfig.Compression, c, log)
}

cacheSplitter := cfg.CacheSplitter
if cacheSplitter == nil {
cacheSplitter = DefaultCacheSplitter(cfg.SplitQueriesByInterval)
}

// Inject the middleware to split requests by interval + results cache (if at least one of the two is enabled).
if cfg.SplitQueriesByInterval > 0 || cfg.CacheResults {
shouldCache := func(r Request) bool {
return !r.GetOptions().CacheDisabled
}

splitter := cfg.CacheSplitter
if splitter == nil {
splitter = ConstSplitter(cfg.SplitQueriesByInterval)
}

queryRangeMiddleware = append(queryRangeMiddleware, newInstrumentMiddleware("split_by_interval_and_results_cache", metrics), newSplitAndCacheMiddleware(
cfg.SplitQueriesByInterval > 0,
cfg.CacheResults,
cfg.SplitQueriesByInterval,
limits,
codec,
c,
splitter,
cacheSplitter,
cacheExtractor,
shouldCache,
log,
Expand Down Expand Up @@ -327,8 +327,8 @@ func newQueryTripperware(

// Inject the cardinality and labels query cache roundtripper only if the query results cache is enabled.
if cfg.CacheResults {
cardinality = newCardinalityQueryCacheRoundTripper(c, limits, cardinality, log, registerer)
labels = newLabelsQueryCacheRoundTripper(c, limits, labels, log, registerer)
cardinality = newCardinalityQueryCacheRoundTripper(c, cacheSplitter, limits, cardinality, log, registerer)
labels = newLabelsQueryCacheRoundTripper(c, cacheSplitter, limits, labels, log, registerer)
}

return RoundTripFunc(func(r *http.Request) (*http.Response, error) {
Expand Down
Loading
Loading