Skip to content

Commit

Permalink
ingest storage: per-query X-Read-Consistency HTTP header (#7091)
Browse files Browse the repository at this point in the history
* WIP

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove print statements

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Try without frontend/scheduler worker propagation

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* No explicit frontend/scheduler worker propagation

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Log query-frontend read consistency

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Revert "Log query-frontend read consistency"

This reverts commit 605a710.

* Remove unrelated changes

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Use slices instead of util

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Move consistency middleware to API.newRoute

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Move consistency middleware to API.newRoute

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Rename query stats middleware helper functions

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add test for read consistency stats

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Rename ReadConsistencyClientUnaryInterceptor

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Rename ReadConsistencyServerUnaryInterceptor

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove commented out lines

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed Jan 11, 2024
1 parent bff750b commit a2b6d4e
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 19 deletions.
5 changes: 5 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
"github.com/grafana/mimir/pkg/ruler"
"github.com/grafana/mimir/pkg/scheduler"
Expand Down Expand Up @@ -158,6 +159,10 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth
}

func (a *API) newRoute(path string, handler http.Handler, isPrefix, auth, gzip bool, methods ...string) (route *mux.Route) {
// Propagate the consistency level on all HTTP routes.
// It is not used by every route, but the middleware is added everywhere for uniformity and fewer surprises.
handler = querierapi.ConsistencyMiddleware().Wrap(handler)

if auth {
handler = a.AuthMiddleware.Wrap(handler)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
Expand Down Expand Up @@ -282,6 +283,8 @@ func NewQuerierHandler(
InflightRequests: inflightRequests,
}
router.Use(instrumentMiddleware.Wrap)
// Since we don't use the regular RegisterQueryAPI, we need to add the consistency middleware manually.
router.Use(querierapi.ConsistencyMiddleware().Wrap)

// Define the prefixes for all routes
prefix := path.Join(cfg.ServerPrefix, cfg.PrometheusHTTPPrefix)
Expand Down
5 changes: 5 additions & 0 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -401,6 +402,10 @@ func (c prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Re
return nil, fmt.Errorf("unknown query result response format '%s'", c.preferredQueryResultResponseFormat)
}

if consistency, ok := api.ReadConsistencyFromContext(ctx); ok {
req.Header.Add(api.ReadConsistencyHeader, consistency)
}

return req.WithContext(ctx), nil
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/frontend/querymiddleware/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/api"
)

var (
Expand Down Expand Up @@ -128,6 +129,18 @@ func TestPrometheusCodec_EncodeRequest_AcceptHeader(t *testing.T) {
}
}

// TestPrometheusCodec_EncodeRequest_ReadConsistency checks, for each level listed in
// api.ReadConsistencies, that a level stored in the context via
// api.ContextWithReadConsistency is emitted by EncodeRequest as the
// api.ReadConsistencyHeader HTTP header of the encoded request.
func TestPrometheusCodec_EncodeRequest_ReadConsistency(t *testing.T) {
	for _, level := range api.ReadConsistencies {
		t.Run(level, func(t *testing.T) {
			reg := prometheus.NewPedanticRegistry()
			ctx := api.ContextWithReadConsistency(context.Background(), level)

			httpReq, err := NewPrometheusCodec(reg, formatProtobuf).EncodeRequest(ctx, &PrometheusInstantQueryRequest{})
			require.NoError(t, err)

			// The header value must be exactly the level that was placed in the context.
			got := httpReq.Header.Get(api.ReadConsistencyHeader)
			require.Equal(t, level, got)
		})
	}
}

func TestPrometheusCodec_EncodeResponse_ContentNegotiation(t *testing.T) {
testResponse := &PrometheusResponse{
Status: statusError,
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/model.proto
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,4 @@ message CachedHTTPResponse {
message CachedHTTPHeader {
string name = 1;
string value = 2;
}
}
55 changes: 40 additions & 15 deletions pkg/frontend/querymiddleware/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"errors"
"time"

"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/stats"
)

Expand All @@ -22,6 +24,7 @@ type queryStatsMiddleware struct {
nonAlignedQueries prometheus.Counter
regexpMatcherCount prometheus.Counter
regexpMatcherOptimizedCount prometheus.Counter
consistencyCounter *prometheus.CounterVec
next Handler
}

Expand All @@ -38,13 +41,18 @@ func newQueryStatsMiddleware(reg prometheus.Registerer, engine *promql.Engine) M
Name: "cortex_query_frontend_regexp_matcher_optimized_count",
Help: "Total number of optimized regexp matchers",
})
consistencyCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_queries_consistency_total",
Help: "Total number of queries that explicitly request a level of consistency.",
}, []string{"user", "consistency"})

