diff --git a/pkg/api/api.go b/pkg/api/api.go index 24d1773e790..535ac66c288 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier" + querierapi "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/tenantfederation" "github.com/grafana/mimir/pkg/ruler" "github.com/grafana/mimir/pkg/scheduler" @@ -158,6 +159,10 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth } func (a *API) newRoute(path string, handler http.Handler, isPrefix, auth, gzip bool, methods ...string) (route *mux.Route) { + // Propagate the consistency level on all HTTP routes. + // They are not used everywhere, but for consistency and less surprise it's added everywhere. + handler = querierapi.ConsistencyMiddleware().Wrap(handler) + if auth { handler = a.AuthMiddleware.Wrap(handler) } diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index d3d03a1620c..8bb41337366 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -32,6 +32,7 @@ import ( v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/grafana/mimir/pkg/querier" + querierapi "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" @@ -282,6 +283,8 @@ func NewQuerierHandler( InflightRequests: inflightRequests, } router.Use(instrumentMiddleware.Wrap) + // Since we don't use the regular RegisterQueryAPI, we need to add the consistency middleware manually. + router.Use(querierapi.ConsistencyMiddleware().Wrap) // Define the prefixes for all routes prefix := path.Join(cfg.ServerPrefix, cfg.PrometheusHTTPPrefix) diff --git a/pkg/frontend/querymiddleware/codec.go b/pkg/frontend/querymiddleware/codec.go index 70c02cb2c52..befbbe6fdf4 100644 --- a/pkg/frontend/querymiddleware/codec.go +++ b/pkg/frontend/querymiddleware/codec.go @@ -32,6 +32,7 @@ import ( apierror "github.com/grafana/mimir/pkg/api/error" "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -401,6 +402,10 @@ func (c prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Re return nil, fmt.Errorf("unknown query result response format '%s'", c.preferredQueryResultResponseFormat) } + if consistency, ok := api.ReadConsistencyFromContext(ctx); ok { + req.Header.Add(api.ReadConsistencyHeader, consistency) + } + return req.WithContext(ctx), nil } diff --git a/pkg/frontend/querymiddleware/codec_test.go b/pkg/frontend/querymiddleware/codec_test.go index 33028a62ab1..963ef555b6e 100644 --- a/pkg/frontend/querymiddleware/codec_test.go +++ b/pkg/frontend/querymiddleware/codec_test.go @@ -30,6 +30,7 @@ import ( apierror "github.com/grafana/mimir/pkg/api/error" "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/querier/api" ) var ( @@ -128,6 +129,18 @@ func TestPrometheusCodec_EncodeRequest_AcceptHeader(t *testing.T) { } } +func TestPrometheusCodec_EncodeRequest_ReadConsistency(t *testing.T) { + for _, consistencyLevel := range api.ReadConsistencies { + t.Run(consistencyLevel, func(t *testing.T) { + codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), formatProtobuf) + ctx := api.ContextWithReadConsistency(context.Background(), consistencyLevel) + encodedRequest, err := codec.EncodeRequest(ctx, &PrometheusInstantQueryRequest{}) + require.NoError(t, err) + require.Equal(t, consistencyLevel, encodedRequest.Header.Get(api.ReadConsistencyHeader)) + }) + } +} + func TestPrometheusCodec_EncodeResponse_ContentNegotiation(t *testing.T) { testResponse := &PrometheusResponse{ Status: statusError, diff --git a/pkg/frontend/querymiddleware/model.proto b/pkg/frontend/querymiddleware/model.proto index 5c648deef93..effbb6201d3 100644 --- a/pkg/frontend/querymiddleware/model.proto +++ b/pkg/frontend/querymiddleware/model.proto @@ -138,4 +138,4 @@ message CachedHTTPResponse { message CachedHTTPHeader { string name = 1; string value = 2; -} \ No newline at end of file +} diff --git a/pkg/frontend/querymiddleware/stats.go b/pkg/frontend/querymiddleware/stats.go index edba2be92cc..1453045f191 100644 --- a/pkg/frontend/querymiddleware/stats.go +++ b/pkg/frontend/querymiddleware/stats.go @@ -7,6 +7,7 @@ import ( "errors" "time" + "github.com/grafana/dskit/tenant" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" @@ -14,6 +15,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/stats" ) @@ -22,6 +24,7 @@ type queryStatsMiddleware struct { nonAlignedQueries prometheus.Counter regexpMatcherCount prometheus.Counter regexpMatcherOptimizedCount prometheus.Counter + consistencyCounter *prometheus.CounterVec next Handler } @@ -38,6 +41,10 @@ func newQueryStatsMiddleware(reg prometheus.Registerer, engine *promql.Engine) M Name: "cortex_query_frontend_regexp_matcher_optimized_count", Help: "Total number of optimized regexp matchers", }) + consistencyCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_frontend_queries_consistency_total", + Help: "Total number of queries that explicitly request a level of consistency.", + }, []string{"user", "consistency"}) return MiddlewareFunc(func(next Handler) Handler { return &queryStatsMiddleware{ @@ -45,6 +52,7 @@ func newQueryStatsMiddleware(reg prometheus.Registerer, engine *promql.Engine) M nonAlignedQueries: nonAlignedQueries, regexpMatcherCount: regexpMatcherCount, regexpMatcherOptimizedCount: regexpMatcherOptimizedCount, + consistencyCounter: consistencyCounter, next: next, } }) @@ -55,26 +63,32 @@ func (s queryStatsMiddleware) Do(ctx context.Context, req Request) (Response, er s.nonAlignedQueries.Inc() } - if expr, err := parser.ParseExpr(req.GetQuery()); err == nil { - for _, selectors := range parser.ExtractSelectors(expr) { - for _, matcher := range selectors { - if matcher.Type != labels.MatchRegexp && matcher.Type != labels.MatchNotRegexp { - continue - } - - s.regexpMatcherCount.Inc() - if matcher.IsRegexOptimized() { - s.regexpMatcherOptimizedCount.Inc() - } - } - } - } - + s.trackRegexpMatchers(req) + s.trackReadConsistency(ctx) s.populateQueryDetails(ctx, req) return s.next.Do(ctx, req) } +func (s queryStatsMiddleware) trackRegexpMatchers(req Request) { + expr, err := parser.ParseExpr(req.GetQuery()) + if err != nil { + return + } + for _, selectors := range parser.ExtractSelectors(expr) { + for _, matcher := range selectors { + if matcher.Type != labels.MatchRegexp && matcher.Type != labels.MatchNotRegexp { + continue + } + + s.regexpMatcherCount.Inc() + if matcher.IsRegexOptimized() { + s.regexpMatcherOptimizedCount.Inc() + } + } + } +} + var queryStatsErrQueryable = &storage.MockQueryable{MockQuerier: &storage.MockQuerier{SelectMockFunction: func(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return storage.ErrSeriesSet(errors.New("cannot use query stats queryable for running queries")) }}} @@ -107,6 +121,17 @@ func (s queryStatsMiddleware) populateQueryDetails(ctx context.Context, req Requ } } +func (s queryStatsMiddleware) trackReadConsistency(ctx context.Context) { + consistency, ok := api.ReadConsistencyFromContext(ctx) + if !ok { + return + } + tenants, _ := tenant.TenantIDs(ctx) + for _, tenantID := range tenants { + s.consistencyCounter.WithLabelValues(tenantID, consistency).Inc() + } +} + type QueryDetails struct { QuerierStats *stats.Stats diff --git a/pkg/frontend/querymiddleware/stats_test.go b/pkg/frontend/querymiddleware/stats_test.go index eafae213fe1..86a185a1e30 100644 --- a/pkg/frontend/querymiddleware/stats_test.go +++ b/pkg/frontend/querymiddleware/stats_test.go @@ -14,12 +14,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + querierapi "github.com/grafana/mimir/pkg/querier/api" querier_stats "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/util" ) func Test_queryStatsMiddleware_Do(t *testing.T) { + const tenantID = "test" type args struct { + ctx context.Context req Request } tests := []struct { @@ -90,13 +93,55 @@ func Test_queryStatsMiddleware_Do(t *testing.T) { Step: step, }, }, + { + name: "explicit consistency", + args: args{ + ctx: querierapi.ContextWithReadConsistency(context.Background(), querierapi.ReadConsistencyStrong), + req: &PrometheusRangeQueryRequest{ + Path: "/query_range", + Start: util.TimeToMillis(start), + End: util.TimeToMillis(end), + Step: step.Milliseconds(), + Query: `sum(sum_over_time(metric{app="test",namespace=~"short"}[5m]))`, + }, + }, + expectedMetrics: strings.NewReader(` + # HELP cortex_query_frontend_non_step_aligned_queries_total Total queries sent that are not step aligned. + # TYPE cortex_query_frontend_non_step_aligned_queries_total counter + cortex_query_frontend_non_step_aligned_queries_total 1 + # HELP cortex_query_frontend_regexp_matcher_count Total number of regexp matchers + # TYPE cortex_query_frontend_regexp_matcher_count counter + cortex_query_frontend_regexp_matcher_count 1 + # HELP cortex_query_frontend_regexp_matcher_optimized_count Total number of optimized regexp matchers + # TYPE cortex_query_frontend_regexp_matcher_optimized_count counter + cortex_query_frontend_regexp_matcher_optimized_count 1 + # HELP cortex_query_frontend_queries_consistency_total Total number of queries that explicitly request a level of consistency. + # TYPE cortex_query_frontend_queries_consistency_total counter + cortex_query_frontend_queries_consistency_total{consistency="strong",user="test"} 1 + `), + expectedQueryDetails: QueryDetails{ + QuerierStats: &querier_stats.Stats{}, + Start: start.Truncate(time.Millisecond), + End: end.Truncate(time.Millisecond), + MinT: start.Truncate(time.Millisecond).Add(-5 * time.Minute), + MaxT: end.Truncate(time.Millisecond), + Step: step, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() mw := newQueryStatsMiddleware(reg, newEngine()) - actualDetails, ctx := ContextWithEmptyDetails(context.Background()) - _, err := mw.Wrap(mockHandlerWith(nil, nil)).Do(user.InjectOrgID(ctx, "test"), tt.args.req) + ctx := context.Background() + if tt.args.ctx != nil { + ctx = tt.args.ctx + } + actualDetails, ctx := ContextWithEmptyDetails(ctx) + ctx = user.InjectOrgID(ctx, tenantID) + + _, err := mw.Wrap(mockHandlerWith(nil, nil)).Do(ctx, tt.args.req) + require.NoError(t, err) assert.NoError(t, testutil.GatherAndCompare(reg, tt.expectedMetrics)) assert.Equal(t, tt.expectedQueryDetails, *actualDetails) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 87b2503afb6..be52e38c031 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -28,6 +28,7 @@ import ( apierror "github.com/grafana/mimir/pkg/api/error" "github.com/grafana/mimir/pkg/frontend/querymiddleware" + querierapi "github.com/grafana/mimir/pkg/querier/api" querier_stats "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" @@ -300,6 +301,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "results_cache_hit_bytes", details.ResultsCacheHitBytes, "results_cache_miss_bytes", details.ResultsCacheMissBytes, ) + if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok { + logMessage = append(logMessage, "read_consistency", consistency) + } } if len(f.cfg.LogQueryRequestHeaders) != 0 { logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...) diff --git a/pkg/ingester/active_series.go b/pkg/ingester/active_series.go index a4004562601..f13f296b3cf 100644 --- a/pkg/ingester/active_series.go +++ b/pkg/ingester/active_series.go @@ -44,6 +44,12 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie return fmt.Errorf("error parsing label matchers: %w", err) } + // Enforce read consistency before getting TSDB (covers the case the tenant's data has not been ingested + // in this ingester yet, but there's some to ingest in the backlog). + if err := i.enforceReadConsistency(ctx, userID); err != nil { + return err + } + db := i.getTSDB(userID) if db == nil { level.Debug(i.logger).Log("msg", "no TSDB for user", "userID", userID) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index e0e1122c87c..27467b28a2f 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/mimir/pkg/mimirpb" + querierapi "github.com/grafana/mimir/pkg/querier/api" ) // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. @@ -43,6 +44,8 @@ func MakeIngesterClient(inst ring.InstanceDesc, cfg Config, metrics *Metrics, lo if cfg.CircuitBreaker.Enabled { unary = append([]grpc.UnaryClientInterceptor{NewCircuitBreaker(inst, cfg.CircuitBreaker, metrics, logger)}, unary...) } + unary = append(unary, querierapi.ReadConsistencyClientUnaryInterceptor) + stream = append(stream, querierapi.ReadConsistencyClientStreamInterceptor) dialOpts, err := cfg.GRPCClientConfig.DialOption(unary, stream) if err != nil { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dd84a25c40b..5ca2df45c58 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -3563,7 +3563,13 @@ func (i *Ingester) enforceReadConsistency(ctx context.Context, tenantID string) return nil } - if i.limits.IngestStorageReadConsistency(tenantID) != api.ReadConsistencyStrong { + var cLevel string + if c, ok := api.ReadConsistencyFromContext(ctx); ok { + cLevel = c + } else { + cLevel = i.limits.IngestStorageReadConsistency(tenantID) + } + if cLevel == api.ReadConsistencyEventual { return nil } diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 92623010217..30102bc2eb9 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -56,6 +56,7 @@ import ( "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier" + querierapi "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/tenantfederation" querier_worker "github.com/grafana/mimir/pkg/querier/worker" "github.com/grafana/mimir/pkg/ruler" @@ -727,6 +728,8 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) { if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled { util_log.WarnExperimentalUse("ruler.tenant-federation") } + cfg.Server.GRPCMiddleware = append(cfg.Server.GRPCMiddleware, querierapi.ReadConsistencyServerUnaryInterceptor) + cfg.Server.GRPCStreamMiddleware = append(cfg.Server.GRPCStreamMiddleware, querierapi.ReadConsistencyServerStreamInterceptor) cfg.API.HTTPAuthMiddleware = noauth.SetupAuthMiddleware( &cfg.Server, diff --git a/pkg/querier/api/consistency.go b/pkg/querier/api/consistency.go index ad1575875a8..ece184d0fa0 100644 --- a/pkg/querier/api/consistency.go +++ b/pkg/querier/api/consistency.go @@ -2,7 +2,19 @@ package api +import ( + "context" + "net/http" + "slices" + + "github.com/grafana/dskit/middleware" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + const ( + ReadConsistencyHeader = "X-Read-Consistency" + // ReadConsistencyStrong means that a query sent by the same client will always observe the writes // that have completed before issuing the query. ReadConsistencyStrong = "strong" @@ -13,3 +25,84 @@ const ( ) var ReadConsistencies = []string{ReadConsistencyStrong, ReadConsistencyEventual} + +func IsValidReadConsistency(lvl string) bool { + return slices.Contains(ReadConsistencies, lvl) +} + +type contextKey int + +const consistencyContextKey contextKey = 1 + +// ContextWithReadConsistency returns a new context with the given consistency level. +// The consistency level can be retrieved with ReadConsistencyFromContext. +func ContextWithReadConsistency(parent context.Context, level string) context.Context { + return context.WithValue(parent, consistencyContextKey, level) +} + +// ReadConsistencyFromContext returns the consistency level from the context if set via ContextWithReadConsistency. +// The second return value is true if the consistency level was found in the context and is valid. +func ReadConsistencyFromContext(ctx context.Context) (string, bool) { + level, _ := ctx.Value(consistencyContextKey).(string) + return level, IsValidReadConsistency(level) +} + +// ConsistencyMiddleware takes the consistency level from the X-Read-Consistency header and sets it in the context. +// It can be retrieved with ReadConsistencyFromContext. +func ConsistencyMiddleware() middleware.Interface { + return middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if c := r.Header.Get(ReadConsistencyHeader); IsValidReadConsistency(c) { + r = r.WithContext(ContextWithReadConsistency(r.Context(), c)) + } + next.ServeHTTP(w, r) + }) + }) +} + +const consistencyLevelGrpcMdKey = "__consistency_level__" + +func ReadConsistencyClientUnaryInterceptor(ctx context.Context, method string, req any, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if c, ok := ReadConsistencyFromContext(ctx); ok { + ctx = metadata.AppendToOutgoingContext(ctx, consistencyLevelGrpcMdKey, c) + } + return invoker(ctx, method, req, reply, cc, opts...) +} + +func ReadConsistencyServerUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + md, _ := metadata.FromIncomingContext(ctx) + consistencies := md.Get(consistencyLevelGrpcMdKey) + if len(consistencies) > 0 && IsValidReadConsistency(consistencies[0]) { + ctx = ContextWithReadConsistency(ctx, consistencies[0]) + } + return handler(ctx, req) +} + +func ReadConsistencyClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + if c, ok := ReadConsistencyFromContext(ctx); ok { + ctx = metadata.AppendToOutgoingContext(ctx, consistencyLevelGrpcMdKey, c) + } + return streamer(ctx, desc, cc, method, opts...) +} + +func ReadConsistencyServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + md, _ := metadata.FromIncomingContext(ss.Context()) + consistencies := md.Get(consistencyLevelGrpcMdKey) + if len(consistencies) > 0 && IsValidReadConsistency(consistencies[0]) { + ctx := ContextWithReadConsistency(ss.Context(), consistencies[0]) + ss = ctxStream{ + ctx: ctx, + ServerStream: ss, + } + } + return handler(srv, ss) +} + +type ctxStream struct { + ctx context.Context + grpc.ServerStream +} + +func (ss ctxStream) Context() context.Context { + return ss.ctx +}