Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest storage: per-query X-Read-Consistency HTTP header #7091

Merged
merged 15 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
"github.com/grafana/mimir/pkg/ruler"
"github.com/grafana/mimir/pkg/scheduler"
Expand Down Expand Up @@ -160,6 +161,10 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth
func (a *API) newRoute(path string, handler http.Handler, isPrefix, auth, gzip bool, methods ...string) (route *mux.Route) {
if auth {
handler = a.AuthMiddleware.Wrap(handler)

// Assuming that we only need consistency controls when the request is for a specific tenant.
// Not all requests that require a tenant also support consistency, but it doesn't hurt if we propagate it anyway.
handler = querierapi.ConsistencyMiddleware().Wrap(handler)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my comment is the opposite. Why not just always inject it? Also look at who else reads AuthMiddleware, because I think we should also inject it when directly querying queriers, for consistency.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just always inject it?

I mean, why only when auth is used? We could just always enable it. Then the fact many APIs currently don't support it is a separate discussion, but for consistency we should always enable it.

Alternatively, we could move to the other side of the spectrum and just enable it for the querier's API, so only for routes registered by RegisterQueryAPI().

I don't feel strongly about this comment, but it still looks weird to bundle it with auth, given it has nothing to do with auth.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Can you check how dd76194 looks?

}
if gzip {
handler = gziphandler.GzipHandler(handler)
Expand Down
5 changes: 5 additions & 0 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -401,6 +402,10 @@ func (c prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Re
return nil, fmt.Errorf("unknown query result response format '%s'", c.preferredQueryResultResponseFormat)
}

if consistency, ok := api.ReadConsistencyFromContext(ctx); ok {
req.Header.Add(api.ReadConsistencyHeader, consistency)
}

return req.WithContext(ctx), nil
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/frontend/querymiddleware/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/api"
)

var (
Expand Down Expand Up @@ -128,6 +129,18 @@ func TestPrometheusCodec_EncodeRequest_AcceptHeader(t *testing.T) {
}
}

// TestPrometheusCodec_EncodeRequest_ReadConsistency checks that, for every known read
// consistency level, a level stored in the context is emitted by EncodeRequest as the
// read consistency header of the encoded HTTP request.
func TestPrometheusCodec_EncodeRequest_ReadConsistency(t *testing.T) {
	for _, level := range api.ReadConsistencies {
		t.Run(level, func(t *testing.T) {
			ctx := api.ContextWithReadConsistency(context.Background(), level)
			codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), formatProtobuf)

			httpReq, err := codec.EncodeRequest(ctx, &PrometheusInstantQueryRequest{})
			require.NoError(t, err)

			got := httpReq.Header.Get(api.ReadConsistencyHeader)
			require.Equal(t, level, got)
		})
	}
}

func TestPrometheusCodec_EncodeResponse_ContentNegotiation(t *testing.T) {
testResponse := &PrometheusResponse{
Status: statusError,
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/model.proto
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,4 @@ message CachedHTTPResponse {
message CachedHTTPHeader {
string name = 1;
string value = 2;
}
}
55 changes: 40 additions & 15 deletions pkg/frontend/querymiddleware/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"errors"
"time"

"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/stats"
)

Expand All @@ -22,6 +24,7 @@ type queryStatsMiddleware struct {
nonAlignedQueries prometheus.Counter
regexpMatcherCount prometheus.Counter
regexpMatcherOptimizedCount prometheus.Counter
consistencyCounter *prometheus.CounterVec
next Handler
}

Expand All @@ -38,13 +41,18 @@ func newQueryStatsMiddleware(reg prometheus.Registerer, engine *promql.Engine) M
Name: "cortex_query_frontend_regexp_matcher_optimized_count",
Help: "Total number of optimized regexp matchers",
})
consistencyCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_queries_consistency_total",
Help: "Total number of queries that explicitly request a level of consistency.",
}, []string{"user", "consistency"})

return MiddlewareFunc(func(next Handler) Handler {
return &queryStatsMiddleware{
engine: engine,
nonAlignedQueries: nonAlignedQueries,
regexpMatcherCount: regexpMatcherCount,
regexpMatcherOptimizedCount: regexpMatcherOptimizedCount,
consistencyCounter: consistencyCounter,
next: next,
}
})
Expand All @@ -55,26 +63,32 @@ func (s queryStatsMiddleware) Do(ctx context.Context, req Request) (Response, er
s.nonAlignedQueries.Inc()
}

