Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cortex_frontend_query_result_cache_requests_total and cortex_frontend_query_result_cache_hits_total metrics #5235

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
* [CHANGE] Store-gateway: skip verifying index header integrity upon loading. To enable verification set `blocks_storage.bucket_store.index_header.verify_on_load: true`.
* [CHANGE] Querier: change the default value of the experimental `-querier.streaming-chunks-per-ingester-buffer-size` flag to 256. #5203
* [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
* [FEATURE] Query-frontend: added experimental support to cache cardinality query responses. The cache will be used when `-query-frontend.cache-results` is enabled and `-query-frontend.results-cache-ttl-for-cardinality-query` set to a value greater than 0. #5212
* [FEATURE] Query-frontend: added experimental support to cache cardinality query responses. The cache will be used when `-query-frontend.cache-results` is enabled and `-query-frontend.results-cache-ttl-for-cardinality-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235
* `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality"}`
* `cortex_frontend_query_result_cache_hits_total{request_type="query_range|cardinality"}`
* [ENHANCEMENT] Cardinality API: When zone aware replication is enabled, the label values cardinality API can now tolerate single zone failure #5178
* [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. For now this feature can be disabled by setting `-timeseries-unmarshal-caching-optimization-enabled=false`. #5137
* [ENHANCEMENT] Add advanced CLI flags to control gRPC client behaviour: #5161
Expand Down
23 changes: 14 additions & 9 deletions pkg/frontend/querymiddleware/cardinality_query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/cardinality"
Expand All @@ -39,18 +40,20 @@ type cardinalityQueryRequest interface {
// cardinalityQueryCache is an http.RoundTripper wrapping the downstream with an HTTP response cache.
// This RoundTripper is used to add caching support to cardinality analysis API endpoints.
type cardinalityQueryCache struct {
cache cache.Cache
limits Limits
next http.RoundTripper
logger log.Logger
cache cache.Cache
limits Limits
metrics *resultsCacheMetrics
next http.RoundTripper
logger log.Logger
}

func newCardinalityQueryCacheRoundTripper(cache cache.Cache, limits Limits, next http.RoundTripper, logger log.Logger) http.RoundTripper {
func newCardinalityQueryCacheRoundTripper(cache cache.Cache, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper {
return &cardinalityQueryCache{
cache: cache,
limits: limits,
next: next,
logger: logger,
cache: cache,
limits: limits,
metrics: newResultsCacheMetrics("cardinality", reg),
next: next,
logger: logger,
}
}

Expand Down Expand Up @@ -92,9 +95,11 @@ func (c *cardinalityQueryCache) RoundTrip(req *http.Request) (*http.Response, er
}

// Lookup the cache.
c.metrics.cacheRequests.Inc()
cacheKey, hashedCacheKey := generateCardinalityQueryRequestCacheKey(tenantIDs, queryReq)
res := c.fetchCachedResponse(ctx, cacheKey, hashedCacheKey)
if res != nil {
c.metrics.cacheHits.Inc()
level.Debug(spanLog).Log("msg", "response fetched from the cache")
return res, nil
}
Expand Down
44 changes: 40 additions & 4 deletions pkg/frontend/querymiddleware/cardinality_query_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package querymiddleware
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
Expand All @@ -14,6 +15,8 @@ import (

"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/util/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,6 +50,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader http.Header
expectedBody []byte
expectedDownstreamCalled bool
expectedLookupFromCache bool
expectedStoredToCache bool
}{
"should fetch the response from the downstream and store it the cache if the downstream request succeed": {
Expand All @@ -56,6 +60,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: true,
},
"should not store the response in the cache if disabled for the tenant": {
Expand All @@ -65,6 +70,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: false,
expectedStoredToCache: false,
},
"should not store the response in the cache if disabled for the request": {
Expand All @@ -75,24 +81,27 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: false,
expectedStoredToCache: false,
},
"should not store the response in the cache if the downstream returned a 4xx status code": {
cacheTTL: 0,
cacheTTL: time.Minute,
downstreamRes: downstreamRes(400, []byte(`{error:"400"}`)),
expectedStatusCode: 400,
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{error:"400"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: false,
},
"should not store the response in the cache if the downstream returned a 5xx status code": {
cacheTTL: 0,
cacheTTL: time.Minute,
downstreamRes: downstreamRes(500, []byte(`{error:"500"}`)),
expectedStatusCode: 500,
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{error:"500"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: false,
},
"should fetch the response from the cache if the cached response is not expired": {
Expand All @@ -109,6 +118,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"cached"}`),
expectedDownstreamCalled: false, // Should not call the downstream.
expectedLookupFromCache: true,
expectedStoredToCache: false, // Should not store anything to the cache.
},
"should fetch the response from the downstream and overwrite the cached response if corrupted": {
Expand All @@ -121,6 +131,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: true,
},
"should fetch the response from the downstream and overwrite the cached response if a key collision was detected": {
Expand All @@ -137,6 +148,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: true,
},
}
Expand Down Expand Up @@ -187,7 +199,8 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
}
initialStoreCallsCount := cacheBackend.CountStoreCalls()

rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t))
reg := prometheus.NewPedanticRegistry()
rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t), reg)
res, err := rt.RoundTrip(req)
require.NoError(t, err)

Expand Down Expand Up @@ -220,6 +233,29 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
} else {
assert.Equal(t, initialStoreCallsCount, cacheBackend.CountStoreCalls())
}

// Assert on metrics.
expectedRequestsCount := 0
expectedHitsCount := 0
if testData.expectedLookupFromCache {
expectedRequestsCount = 1
if !testData.expectedDownstreamCalled {
expectedHitsCount = 1
}
}

assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="cardinality"} %d

# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="cardinality"} %d
`, expectedRequestsCount, expectedHitsCount)),
"cortex_frontend_query_result_cache_requests_total",
"cortex_frontend_query_result_cache_hits_total",
))
})
}
})
Expand Down Expand Up @@ -279,7 +315,7 @@ func TestCardinalityQueryCache_RoundTrip_WithTenantFederation(t *testing.T) {
cacheBackend := cache.NewInstrumentedMockCache()
limits := multiTenantMockLimits{byTenant: testData.limits}

rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t))
rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t), nil)
res, err := rt.RoundTrip(req)
require.NoError(t, err)

