query: introduce dynamic lookback interval #3277

Merged
merged 13 commits into from
Nov 6, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric.
- [#3147](https://github.com/thanos-io/thanos/pull/3147) Querier: Add `query.metadata.default-time-range` flag to specify the default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.
- [#3207](https://github.com/thanos-io/thanos/pull/3207) Query Frontend: Add `cache-compression-type` flag to use compression in the query frontend cache.
- [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data.

### Changed

77 changes: 62 additions & 15 deletions cmd/thanos/query.go
@@ -8,6 +8,7 @@ import (
"fmt"
"math"
"net/http"
"strconv"
"strings"
"time"

@@ -24,13 +25,13 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/thanos/pkg/extkingpin"

v1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
@@ -73,6 +74,7 @@ func registerQuery(app *extkingpin.App) {
Default("20").Int()

lookbackDelta := cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations. PromQL always evaluates the query for the certain timestamp (query range timestamps are deduced by step). Since scrape intervals might be different, PromQL looks back for given amount of time to get latest sample. If it exceeds the maximum lookback delta it assumes series is stale and returns none (a gap). This is why lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m.").Duration()
dynamicLookbackDelta := cmd.Flag("query.dynamic-lookback-delta", "Allow for larger lookback duration for queries based on resolution.").Default("false").Bool()

maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()
@@ -181,6 +183,7 @@ func registerQuery(app *extkingpin.App) {
*maxConcurrentSelects,
time.Duration(*queryTimeout),
*lookbackDelta,
*dynamicLookbackDelta,
time.Duration(*defaultEvaluationInterval),
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
@@ -230,6 +233,7 @@ func runQuery(
maxConcurrentSelects int,
queryTimeout time.Duration,
lookbackDelta time.Duration,
dynamicLookbackDelta bool,
defaultEvaluationInterval time.Duration,
storeResponseTimeout time.Duration,
queryReplicaLabels []string,
@@ -317,20 +321,30 @@ func runQuery(
maxConcurrentSelects,
queryTimeout,
)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: reg,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 {
return defaultEvaluationInterval.Milliseconds()
subqueryIntervalFunc = func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
}
totalEngines int
newEngine = func(ld time.Duration) *promql.Engine {
Member

I am a bit confused: do we need two functions? What about engineFunc? I guess you wanted to have a large number of args in the function, but it might be cleaner to embed engineFunc here, rather than embedding this function in engineFunc. WDYT?

Contributor Author

You're absolutely right, having two functions is confusing. Got some coffee, merged engineFunc and newEngine into a single engineFactory. IMO things got a lot more understandable. I hope this isn't just the coffee speaking.

var r prometheus.Registerer = reg
if dynamicLookbackDelta {
Member

Oh wow 😱 Makes sense. I wonder if it's doable to contribute some tweak to Prometheus that would allow us to avoid creating 3 engines.

Contributor Author

Added a TODO for now.

r = extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(totalEngines)}, reg)
totalEngines++
}
return promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: r,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
NoStepSubqueryIntervalFn: subqueryIntervalFunc,
LookbackDelta: ld,
},
LookbackDelta: lookbackDelta,
},
)
)
}
)

// Periodically update the store set with the addresses we see in our cluster.
{
ctx, cancel := context.WithCancel(context.Background())
@@ -440,7 +454,7 @@ func runQuery(
api := v1.NewQueryAPI(
logger,
stores,
engine,
engineFunc(newEngine, lookbackDelta, dynamicLookbackDelta),
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
@@ -536,3 +550,36 @@ func firstDuplicate(ss []string) string {

return ""
}

func engineFunc(newEngine func(time.Duration) *promql.Engine, lookbackDelta time.Duration, dynamicLookbackDelta bool) func(int64) *promql.Engine {
deltas := []int64{downsample.ResLevel0}
if dynamicLookbackDelta {
deltas = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2}
}
var (
engines = make([]*promql.Engine, len(deltas))
defaultEngine = newEngine(lookbackDelta)
ld = lookbackDelta.Milliseconds()
)
engines[0] = defaultEngine
for i, d := range deltas[1:] {
if ld < d {
// Convert delta from milliseconds to time.Duration (nanoseconds).
engines[i+1] = newEngine(time.Duration(d * 1_000_000))
Member

This looks weird and should fail our lints: 1_000_000. What about using timestamp.Time(...)?

Contributor Author

Replaced with the standard time.Duration(d) * time.Millisecond. Prometheus's timestamp package seems unhelpful here, as it's a Duration we need, not a Time.

} else {
engines[i+1] = defaultEngine
}
}
return func(maxSourceResolutionMillis int64) *promql.Engine {
for i := len(deltas) - 1; i >= 1; i-- {
left := deltas[i-1]
if deltas[i-1] < ld {
left = ld
}
if left < maxSourceResolutionMillis {
return engines[i]
}
}
return engines[0]
}
}
110 changes: 110 additions & 0 deletions cmd/thanos/query_test.go
@@ -0,0 +1,110 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"testing"
"time"

"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/thanos/pkg/testutil"
)

func TestEngineFunc(t *testing.T) {
var (
engineDefault = promql.NewEngine(promql.EngineOpts{LookbackDelta: 123})
engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute})
engine1h = promql.NewEngine(promql.EngineOpts{LookbackDelta: 1 * time.Hour})
)
fakeNewEngine := func(lookback time.Duration) *promql.Engine {
switch lookback {
case 1 * time.Hour:
return engine1h
case 5 * time.Minute:
return engine5m
default:
return engineDefault
}
}
type testCase struct {
stepMillis int64
expect *promql.Engine
}
var (
minute = time.Minute.Milliseconds()
hour = time.Hour.Milliseconds()
tData = []struct {
lookbackDelta time.Duration
dynamicLookbackDelta bool
tcs []testCase
}{
{
// Non-dynamic lookbackDelta should always return the same engine.
lookbackDelta: 0,
dynamicLookbackDelta: false,
tcs: []testCase{
{0, engineDefault},
{5 * minute, engineDefault},
{1 * hour, engineDefault},
},
},
{
lookbackDelta: 3 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{2 * minute, engineDefault},
{3 * minute, engineDefault},
{4 * minute, engine5m},
{5 * minute, engine5m},
{6 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
{
lookbackDelta: 5 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engine5m},
{5 * minute, engine5m},
{6 * minute, engine1h},
{59 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
{
lookbackDelta: 30 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engineDefault},
{5 * minute, engineDefault},
{30 * minute, engineDefault},
{31 * minute, engine1h},
{59 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
{
lookbackDelta: 1 * time.Hour,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engine1h},
{5 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
}
)
for _, td := range tData {
e := engineFunc(fakeNewEngine, td.lookbackDelta, td.dynamicLookbackDelta)
for _, tc := range td.tcs {
got := e(tc.stepMillis)
testutil.Equals(t, tc.expect, got)
}
}
}
3 changes: 3 additions & 0 deletions docs/components/query.md
@@ -380,6 +380,9 @@ Flags:
lookback delta should be set to at least 2
times of the slowest scrape interval. If unset
it will use the promql default of 5m.
--query.dynamic-lookback-delta
Allow for larger lookback duration for queries
based on resolution.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per a query.
15 changes: 10 additions & 5 deletions pkg/api/query/v1.go
@@ -64,8 +64,9 @@ type QueryAPI struct {
logger log.Logger
gate gate.Gate
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
ruleGroups rules.UnaryClient
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine func(int64) *promql.Engine
ruleGroups rules.UnaryClient

enableAutodownsampling bool
enableQueryPartialResponse bool
@@ -82,7 +83,7 @@ type QueryAPI struct {
func NewQueryAPI(
logger log.Logger,
storeSet *query.StoreSet,
qe *promql.Engine,
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
enableAutodownsampling bool,
@@ -265,11 +266,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
@@ -370,11 +373,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_range_query")
defer span.Finish()

qry, err := qapi.queryEngine.NewRangeQuery(
qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false),
r.FormValue("query"),
start,
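
The shape of this change, replacing a single engine field with a function that picks an engine per request, can be sketched without any Prometheus dependency. Everything below is hypothetical and for illustration only: `engine` stands in for `*promql.Engine`, `api` for `QueryAPI`, and `constantEngine` mirrors how the updated tests wrap one engine in the new signature.

```go
package main

import "fmt"

// engine is a hypothetical stand-in for *promql.Engine.
type engine struct{ name string }

// api stores a selector function rather than a single engine instance,
// so each request can be served by an engine suited to its resolution.
type api struct {
	queryEngine func(maxSourceResolutionMillis int64) *engine
}

// constantEngine adapts a single engine to the selector signature,
// ignoring the resolution argument.
func constantEngine(e *engine) func(int64) *engine {
	return func(int64) *engine { return e }
}

// handle resolves the engine once per request and then uses it.
func (a *api) handle(maxSourceResolutionMillis int64) string {
	qe := a.queryEngine(maxSourceResolutionMillis)
	return qe.name
}

func main() {
	a := &api{queryEngine: constantEngine(&engine{name: "default"})}
	fmt.Println(a.handle(0), a.handle(3_600_000))
}
```

Resolving the engine once at the top of the handler keeps the rest of the request path unchanged, which is why the diff only swaps `qapi.queryEngine` for the local `qe`.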
39 changes: 21 additions & 18 deletions pkg/api/query/v1_test.go
@@ -167,17 +167,20 @@ func TestQueryEndpoints(t *testing.T) {

now := time.Now()
timeout := 100 * time.Second
qe := promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
})
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}),
queryEngine: func(int64) *promql.Engine {
return qe
},
gate: gate.New(nil, 4),
}

@@ -657,30 +660,30 @@ func TestMetadataEndpoints(t *testing.T) {

now := time.Now()
timeout := 100 * time.Second
qe := promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
})
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}),
queryEngine: func(int64) *promql.Engine {
return qe
},
gate: gate.New(nil, 4),
}
apiWithLabelLookback := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}),
queryEngine: func(int64) *promql.Engine {
return qe
},
gate: gate.New(nil, 4),
defaultMetadataTimeRange: apiLookbackDelta,
}