return MiddlewareFunc(func(next Handler) Handler {
return &queryStatsMiddleware{
engine: engine,
nonAlignedQueries: nonAlignedQueries,
regexpMatcherCount: regexpMatcherCount,
regexpMatcherOptimizedCount: regexpMatcherOptimizedCount,
consistencyCounter: consistencyCounter,
next: next,
}
})
Expand All @@ -55,26 +63,32 @@ func (s queryStatsMiddleware) Do(ctx context.Context, req Request) (Response, er
s.nonAlignedQueries.Inc()
}

if expr, err := parser.ParseExpr(req.GetQuery()); err == nil {
for _, selectors := range parser.ExtractSelectors(expr) {
for _, matcher := range selectors {
if matcher.Type != labels.MatchRegexp && matcher.Type != labels.MatchNotRegexp {
continue
}

s.regexpMatcherCount.Inc()
if matcher.IsRegexOptimized() {
s.regexpMatcherOptimizedCount.Inc()
}
}
}
}

s.trackRegexpMatchers(req)
s.trackReadConsistency(ctx)
s.populateQueryDetails(ctx, req)

return s.next.Do(ctx, req)
}

// trackRegexpMatchers parses the request's query and walks every matcher of every
// selector extracted from it. Each matcher of type labels.MatchRegexp or
// labels.MatchNotRegexp increments regexpMatcherCount; those that additionally report
// IsRegexOptimized() also increment regexpMatcherOptimizedCount.
// A query that fails to parse is silently ignored and no counter is touched.
func (s queryStatsMiddleware) trackRegexpMatchers(req Request) {
	parsed, parseErr := parser.ParseExpr(req.GetQuery())
	if parseErr != nil {
		return
	}

	for _, selector := range parser.ExtractSelectors(parsed) {
		for _, m := range selector {
			switch m.Type {
			case labels.MatchRegexp, labels.MatchNotRegexp:
				s.regexpMatcherCount.Inc()
				if m.IsRegexOptimized() {
					s.regexpMatcherOptimizedCount.Inc()
				}
			}
		}
	}
}

// queryStatsErrQueryable is a mock queryable whose select function ignores all of its
// arguments and always returns an error series set, so any attempt to run a query
// through it surfaces the error below instead of producing data.
var queryStatsErrQueryable = &storage.MockQueryable{
	MockQuerier: &storage.MockQuerier{
		SelectMockFunction: func(_ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet {
			selectErr := errors.New("cannot use query stats queryable for running queries")
			return storage.ErrSeriesSet(selectErr)
		},
	},
}
Expand Down Expand Up @@ -107,6 +121,17 @@ func (s queryStatsMiddleware) populateQueryDetails(ctx context.Context, req Requ
}
}

// trackReadConsistency increments consistencyCounter once per tenant ID found in ctx,
// labelled with the tenant ID and the read consistency level stored in ctx.
// When ctx carries no read consistency level, nothing is recorded.
func (s queryStatsMiddleware) trackReadConsistency(ctx context.Context) {
	level, found := api.ReadConsistencyFromContext(ctx)
	if !found {
		return
	}

	// NOTE(review): the error from TenantIDs is discarded; presumably no IDs are
	// returned in that case, so the loop below is a no-op — confirm.
	ids, _ := tenant.TenantIDs(ctx)
	for i := range ids {
		s.consistencyCounter.WithLabelValues(ids[i], level).Inc()
	}
}

type QueryDetails struct {
QuerierStats *stats.Stats

Expand Down
49 changes: 47 additions & 2 deletions pkg/frontend/querymiddleware/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

querierapi "github.com/grafana/mimir/pkg/querier/api"
querier_stats "github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
)