if expr, err := parser.ParseExpr(req.GetQuery()); err == nil {
for _, selectors := range parser.ExtractSelectors(expr) {
for _, matcher := range selectors {
if matcher.Type != labels.MatchRegexp && matcher.Type != labels.MatchNotRegexp {
continue
}

s.regexpMatcherCount.Inc()
if matcher.IsRegexOptimized() {
s.regexpMatcherOptimizedCount.Inc()
}
}
}
}

s.regexpCounters(req)
s.consistencyCounters(ctx)
s.populateQueryDetails(ctx, req)

return s.next.Do(ctx, req)
}

// regexpCounters parses the request's query and walks every matcher of every selector
// extracted from it. For each regexp or negated-regexp matcher it increments
// regexpMatcherCount, and additionally regexpMatcherOptimizedCount when the matcher
// reports IsRegexOptimized. A query that fails to parse is skipped without counting anything.
func (s queryStatsMiddleware) regexpCounters(req Request) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Have you considered calling it track....() and consistencyCounters -> track...() too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean something like trackReadConsistency and trackRegexpMatchers? 👍 done in d98612e

expr, err := parser.ParseExpr(req.GetQuery())
if err != nil {
// Unparseable query: there are no matchers to inspect.
return
}
for _, selectors := range parser.ExtractSelectors(expr) {
for _, matcher := range selectors {
// Only regexp and negated-regexp matchers are tracked.
if matcher.Type != labels.MatchRegexp && matcher.Type != labels.MatchNotRegexp {
continue
}

s.regexpMatcherCount.Inc()
if matcher.IsRegexOptimized() {
s.regexpMatcherOptimizedCount.Inc()
}
}
}
}

// queryStatsErrQueryable is a mock queryable whose Select function ignores its arguments and
// always returns an error series set, so it cannot be used to actually run queries.
var queryStatsErrQueryable = &storage.MockQueryable{MockQuerier: &storage.MockQuerier{SelectMockFunction: func(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(errors.New("cannot use query stats queryable for running queries"))
}}}
Expand Down Expand Up @@ -107,6 +121,17 @@ func (s queryStatsMiddleware) populateQueryDetails(ctx context.Context, req Requ
}
}

func (s queryStatsMiddleware) consistencyCounters(ctx context.Context) {
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
consistency, ok := api.ReadConsistencyFromContext(ctx)
if !ok {
return
}
tenants, _ := tenant.TenantIDs(ctx)
for _, tenantID := range tenants {
s.consistencyCounter.WithLabelValues(tenantID, consistency).Inc()
}
}

type QueryDetails struct {
QuerierStats *stats.Stats

Expand Down
4 changes: 4 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware"
querierapi "github.com/grafana/mimir/pkg/querier/api"
querier_stats "github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/activitytracker"
Expand Down Expand Up @@ -300,6 +301,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"results_cache_hit_bytes", details.ResultsCacheHitBytes,
"results_cache_miss_bytes", details.ResultsCacheMissBytes,
)
if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
logMessage = append(logMessage, "read_consistency", consistency)
}
}
if len(f.cfg.LogQueryRequestHeaders) != 0 {
logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
return fmt.Errorf("error parsing label matchers: %w", err)
}

// Enforce read consistency before getting TSDB (covers the case the tenant's data has not been ingested
// in this ingester yet, but there's some to ingest in the backlog).
if err := i.enforceReadConsistency(ctx, userID); err != nil {
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
return err
}

