diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4ce74da4955..e94d9d20b06 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "net/http" + "strconv" "strings" "time" @@ -73,6 +74,7 @@ func registerQuery(app *extkingpin.App) { Default("20").Int() lookbackDelta := cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations. PromQL always evaluates the query for the certain timestamp (query range timestamps are deduced by step). Since scrape intervals might be different, PromQL looks back for given amount of time to get latest sample. If it exceeds the maximum lookback delta it assumes series is stale and returns none (a gap). This is why lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m.").Duration() + dynamicLookbackDelta := cmd.Flag("query.dynamic-lookback-delta", "Allow for larger lookback duration for queries based on resolution.").Default("false").Bool() maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query."). Default("4").Int() @@ -181,6 +183,7 @@ func registerQuery(app *extkingpin.App) { *maxConcurrentSelects, time.Duration(*queryTimeout), *lookbackDelta, + *dynamicLookbackDelta, time.Duration(*defaultEvaluationInterval), time.Duration(*storeResponseTimeout), *queryReplicaLabels, @@ -230,6 +233,7 @@ func runQuery( maxConcurrentSelects int, queryTimeout time.Duration, lookbackDelta time.Duration, + dynamicLookbackDelta bool, defaultEvaluationInterval time.Duration, storeResponseTimeout time.Duration, queryReplicaLabels []string, @@ -317,20 +321,30 @@ func runQuery( maxConcurrentSelects, queryTimeout, ) - engine = promql.NewEngine( - promql.EngineOpts{ - Logger: logger, - Reg: reg, - // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. - MaxSamples: math.MaxInt32, - Timeout: queryTimeout, - NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { - return defaultEvaluationInterval.Milliseconds() + subqueryIntervalFunc = func(rangeMillis int64) int64 { + return defaultEvaluationInterval.Milliseconds() + } + totalEngines int + newEngine = func(ld time.Duration) *promql.Engine { + var r prometheus.Registerer = reg + if dynamicLookbackDelta { + r = extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(totalEngines)}, reg) + totalEngines++ + } + return promql.NewEngine( + promql.EngineOpts{ + Logger: logger, + Reg: r, + // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + NoStepSubqueryIntervalFn: subqueryIntervalFunc, + LookbackDelta: ld, }, - LookbackDelta: lookbackDelta, - }, - ) + ) + } ) + // Periodically update the store set with the addresses we see in our cluster. { ctx, cancel := context.WithCancel(context.Background()) @@ -440,7 +454,7 @@ func runQuery( api := v1.NewQueryAPI( logger, stores, - engine, + engineFunc(newEngine, lookbackDelta, dynamicLookbackDelta), queryableCreator, // NOTE: Will share the same replica label as the query for now. rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels), @@ -536,3 +550,36 @@ func firstDuplicate(ss []string) string { return "" } + +func engineFunc(newEngine func(time.Duration) *promql.Engine, lookbackDelta time.Duration, dynamicLookbackDelta bool) func(int64) *promql.Engine { + deltas := []int64{0} + if dynamicLookbackDelta { + deltas = []int64{0, 5 * time.Minute.Milliseconds(), 1 * time.Hour.Milliseconds()} + } + var ( + engines = make([]*promql.Engine, len(deltas)) + defaultEngine = newEngine(lookbackDelta) + ld = lookbackDelta.Milliseconds() + ) + engines[0] = defaultEngine + for i, d := range deltas[1:] { + if ld < d { + // Convert delta from milliseconds to time.Duration (nanoseconds). + engines[i+1] = newEngine(time.Duration(d * 1_000_000)) + } else { + engines[i+1] = defaultEngine + } + } + return func(maxSourceResolutionMillis int64) *promql.Engine { + for i := len(deltas) - 1; i >= 1; i-- { + left := deltas[i-1] + if deltas[i-1] < ld { + left = ld + } + if left < maxSourceResolutionMillis { + return engines[i] + } + } + return engines[0] + } +} diff --git a/cmd/thanos/query_test.go b/cmd/thanos/query_test.go new file mode 100644 index 00000000000..fad668b8fb1 --- /dev/null +++ b/cmd/thanos/query_test.go @@ -0,0 +1,107 @@ +package main + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/promql" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestEngineFunc(t *testing.T) { + var ( + engineDefault = promql.NewEngine(promql.EngineOpts{LookbackDelta: 123}) + engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute}) + engine1h = promql.NewEngine(promql.EngineOpts{LookbackDelta: 1 * time.Hour}) + ) + fakeNewEngine := func(lookback time.Duration) *promql.Engine { + switch lookback { + case 1 * time.Hour: + return engine1h + case 5 * time.Minute: + return engine5m + default: + return engineDefault + } + } + type testCase struct { + stepMillis int64 + expect *promql.Engine + } + var ( + minute = time.Minute.Milliseconds() + hour = time.Hour.Milliseconds() + tData = []struct { + lookbackDelta time.Duration + dynamicLookbackDelta bool + tcs []testCase + }{ + { + // Non-dynamic lookbackDelta should always return the same engine. + lookbackDelta: 0, + dynamicLookbackDelta: false, + tcs: []testCase{ + {0, engineDefault}, + {5 * minute, engineDefault}, + {1 * hour, engineDefault}, + }, + }, + { + lookbackDelta: 3 * time.Minute, + dynamicLookbackDelta: true, + tcs: []testCase{ + {2 * minute, engineDefault}, + {3 * minute, engineDefault}, + {4 * minute, engine5m}, + {5 * minute, engine5m}, + {6 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + { + lookbackDelta: 5 * time.Minute, + dynamicLookbackDelta: true, + tcs: []testCase{ + {0, engine5m}, + {5 * minute, engine5m}, + {6 * minute, engine1h}, + {59 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + { + lookbackDelta: 30 * time.Minute, + dynamicLookbackDelta: true, + tcs: []testCase{ + {0, engineDefault}, + {5 * minute, engineDefault}, + {30 * minute, engineDefault}, + {31 * minute, engine1h}, + {59 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + { + lookbackDelta: 1 * time.Hour, + dynamicLookbackDelta: true, + tcs: []testCase{ + {0, engine1h}, + {5 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + } + ) + for _, td := range tData { + e := engineFunc(fakeNewEngine, td.lookbackDelta, td.dynamicLookbackDelta) + for _, tc := range td.tcs { + got := e(tc.stepMillis) + testutil.Equals(t, tc.expect, got) + } + } +} diff --git a/docs/components/query.md b/docs/components/query.md index 5dfacab1168..f5bb024c12a 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -380,6 +380,9 @@ Flags: lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m. + --query.dynamic-lookback-delta + Allow for larger lookback duration for + queries based on resolution. --query.max-concurrent-select=4 Maximum number of select requests made concurrently per a query. diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index eab980379f5..7e006d6112f 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -64,8 +64,9 @@ type QueryAPI struct { logger log.Logger gate gate.Gate queryableCreate query.QueryableCreator - queryEngine *promql.Engine - ruleGroups rules.UnaryClient + // queryEngine returns appropriate promql.Engine for a query with a given step. + queryEngine func(int64) *promql.Engine + ruleGroups rules.UnaryClient enableAutodownsampling bool enableQueryPartialResponse bool @@ -82,7 +83,7 @@ type QueryAPI struct { func NewQueryAPI( logger log.Logger, storeSet *query.StoreSet, - qe *promql.Engine, + qe func(int64) *promql.Engine, c query.QueryableCreator, ruleGroups rules.UnaryClient, enableAutodownsampling bool, @@ -265,11 +266,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro return nil, nil, apiErr } + qe := qapi.queryEngine(maxSourceResolution) + // We are starting promQL tracing span here, because we have no control over promQL code. span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) + qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err} } @@ -370,11 +373,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap return nil, nil, apiErr } + qe := qapi.queryEngine(maxSourceResolution) + // We are starting promQL tracing span here, because we have no control over promQL code. span, ctx := tracing.StartSpan(ctx, "promql_range_query") defer span.Finish() - qry, err := qapi.queryEngine.NewRangeQuery( + qry, err := qe.NewRangeQuery( qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), start, diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index c680c10f122..428a9511d2b 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -167,17 +167,20 @@ func TestQueryEndpoints(t *testing.T) { now := time.Now() timeout := 100 * time.Second + qe := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: timeout, + }) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, }, queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout), - queryEngine: promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: timeout, - }), + queryEngine: func(int64) *promql.Engine { + return qe + }, gate: gate.New(nil, 4), } @@ -657,17 +660,20 @@ func TestMetadataEndpoints(t *testing.T) { now := time.Now() timeout := 100 * time.Second + qe := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: timeout, + }) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, }, queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout), - queryEngine: promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: timeout, - }), + queryEngine: func(int64) *promql.Engine { + return qe + }, gate: gate.New(nil, 4), } apiWithLabelLookback := &QueryAPI{ @@ -675,12 +681,9 @@ func TestMetadataEndpoints(t *testing.T) { Now: func() time.Time { return now }, }, queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout), - queryEngine: promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: timeout, - }), + queryEngine: func(int64) *promql.Engine { + return qe + }, gate: gate.New(nil, 4), defaultMetadataTimeRange: apiLookbackDelta, }