func Test_queryStatsMiddleware_Do(t *testing.T) {
const tenantID = "test"
type args struct {
ctx context.Context
req Request
}
tests := []struct {
Expand Down Expand Up @@ -90,13 +93,55 @@ func Test_queryStatsMiddleware_Do(t *testing.T) {
Step: step,
},
},
{
name: "explicit consistency",
args: args{
ctx: querierapi.ContextWithReadConsistency(context.Background(), querierapi.ReadConsistencyStrong),
req: &PrometheusRangeQueryRequest{
Path: "/query_range",
Start: util.TimeToMillis(start),
End: util.TimeToMillis(end),
Step: step.Milliseconds(),
Query: `sum(sum_over_time(metric{app="test",namespace=~"short"}[5m]))`,
},
},
expectedMetrics: strings.NewReader(`
# HELP cortex_query_frontend_non_step_aligned_queries_total Total queries sent that are not step aligned.
# TYPE cortex_query_frontend_non_step_aligned_queries_total counter
cortex_query_frontend_non_step_aligned_queries_total 1
# HELP cortex_query_frontend_regexp_matcher_count Total number of regexp matchers
# TYPE cortex_query_frontend_regexp_matcher_count counter
cortex_query_frontend_regexp_matcher_count 1
# HELP cortex_query_frontend_regexp_matcher_optimized_count Total number of optimized regexp matchers
# TYPE cortex_query_frontend_regexp_matcher_optimized_count counter
cortex_query_frontend_regexp_matcher_optimized_count 1
# HELP cortex_query_frontend_queries_consistency_total Total number of queries that explicitly request a level of consistency.
# TYPE cortex_query_frontend_queries_consistency_total counter
cortex_query_frontend_queries_consistency_total{consistency="strong",user="test"} 1
`),
expectedQueryDetails: QueryDetails{
QuerierStats: &querier_stats.Stats{},
Start: start.Truncate(time.Millisecond),
End: end.Truncate(time.Millisecond),
MinT: start.Truncate(time.Millisecond).Add(-5 * time.Minute),
MaxT: end.Truncate(time.Millisecond),
Step: step,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
mw := newQueryStatsMiddleware(reg, newEngine())
actualDetails, ctx := ContextWithEmptyDetails(context.Background())
_, err := mw.Wrap(mockHandlerWith(nil, nil)).Do(user.InjectOrgID(ctx, "test"), tt.args.req)
ctx := context.Background()
if tt.args.ctx != nil {
ctx = tt.args.ctx
}
actualDetails, ctx := ContextWithEmptyDetails(ctx)
ctx = user.InjectOrgID(ctx, tenantID)

_, err := mw.Wrap(mockHandlerWith(nil, nil)).Do(ctx, tt.args.req)

require.NoError(t, err)
assert.NoError(t, testutil.GatherAndCompare(reg, tt.expectedMetrics))
assert.Equal(t, tt.expectedQueryDetails, *actualDetails)
Expand Down
4 changes: 4 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware"
querierapi "github.com/grafana/mimir/pkg/querier/api"
querier_stats "github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/activitytracker"
Expand Down Expand Up @@ -300,6 +301,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"results_cache_hit_bytes", details.ResultsCacheHitBytes,
"results_cache_miss_bytes", details.ResultsCacheMissBytes,
)
if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
logMessage = append(logMessage, "read_consistency", consistency)
}
}
if len(f.cfg.LogQueryRequestHeaders) != 0 {
logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
return fmt.Errorf("error parsing label matchers: %w", err)
}

// Enforce read consistency before getting TSDB (covers the case where the tenant's data has not been ingested
// in this ingester yet, but there's some to ingest in the backlog).
if err := i.enforceReadConsistency(ctx, userID); err != nil {
return err
}

db := i.getTSDB(userID)
if db == nil {
level.Debug(i.logger).Log("msg", "no TSDB for user", "userID", userID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/mimir/pkg/mimirpb"
querierapi "github.com/grafana/mimir/pkg/querier/api"
)

// HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient.
Expand All @@ -43,6 +44,8 @@ func MakeIngesterClient(inst ring.InstanceDesc, cfg Config, metrics *Metrics, lo
if cfg.CircuitBreaker.Enabled {
unary = append([]grpc.UnaryClientInterceptor{NewCircuitBreaker(inst, cfg.CircuitBreaker, metrics, logger)}, unary...)
}
unary = append(unary, querierapi.ReadConsistencyClientUnaryInterceptor)
stream = append(stream, querierapi.ReadConsistencyClientStreamInterceptor)

dialOpts, err := cfg.GRPCClientConfig.DialOption(unary, stream)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3563,7 +3563,13 @@ func (i *Ingester) enforceReadConsistency(ctx context.Context, tenantID string)
return nil
}

if i.limits.IngestStorageReadConsistency(tenantID) != api.ReadConsistencyStrong {
var cLevel string
if c, ok := api.ReadConsistencyFromContext(ctx); ok {
cLevel = c
} else {
cLevel = i.limits.IngestStorageReadConsistency(tenantID)
}
if cLevel == api.ReadConsistencyEventual {
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
querier_worker "github.com/grafana/mimir/pkg/querier/worker"
"github.com/grafana/mimir/pkg/ruler"
Expand Down Expand Up @@ -727,6 +728,8 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) {
if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled {
util_log.WarnExperimentalUse("ruler.tenant-federation")
}
cfg.Server.GRPCMiddleware = append(cfg.Server.GRPCMiddleware, querierapi.ReadConsistencyServerUnaryInterceptor)
cfg.Server.GRPCStreamMiddleware = append(cfg.Server.GRPCStreamMiddleware, querierapi.ReadConsistencyServerStreamInterceptor)

cfg.API.HTTPAuthMiddleware = noauth.SetupAuthMiddleware(
&cfg.Server,
Expand Down
Loading

0 comments on commit a2b6d4e

Please sign in to comment.