db := i.getTSDB(userID)
if db == nil {
level.Debug(i.logger).Log("msg", "no TSDB for user", "userID", userID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/mimir/pkg/mimirpb"
querierapi "github.com/grafana/mimir/pkg/querier/api"
)

// HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient.
Expand All @@ -43,6 +44,8 @@ func MakeIngesterClient(inst ring.InstanceDesc, cfg Config, metrics *Metrics, lo
if cfg.CircuitBreaker.Enabled {
unary = append([]grpc.UnaryClientInterceptor{NewCircuitBreaker(inst, cfg.CircuitBreaker, metrics, logger)}, unary...)
}
unary = append(unary, querierapi.ReadConsistencyClientInterceptor)
stream = append(stream, querierapi.ReadConsistencyClientStreamInterceptor)

dialOpts, err := cfg.GRPCClientConfig.DialOption(unary, stream)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3563,7 +3563,13 @@ func (i *Ingester) enforceReadConsistency(ctx context.Context, tenantID string)
return nil
}

if i.limits.IngestStorageReadConsistency(tenantID) != api.ReadConsistencyStrong {
var cLevel string
if c, ok := api.ReadConsistencyFromContext(ctx); ok {
cLevel = c
} else {
cLevel = i.limits.IngestStorageReadConsistency(tenantID)
}
if cLevel == api.ReadConsistencyEventual {
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
querier_worker "github.com/grafana/mimir/pkg/querier/worker"
"github.com/grafana/mimir/pkg/ruler"
Expand Down Expand Up @@ -727,6 +728,8 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) {
if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled {
util_log.WarnExperimentalUse("ruler.tenant-federation")
}
cfg.Server.GRPCMiddleware = append(cfg.Server.GRPCMiddleware, querierapi.ReadConsistencyServerInterceptor)
cfg.Server.GRPCStreamMiddleware = append(cfg.Server.GRPCStreamMiddleware, querierapi.ReadConsistencyServerStreamInterceptor)

cfg.API.HTTPAuthMiddleware = noauth.SetupAuthMiddleware(
&cfg.Server,
Expand Down
4 changes: 4 additions & 0 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/grafana/mimir/pkg/frontend/transport"
"github.com/grafana/mimir/pkg/ingester"
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/engine"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
querier_worker "github.com/grafana/mimir/pkg/querier/worker"
Expand Down Expand Up @@ -587,6 +588,9 @@ func (t *Mimir) initQuerier() (serv services.Service, err error) {
// the external HTTP server. This will allow the querier to consolidate query metrics both external
// and internal using the default instrumentation when running as a standalone service.
internalQuerierRouter = t.Server.HTTPServer.Handler

// We need to propagate the consistency setting to upstream components regardless of how the querier is deployed.
internalQuerierRouter = querierapi.ConsistencyMiddleware().Wrap(internalQuerierRouter)
} else {
// Monolithic mode requires a query-frontend endpoint for the worker. If no frontend and scheduler endpoint
// is configured, Mimir will default to using frontend on localhost on its own gRPC listening port.
Expand Down
93 changes: 93 additions & 0 deletions pkg/querier/api/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@

package api

import (
"context"
"net/http"
"slices"

"github.com/grafana/dskit/middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

const (
ReadConsistencyHeader = "X-Read-Consistency"

// ReadConsistencyStrong means that a query sent by the same client will always observe the writes
// that have completed before issuing the query.
ReadConsistencyStrong = "strong"
Expand All @@ -13,3 +25,84 @@ const (
)

var ReadConsistencies = []string{ReadConsistencyStrong, ReadConsistencyEventual}

// IsValidReadConsistency reports whether lvl is one of the levels listed in ReadConsistencies.
func IsValidReadConsistency(lvl string) bool {
	position := slices.Index(ReadConsistencies, lvl)
	return position >= 0
}

// contextKey is a private type for context keys defined in this package, so that they
// cannot collide with keys defined by other packages.
type contextKey int

// consistencyContextKey is the context key under which the read consistency level is stored.
const consistencyContextKey = contextKey(1)

// ContextWithReadConsistency derives a context from parent that carries the given
// consistency level. The level is stored as-is, without validation, and can be read
// back with ReadConsistencyFromContext.
func ContextWithReadConsistency(parent context.Context, level string) context.Context {
	withLevel := context.WithValue(parent, consistencyContextKey, level)
	return withLevel
}

// ReadConsistencyFromContext extracts the consistency level stored in ctx by
// ContextWithReadConsistency. The boolean result is true only when a level is present
// and IsValidReadConsistency accepts it; the string result is whatever string was
// stored (empty when none was), even if it is not a valid level.
func ReadConsistencyFromContext(ctx context.Context) (string, bool) {
	var level string
	if stored, isString := ctx.Value(consistencyContextKey).(string); isString {
		level = stored
	}

	valid := IsValidReadConsistency(level)
	return level, valid
}

// ConsistencyMiddleware returns an HTTP middleware that reads the X-Read-Consistency
// request header and, when it holds a valid level, stores that level in the request
// context before calling the next handler. Requests with a missing or invalid header
// are passed through unchanged. The level can be retrieved with ReadConsistencyFromContext.
func ConsistencyMiddleware() middleware.Interface {
	return middleware.Func(func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			level := r.Header.Get(ReadConsistencyHeader)
			if !IsValidReadConsistency(level) {
				next.ServeHTTP(w, r)
				return
			}

			ctx := ContextWithReadConsistency(r.Context(), level)
			next.ServeHTTP(w, r.WithContext(ctx))
		})
	})
}

const consistencyLevelGrpcMdKey = "__consistency_level__"

func ReadConsistencyClientInterceptor(ctx context.Context, method string, req any, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
if c, ok := ReadConsistencyFromContext(ctx); ok {
ctx = metadata.AppendToOutgoingContext(ctx, consistencyLevelGrpcMdKey, c)
}
return invoker(ctx, method, req, reply, cc, opts...)
}

func ReadConsistencyServerInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
md, _ := metadata.FromIncomingContext(ctx)
consistencies := md.Get(consistencyLevelGrpcMdKey)
if len(consistencies) > 0 && IsValidReadConsistency(consistencies[0]) {
ctx = ContextWithReadConsistency(ctx, consistencies[0])
}
return handler(ctx, req)
}

// ReadConsistencyClientStreamInterceptor is a gRPC stream client interceptor. When ctx
// carries a valid read consistency level, the level is appended to the outgoing gRPC
// metadata under consistencyLevelGrpcMdKey before the stream is created; otherwise the
// stream is created with ctx unchanged.
func ReadConsistencyClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	level, found := ReadConsistencyFromContext(ctx)
	if !found {
		return streamer(ctx, desc, cc, method, opts...)
	}

	outgoing := metadata.AppendToOutgoingContext(ctx, consistencyLevelGrpcMdKey, level)
	return streamer(outgoing, desc, cc, method, opts...)
}

// ReadConsistencyServerStreamInterceptor is a gRPC stream server interceptor. It looks up
// consistencyLevelGrpcMdKey in the stream's incoming gRPC metadata and, when the first
// value is a valid read consistency level, hands the handler a wrapped stream whose
// Context carries that level. Otherwise the original stream is passed through untouched.
func ReadConsistencyServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	streamCtx := ss.Context()

	md, _ := metadata.FromIncomingContext(streamCtx)
	levels := md.Get(consistencyLevelGrpcMdKey)
	if len(levels) == 0 || !IsValidReadConsistency(levels[0]) {
		return handler(srv, ss)
	}

	wrapped := ctxStream{
		ctx:          ContextWithReadConsistency(streamCtx, levels[0]),
		ServerStream: ss,
	}
	return handler(srv, wrapped)
}

// ctxStream wraps a grpc.ServerStream and overrides the context it reports, while
// delegating every other method to the embedded stream.
type ctxStream struct {
	// ctx is the context returned by Context instead of the embedded stream's own.
	ctx context.Context
	grpc.ServerStream
}

// Context returns the overriding context held by the wrapper.
func (s ctxStream) Context() context.Context {
	return s.ctx
}
1 change: 1 addition & 0 deletions pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
stats.AddQueueTime(queueTime)
}
//ctx = contexWithConsistencyLevel(ctx, request.GetHeaders())
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved

response, err := fp.handler.Handle(ctx, request)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (sp *schedulerProcessor) querierLoop(execCtx context.Context, c schedulerpb

// We need to inject user into context for sending response back.
ctx = user.InjectOrgID(ctx, request.UserID)
//ctx = contexWithConsistencyLevel(ctx, request.GetHttpRequest().GetHeaders())
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved

tracer := opentracing.GlobalTracer()
// Ignore errors here. If we cannot get parent span, we just don't create new one.
Expand Down
Loading