Expand Down
21 changes: 21 additions & 0 deletions pkg/frontend/querymiddleware/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -93,6 +94,26 @@ func errUnsupportedResultsCacheBackend(backend string) error {
return fmt.Errorf("%w: %q, supported values: %v", errUnsupportedBackend, backend, supportedResultsCacheBackends)
}

// resultsCacheMetrics holds the counters tracking lookups and hits against the
// query results cache. Both counters carry a constant "request_type" label
// (set by newResultsCacheMetrics) so the hit ratio can be computed per request type.
type resultsCacheMetrics struct {
	// cacheRequests counts requests (or partial requests) looked up in the results cache.
	cacheRequests prometheus.Counter
	// cacheHits counts requests (or partial requests) fetched from the results cache.
	cacheHits prometheus.Counter
}

// newResultsCacheMetrics creates and registers the results cache metrics on reg.
// Every metric gets a constant "request_type" label set to requestType, so that
// cache requests and hits can be tracked separately per type of cached request
// (e.g. "query_range", "cardinality").
func newResultsCacheMetrics(requestType string, reg prometheus.Registerer) *resultsCacheMetrics {
	constLabels := prometheus.Labels{"request_type": requestType}
	factory := promauto.With(reg)

	m := &resultsCacheMetrics{}
	m.cacheRequests = factory.NewCounter(prometheus.CounterOpts{
		Name:        "cortex_frontend_query_result_cache_requests_total",
		Help:        "Total number of requests (or partial requests) looked up in the results cache.",
		ConstLabels: constLabels,
	})
	m.cacheHits = factory.NewCounter(prometheus.CounterOpts{
		Name:        "cortex_frontend_query_result_cache_hits_total",
		Help:        "Total number of requests (or partial requests) fetched from the results cache.",
		ConstLabels: constLabels,
	})
	return m
}

