From 75de5c97c3a5e8f57c2811c07fbb4202be441c67 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Mon, 5 Oct 2020 19:05:45 +0300 Subject: [PATCH 01/11] query: introduce dynamic lookback interval Closes #2608 This allows queries with large step to make use of downsampled data. Signed-off-by: Vladimir Kononov --- cmd/thanos/query.go | 73 +++++++++++++++++++++----- cmd/thanos/query_test.go | 107 +++++++++++++++++++++++++++++++++++++++ docs/components/query.md | 3 ++ pkg/api/query/v1.go | 15 ++++-- pkg/api/query/v1_test.go | 39 +++++++------- 5 files changed, 201 insertions(+), 36 deletions(-) create mode 100644 cmd/thanos/query_test.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4ce74da495..e94d9d20b0 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "net/http" + "strconv" "strings" "time" @@ -73,6 +74,7 @@ func registerQuery(app *extkingpin.App) { Default("20").Int() lookbackDelta := cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations. PromQL always evaluates the query for the certain timestamp (query range timestamps are deduced by step). Since scrape intervals might be different, PromQL looks back for given amount of time to get latest sample. If it exceeds the maximum lookback delta it assumes series is stale and returns none (a gap). This is why lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m.").Duration() + dynamicLookbackDelta := cmd.Flag("query.dynamic-lookback-delta", "Allow for larger lookback duration for queries based on resolution.").Default("false").Bool() maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query."). Default("4").Int() @@ -181,6 +183,7 @@ func registerQuery(app *extkingpin.App) { *maxConcurrentSelects, time.Duration(*queryTimeout), *lookbackDelta, + *dynamicLookbackDelta, time.Duration(*defaultEvaluationInterval), time.Duration(*storeResponseTimeout), *queryReplicaLabels, @@ -230,6 +233,7 @@ func runQuery( maxConcurrentSelects int, queryTimeout time.Duration, lookbackDelta time.Duration, + dynamicLookbackDelta bool, defaultEvaluationInterval time.Duration, storeResponseTimeout time.Duration, queryReplicaLabels []string, @@ -317,20 +321,30 @@ func runQuery( maxConcurrentSelects, queryTimeout, ) - engine = promql.NewEngine( - promql.EngineOpts{ - Logger: logger, - Reg: reg, - // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. - MaxSamples: math.MaxInt32, - Timeout: queryTimeout, - NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { - return defaultEvaluationInterval.Milliseconds() + subqueryIntervalFunc = func(rangeMillis int64) int64 { + return defaultEvaluationInterval.Milliseconds() + } + totalEngines int + newEngine = func(ld time.Duration) *promql.Engine { + var r prometheus.Registerer = reg + if dynamicLookbackDelta { + r = extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(totalEngines)}, reg) + totalEngines++ + } + return promql.NewEngine( + promql.EngineOpts{ + Logger: logger, + Reg: r, + // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + NoStepSubqueryIntervalFn: subqueryIntervalFunc, + LookbackDelta: ld, }, - LookbackDelta: lookbackDelta, - }, - ) + ) + } ) + // Periodically update the store set with the addresses we see in our cluster. { ctx, cancel := context.WithCancel(context.Background()) @@ -440,7 +454,7 @@ func runQuery( api := v1.NewQueryAPI( logger, stores, - engine, + engineFunc(newEngine, lookbackDelta, dynamicLookbackDelta), queryableCreator, // NOTE: Will share the same replica label as the query for now. rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels), @@ -536,3 +550,36 @@ func firstDuplicate(ss []string) string { return "" } + +func engineFunc(newEngine func(time.Duration) *promql.Engine, lookbackDelta time.Duration, dynamicLookbackDelta bool) func(int64) *promql.Engine { + deltas := []int64{0} + if dynamicLookbackDelta { + deltas = []int64{0, 5 * time.Minute.Milliseconds(), 1 * time.Hour.Milliseconds()} + } + var ( + engines = make([]*promql.Engine, len(deltas)) + defaultEngine = newEngine(lookbackDelta) + ld = lookbackDelta.Milliseconds() + ) + engines[0] = defaultEngine + for i, d := range deltas[1:] { + if ld < d { + // Convert delta from milliseconds to time.Duration (nanoseconds). + engines[i+1] = newEngine(time.Duration(d * 1_000_000)) + } else { + engines[i+1] = defaultEngine + } + } + return func(maxSourceResolutionMillis int64) *promql.Engine { + for i := len(deltas) - 1; i >= 1; i-- { + left := deltas[i-1] + if deltas[i-1] < ld { + left = ld + } + if left < maxSourceResolutionMillis { + return engines[i] + } + } + return engines[0] + } +} diff --git a/cmd/thanos/query_test.go b/cmd/thanos/query_test.go new file mode 100644 index 0000000000..fad668b8fb --- /dev/null +++ b/cmd/thanos/query_test.go @@ -0,0 +1,107 @@ +package main + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/promql" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestEngineFunc(t *testing.T) { + var ( + engineDefault = promql.NewEngine(promql.EngineOpts{LookbackDelta: 123}) + engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute}) + engine1h = promql.NewEngine(promql.EngineOpts{LookbackDelta: 1 * time.Hour}) + ) + fakeNewEngine := func(lookback time.Duration) *promql.Engine { + switch lookback { + case 1 * time.Hour: + return engine1h + case 5 * time.Minute: + return engine5m + default: + return engineDefault + } + } + type testCase struct { + stepMillis int64 + expect *promql.Engine + } + var ( + minute = time.Minute.Milliseconds() + hour = time.Hour.Milliseconds() + tData = []struct { + lookbackDelta time.Duration + dynamicLookbackDelta bool + tcs []testCase + }{ + { + // Non-dynamic lookbackDelta should always return the same engine. + lookbackDelta: 0, + dynamicLookbackDelta: false, + tcs: []testCase{ + {0, engineDefault}, + {5 * minute, engineDefault}, + {1 * hour, engineDefault}, + }, + }, + { + lookbackDelta: 3 * time.Minute, + dynamicLookbackDelta: true, + tcs: []testCase{ + {2 * minute, engineDefault}, + {3 * minute, engineDefault}, + {4 * minute, engine5m}, + {5 * minute, engine5m}, + {6 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + { + lookbackDelta: 5 * time.Minute, + dynamicLookbackDelta: true, + tcs: []testCase{ + {0, engine5m}, + {5 * minute, engine5m}, + {6 * minute, engine1h}, + {59 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + { + lookbackDelta: 30 * time.Minute, + dynamicLookbackDelta: true, + tcs: []testCase{ + {0, engineDefault}, + {5 * minute, engineDefault}, + {30 * minute, engineDefault}, + {31 * minute, engine1h}, + {59 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + { + lookbackDelta: 1 * time.Hour, + dynamicLookbackDelta: true, + tcs: []testCase{ + {0, engine1h}, + {5 * minute, engine1h}, + {1 * hour, engine1h}, + {2 * hour, engine1h}, + }, + }, + } + ) + for _, td := range tData { + e := engineFunc(fakeNewEngine, td.lookbackDelta, td.dynamicLookbackDelta) + for _, tc := range td.tcs { + got := e(tc.stepMillis) + testutil.Equals(t, tc.expect, got) + } + } +} diff --git a/docs/components/query.md b/docs/components/query.md index 5dfacab116..f5bb024c12 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -380,6 +380,9 @@ Flags: lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m. + --query.dynamic-lookback-delta + Allow for larger lookback duration for + queries based on resolution. --query.max-concurrent-select=4 Maximum number of select requests made concurrently per a query. diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index eab980379f..7e006d6112 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -64,8 +64,9 @@ type QueryAPI struct { logger log.Logger gate gate.Gate queryableCreate query.QueryableCreator - queryEngine *promql.Engine - ruleGroups rules.UnaryClient + // queryEngine returns appropriate promql.Engine for a query with a given step. + queryEngine func(int64) *promql.Engine + ruleGroups rules.UnaryClient enableAutodownsampling bool enableQueryPartialResponse bool @@ -82,7 +83,7 @@ type QueryAPI struct { func NewQueryAPI( logger log.Logger, storeSet *query.StoreSet, - qe *promql.Engine, + qe func(int64) *promql.Engine, c query.QueryableCreator, ruleGroups rules.UnaryClient, enableAutodownsampling bool, @@ -265,11 +266,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro return nil, nil, apiErr } + qe := qapi.queryEngine(maxSourceResolution) + // We are starting promQL tracing span here, because we have no control over promQL code. span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) + qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err} } @@ -370,11 +373,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap return nil, nil, apiErr } + qe := qapi.queryEngine(maxSourceResolution) + // We are starting promQL tracing span here, because we have no control over promQL code. span, ctx := tracing.StartSpan(ctx, "promql_range_query") defer span.Finish() - qry, err := qapi.queryEngine.NewRangeQuery( + qry, err := qe.NewRangeQuery( qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), start, diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index c680c10f12..428a9511d2 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -167,17 +167,20 @@ func TestQueryEndpoints(t *testing.T) { now := time.Now() timeout := 100 * time.Second + qe := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: timeout, + }) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, }, queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout), - queryEngine: promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: timeout, - }), + queryEngine: func(int64) *promql.Engine { + return qe + }, gate: gate.New(nil, 4), } @@ -657,17 +660,20 @@ func TestMetadataEndpoints(t *testing.T) { now := time.Now() timeout := 100 * time.Second + qe := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: timeout, + }) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, }, queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout), - queryEngine: promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: timeout, - }), + queryEngine: func(int64) *promql.Engine { + return qe + }, gate: gate.New(nil, 4), } apiWithLabelLookback := &QueryAPI{ @@ -675,12 +681,9 @@ func TestMetadataEndpoints(t *testing.T) { Now: func() time.Time { return now }, }, queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout), - queryEngine: promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: timeout, - }), + queryEngine: func(int64) *promql.Engine { + return qe + }, gate: gate.New(nil, 4), defaultMetadataTimeRange: apiLookbackDelta, } From 71a61eb78467bdef7c20ea0a71235d6769a7ae29 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 6 Oct 2020 02:06:44 +0300 Subject: [PATCH 02/11] Fix minor checks Signed-off-by: Vladimir Kononov --- cmd/thanos/query.go | 2 +- docs/components/query.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index e94d9d20b0..ae7e8e41f6 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -321,7 +321,7 @@ func runQuery( maxConcurrentSelects, queryTimeout, ) - subqueryIntervalFunc = func(rangeMillis int64) int64 { + subqueryIntervalFunc = func(int64) int64 { return defaultEvaluationInterval.Milliseconds() } totalEngines int diff --git a/docs/components/query.md b/docs/components/query.md index f5bb024c12..b914268b0d 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -381,8 +381,8 @@ Flags: times of the slowest scrape interval. If unset it will use the promql default of 5m. --query.dynamic-lookback-delta - Allow for larger lookback duration for - queries based on resolution. + Allow for larger lookback duration for queries + based on resolution. --query.max-concurrent-select=4 Maximum number of select requests made concurrently per a query. From 5262d9834c569d78c84af6c47605276738843cd4 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 6 Oct 2020 02:07:08 +0300 Subject: [PATCH 03/11] Append changelog Signed-off-by: Vladimir Kononov --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b249dd62ab..31f26e1e87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric. - [#3147](https://github.com/thanos-io/thanos/pull/3147) Querier: Add `query.metadata.default-time-range` flag to specify the default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning. - [#3207](https://github.com/thanos-io/thanos/pull/3207) Query Frontend: Add `cache-compression-type` flag to use compression in the query frontend cache. +- [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data. ### Changed From 6935ece3267ad1481b619d189d336c03d95c59b1 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 6 Oct 2020 02:14:46 +0300 Subject: [PATCH 04/11] Add missing copyright Signed-off-by: Vladimir Kononov --- cmd/thanos/query_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/thanos/query_test.go b/cmd/thanos/query_test.go index fad668b8fb..403d2b2faa 100644 --- a/cmd/thanos/query_test.go +++ b/cmd/thanos/query_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package main import ( From 2824f45d7b68cf7a3720b784daa1b37b925c8295 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 6 Oct 2020 15:50:37 +0300 Subject: [PATCH 05/11] Use pre-defined downsampling resolution constatns Signed-off-by: Vladimir Kononov --- cmd/thanos/query.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index ae7e8e41f6..879767554c 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -25,13 +25,13 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" - "github.com/thanos-io/thanos/pkg/extkingpin" - v1 "github.com/thanos-io/thanos/pkg/api/query" + "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/gate" @@ -552,9 +552,9 @@ func firstDuplicate(ss []string) string { } func engineFunc(newEngine func(time.Duration) *promql.Engine, lookbackDelta time.Duration, dynamicLookbackDelta bool) func(int64) *promql.Engine { - deltas := []int64{0} + deltas := []int64{downsample.ResLevel0} if dynamicLookbackDelta { - deltas = []int64{0, 5 * time.Minute.Milliseconds(), 1 * time.Hour.Milliseconds()} + deltas = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2} } var ( engines = make([]*promql.Engine, len(deltas)) From b491083e8343493d4466ec7ab3297b2bde87b58e Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Thu, 8 Oct 2020 19:13:59 +0300 Subject: [PATCH 06/11] Use dynamic lookback delta by default Co-authored-by: Bartlomiej Plotka Signed-off-by: Vladimir Kononov --- cmd/thanos/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 879767554c..f2b112f076 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -74,7 +74,7 @@ func registerQuery(app *extkingpin.App) { Default("20").Int() lookbackDelta := cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations. PromQL always evaluates the query for the certain timestamp (query range timestamps are deduced by step). Since scrape intervals might be different, PromQL looks back for given amount of time to get latest sample. If it exceeds the maximum lookback delta it assumes series is stale and returns none (a gap). This is why lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m.").Duration() - dynamicLookbackDelta := cmd.Flag("query.dynamic-lookback-delta", "Allow for larger lookback duration for queries based on resolution.").Default("false").Bool() + dynamicLookbackDelta := cmd.Flag("query.dynamic-lookback-delta", "Allow for larger lookback duration for queries based on resolution.").Hidden().Default("true").Bool() maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query."). Default("4").Int() From 29ea24eb6ba480ff863f997f6f61b00a65ced34e Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Thu, 8 Oct 2020 19:23:55 +0300 Subject: [PATCH 07/11] Rename defaultEngine to rawEngine Signed-off-by: Vladimir Kononov --- cmd/thanos/query.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index f2b112f076..2561c53763 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -557,17 +557,17 @@ func engineFunc(newEngine func(time.Duration) *promql.Engine, lookbackDelta time deltas = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2} } var ( - engines = make([]*promql.Engine, len(deltas)) - defaultEngine = newEngine(lookbackDelta) - ld = lookbackDelta.Milliseconds() + engines = make([]*promql.Engine, len(deltas)) + rawEngine = newEngine(lookbackDelta) + ld = lookbackDelta.Milliseconds() ) - engines[0] = defaultEngine + engines[0] = rawEngine for i, d := range deltas[1:] { if ld < d { // Convert delta from milliseconds to time.Duration (nanoseconds). engines[i+1] = newEngine(time.Duration(d * 1_000_000)) } else { - engines[i+1] = defaultEngine + engines[i+1] = rawEngine } } return func(maxSourceResolutionMillis int64) *promql.Engine { From 7413ba5a93064251e1e46e484c7cd3aa9e3747c1 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Fri, 9 Oct 2020 03:03:39 +0300 Subject: [PATCH 08/11] Merge engineFunc and newEngine into single engineFactory Signed-off-by: Vladimir Kononov --- cmd/thanos/query.go | 78 +++++++++++++++++++++++++--------------- cmd/thanos/query_test.go | 31 ++++++++-------- 2 files changed, 66 insertions(+), 43 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 2561c53763..ae8ebbf66b 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -321,27 +321,16 @@ func runQuery( maxConcurrentSelects, queryTimeout, ) - subqueryIntervalFunc = func(int64) int64 { - return defaultEvaluationInterval.Milliseconds() - } - totalEngines int - newEngine = func(ld time.Duration) *promql.Engine { - var r prometheus.Registerer = reg - if dynamicLookbackDelta { - r = extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(totalEngines)}, reg) - totalEngines++ - } - return promql.NewEngine( - promql.EngineOpts{ - Logger: logger, - Reg: r, - // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. - MaxSamples: math.MaxInt32, - Timeout: queryTimeout, - NoStepSubqueryIntervalFn: subqueryIntervalFunc, - LookbackDelta: ld, - }, - ) + engineOpts = promql.EngineOpts{ + Logger: logger, + Reg: reg, + // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + LookbackDelta: lookbackDelta, + NoStepSubqueryIntervalFn: func(int64) int64 { + return defaultEvaluationInterval.Milliseconds() + }, } ) @@ -454,7 +443,7 @@ func runQuery( api := v1.NewQueryAPI( logger, stores, - engineFunc(newEngine, lookbackDelta, dynamicLookbackDelta), + engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta), queryableCreator, // NOTE: Will share the same replica label as the query for now. rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels), @@ -551,21 +540,54 @@ func firstDuplicate(ss []string) string { return "" } -func engineFunc(newEngine func(time.Duration) *promql.Engine, lookbackDelta time.Duration, dynamicLookbackDelta bool) func(int64) *promql.Engine { +// engineFactory creates from 1 to 3 promql.Engines depending on +// dynamicLookbackDelta and eo.LookbackDelta and returns a function +// that returns appropriate engine for given maxSourceResolutionMillis. +// +// TODO: it seems like a good idea to tweak Prometheus itself +// instead of creating several Engines here. +func engineFactory( + newEngine func(promql.EngineOpts) *promql.Engine, + eo promql.EngineOpts, + dynamicLookbackDelta bool, +) func(int64) *promql.Engine { deltas := []int64{downsample.ResLevel0} if dynamicLookbackDelta { deltas = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2} } var ( - engines = make([]*promql.Engine, len(deltas)) - rawEngine = newEngine(lookbackDelta) - ld = lookbackDelta.Milliseconds() + engines = make([]*promql.Engine, len(deltas)) + ld = eo.LookbackDelta.Milliseconds() + totalEngines int ) + wrapReg := func() prometheus.Registerer { + wr := extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(totalEngines)}, eo.Reg) + totalEngines++ + return wr + } + rawEngine := newEngine(promql.EngineOpts{ + Logger: eo.Logger, + Reg: wrapReg(), + MaxSamples: eo.MaxSamples, + Timeout: eo.Timeout, + ActiveQueryTracker: eo.ActiveQueryTracker, + LookbackDelta: eo.LookbackDelta, + NoStepSubqueryIntervalFn: eo.NoStepSubqueryIntervalFn, + }) + engines[0] = rawEngine for i, d := range deltas[1:] { if ld < d { - // Convert delta from milliseconds to time.Duration (nanoseconds). - engines[i+1] = newEngine(time.Duration(d * 1_000_000)) + totalEngines++ + engines[i+1] = newEngine(promql.EngineOpts{ + Logger: eo.Logger, + Reg: wrapReg(), + MaxSamples: eo.MaxSamples, + Timeout: eo.Timeout, + ActiveQueryTracker: eo.ActiveQueryTracker, + LookbackDelta: time.Duration(d) * time.Millisecond, + NoStepSubqueryIntervalFn: eo.NoStepSubqueryIntervalFn, + }) } else { engines[i+1] = rawEngine } diff --git a/cmd/thanos/query_test.go b/cmd/thanos/query_test.go index 403d2b2faa..c9676ef9cd 100644 --- a/cmd/thanos/query_test.go +++ b/cmd/thanos/query_test.go @@ -14,18 +14,18 @@ import ( func TestEngineFunc(t *testing.T) { var ( - engineDefault = promql.NewEngine(promql.EngineOpts{LookbackDelta: 123}) - engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute}) - engine1h = promql.NewEngine(promql.EngineOpts{LookbackDelta: 1 * time.Hour}) + engineRaw = promql.NewEngine(promql.EngineOpts{}) + engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute}) + engine1h = promql.NewEngine(promql.EngineOpts{LookbackDelta: 1 * time.Hour}) ) - fakeNewEngine := func(lookback time.Duration) *promql.Engine { - switch lookback { + mockNewEngine := func(opts promql.EngineOpts) *promql.Engine { + switch opts.LookbackDelta { case 1 * time.Hour: return engine1h case 5 * time.Minute: return engine5m default: - return engineDefault + return engineRaw } } type testCase struct { @@ -45,17 +45,18 @@ func TestEngineFunc(t *testing.T) { lookbackDelta: 0, dynamicLookbackDelta: false, tcs: []testCase{ - {0, engineDefault}, - {5 * minute, engineDefault}, - {1 * hour, engineDefault}, + {0, engineRaw}, + {5 * minute, engineRaw}, + {1 * hour, engineRaw}, }, }, { + lookbackDelta: 3 * time.Minute, dynamicLookbackDelta: true, tcs: []testCase{ - {2 * minute, engineDefault}, - {3 * minute, engineDefault}, + {2 * minute, engineRaw}, + {3 * minute, engineRaw}, {4 * minute, engine5m}, {5 * minute, engine5m}, {6 * minute, engine1h}, @@ -79,9 +80,9 @@ func TestEngineFunc(t *testing.T) { lookbackDelta: 30 * time.Minute, dynamicLookbackDelta: true, tcs: []testCase{ - {0, engineDefault}, - {5 * minute, engineDefault}, - {30 * minute, engineDefault}, + {0, engineRaw}, + {5 * minute, engineRaw}, + {30 * minute, engineRaw}, {31 * minute, engine1h}, {59 * minute, engine1h}, {1 * hour, engine1h}, @@ -101,7 +102,7 @@ func TestEngineFunc(t *testing.T) { } ) for _, td := range tData { - e := engineFunc(fakeNewEngine, td.lookbackDelta, td.dynamicLookbackDelta) + e := engineFactory(mockNewEngine, promql.EngineOpts{LookbackDelta: td.lookbackDelta}, td.dynamicLookbackDelta) for _, tc := range td.tcs { got := e(tc.stepMillis) testutil.Equals(t, tc.expect, got) From 22acb877518f2d9d82568f17bc45720d0bd9b53c Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Fri, 9 Oct 2020 03:20:13 +0300 Subject: [PATCH 09/11] Remove query.dynamic-lookback-delta from docs Signed-off-by: Vladimir Kononov --- docs/components/query.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/components/query.md b/docs/components/query.md index b914268b0d..5dfacab116 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -380,9 +380,6 @@ Flags: lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m. - --query.dynamic-lookback-delta - Allow for larger lookback duration for queries - based on resolution. --query.max-concurrent-select=4 Maximum number of select requests made concurrently per a query. From cfbcd461ff6d88d5ea118d4a17dcdad9387b7e29 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Fri, 9 Oct 2020 03:23:51 +0300 Subject: [PATCH 10/11] Rename test Signed-off-by: Vladimir Kononov --- cmd/thanos/query_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/query_test.go b/cmd/thanos/query_test.go index c9676ef9cd..df5cebac57 100644 --- a/cmd/thanos/query_test.go +++ b/cmd/thanos/query_test.go @@ -12,7 +12,7 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) -func TestEngineFunc(t *testing.T) { +func TestEngineFactory(t *testing.T) { var ( engineRaw = promql.NewEngine(promql.EngineOpts{}) engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute}) From 886ddd8600aac733419053c5a15b59efee0909bb Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Fri, 6 Nov 2020 17:54:04 +0300 Subject: [PATCH 11/11] Review fixes Signed-off-by: Vladimir Kononov --- cmd/thanos/query.go | 58 +++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index ae8ebbf66b..8fe8330b30 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -551,51 +551,37 @@ func engineFactory( eo promql.EngineOpts, dynamicLookbackDelta bool, ) func(int64) *promql.Engine { - deltas := []int64{downsample.ResLevel0} + resolutions := []int64{downsample.ResLevel0} if dynamicLookbackDelta { - deltas = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2} + resolutions = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2} } var ( - engines = make([]*promql.Engine, len(deltas)) - ld = eo.LookbackDelta.Milliseconds() - totalEngines int + engines = make([]*promql.Engine, len(resolutions)) + ld = eo.LookbackDelta.Milliseconds() ) - wrapReg := func() prometheus.Registerer { - wr := extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(totalEngines)}, eo.Reg) - totalEngines++ - return wr + wrapReg := func(engineNum int) prometheus.Registerer { + return extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(engineNum)}, eo.Reg) } - rawEngine := newEngine(promql.EngineOpts{ - Logger: eo.Logger, - Reg: wrapReg(), - MaxSamples: eo.MaxSamples, - Timeout: eo.Timeout, - ActiveQueryTracker: eo.ActiveQueryTracker, - LookbackDelta: eo.LookbackDelta, - NoStepSubqueryIntervalFn: eo.NoStepSubqueryIntervalFn, - }) - engines[0] = rawEngine - for i, d := range deltas[1:] { - if ld < d { - totalEngines++ - engines[i+1] = newEngine(promql.EngineOpts{ - Logger: eo.Logger, - Reg: wrapReg(), - MaxSamples: eo.MaxSamples, - Timeout: eo.Timeout, - ActiveQueryTracker: eo.ActiveQueryTracker, - LookbackDelta: time.Duration(d) * time.Millisecond, - NoStepSubqueryIntervalFn: eo.NoStepSubqueryIntervalFn, - }) - } else { - engines[i+1] = rawEngine + lookbackDelta := eo.LookbackDelta + for i, r := range resolutions { + if ld < r { + lookbackDelta = time.Duration(r) * time.Millisecond } + engines[i] = newEngine(promql.EngineOpts{ + Logger: eo.Logger, + Reg: wrapReg(i), + MaxSamples: eo.MaxSamples, + Timeout: eo.Timeout, + ActiveQueryTracker: eo.ActiveQueryTracker, + LookbackDelta: lookbackDelta, + NoStepSubqueryIntervalFn: eo.NoStepSubqueryIntervalFn, + }) } return func(maxSourceResolutionMillis int64) *promql.Engine { - for i := len(deltas) - 1; i >= 1; i-- { - left := deltas[i-1] - if deltas[i-1] < ld { + for i := len(resolutions) - 1; i >= 1; i-- { + left := resolutions[i-1] + if resolutions[i-1] < ld { left = ld } if left < maxSourceResolutionMillis {