// newResultsCache creates a new results cache based on the input configuration.
func newResultsCache(cfg ResultsCacheConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
// Add the "component" label similarly to other components, so that metrics don't clash and have the same labels set
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func newQueryTripperware(
// Inject the cardinality query cache roundtripper only if the query results cache is enabled.
cardinality := next
if cfg.CacheResults {
cardinality = newCardinalityQueryCacheRoundTripper(c, limits, next, log)
cardinality = newCardinalityQueryCacheRoundTripper(c, limits, next, log, registerer)
}

return RoundTripFunc(func(r *http.Request) (*http.Response, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/frontend/querymiddleware/split_and_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ var (
)

// splitAndCacheMiddlewareMetrics holds the metrics tracked by the split-and-cache
// middleware. It embeds *resultsCacheMetrics so the middleware also exposes the
// shared results cache requests/hits counters (created with request_type="query_range").
type splitAndCacheMiddlewareMetrics struct {
	*resultsCacheMetrics

	// splitQueriesCount counts the underlying query requests after the split by interval is applied.
	splitQueriesCount prometheus.Counter
	// queryResultCacheAttemptedCount counts queries that were attempted to be fetched from cache.
	queryResultCacheAttemptedCount prometheus.Counter
	// queryResultCacheSkippedCount counts queries that were not cacheable, partitioned by "reason".
	queryResultCacheSkippedCount *prometheus.CounterVec
}

func newSplitAndCacheMiddlewareMetrics(reg prometheus.Registerer) *splitAndCacheMiddlewareMetrics {
m := &splitAndCacheMiddlewareMetrics{
resultsCacheMetrics: newResultsCacheMetrics("query_range", reg),
splitQueriesCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_frontend_split_queries_total",
Help: "Total number of underlying query requests after the split by interval is applied.",
Expand Down Expand Up @@ -344,7 +347,10 @@ func (s *splitAndCacheMiddleware) fetchCacheExtents(ctx context.Context, now tim
spanLog.LogKV("key", key, "hashedKey", hashed)
}

// Lookup the cache.
s.metrics.cacheRequests.Add(float64(len(keys)))
founds := s.cache.Fetch(ctx, hashedKeys)
s.metrics.cacheHits.Add(float64(len(founds)))

// Decode all cached responses.
extents := make([][]Extent, len(keys))
Expand Down
47 changes: 46 additions & 1 deletion pkg/frontend/querymiddleware/split_and_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ func TestSplitAndCacheMiddleware_SplitByInterval(t *testing.T) {
# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 4
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 0
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 0
`)))

// Assert query stats from context
Expand All @@ -231,6 +237,7 @@ func TestSplitAndCacheMiddleware_SplitByInterval(t *testing.T) {
func TestSplitAndCacheMiddleware_ResultsCache(t *testing.T) {
cacheBackend := cache.NewInstrumentedMockCache()

reg := prometheus.NewPedanticRegistry()
mw := newSplitAndCacheMiddleware(
true,
true,
Expand All @@ -243,7 +250,7 @@ func TestSplitAndCacheMiddleware_ResultsCache(t *testing.T) {
PrometheusResponseExtractor{},
resultsCacheAlwaysEnabled,
log.NewNopLogger(),
prometheus.NewPedanticRegistry(),
reg,
)

expectedResponse := &PrometheusResponse{
Expand Down Expand Up @@ -333,6 +340,31 @@ func TestSplitAndCacheMiddleware_ResultsCache(t *testing.T) {
// Assert query stats from context
queryStats = stats.FromContext(ctx)
assert.Equal(t, uint32(2), queryStats.LoadSplitQueries())

// Assert metrics
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_frontend_query_result_cache_attempted_total Total number of queries that were attempted to be fetched from cache.
# TYPE cortex_frontend_query_result_cache_attempted_total counter
cortex_frontend_query_result_cache_attempted_total 3

# HELP cortex_frontend_query_result_cache_skipped_total Total number of times a query was not cacheable because of a reason. This metric is tracked for each partial query when time-splitting is enabled.
# TYPE cortex_frontend_query_result_cache_skipped_total counter
cortex_frontend_query_result_cache_skipped_total{reason="has-modifiers"} 0
cortex_frontend_query_result_cache_skipped_total{reason="too-new"} 0
cortex_frontend_query_result_cache_skipped_total{reason="unaligned-time-range"} 0

# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 3

# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 3

# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 2
`)))
}

func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotLookupCacheIfStepIsNotAligned(t *testing.T) {
Expand Down Expand Up @@ -423,6 +455,7 @@ func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotLookupCacheIfStepIsNotAli
// Assert query stats from context
queryStats := stats.FromContext(ctx)
assert.Equal(t, uint32(1), queryStats.LoadSplitQueries())

// Assert metrics
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_frontend_query_result_cache_attempted_total Total number of queries that were attempted to be fetched from cache.
Expand All @@ -436,6 +469,12 @@ func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotLookupCacheIfStepIsNotAli
# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 1
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 0
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 0
`)))
}

Expand Down Expand Up @@ -572,6 +611,12 @@ func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotCacheRequestEarlierThanMa
# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 0
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 0
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 0
`,
},
"should cache a response up until max cache freshness time ago": {
Expand Down