Skip to content

Commit

Permalink
query: introduce dynamic lookback interval
Browse files Browse the repository at this point in the history
Closes thanos-io#2608
This allows queries with large step to make use of downsampled data.

Signed-off-by: Vladimir Kononov <krya-kryak@users.noreply.github.com>
  • Loading branch information
krya-kryak committed Oct 5, 2020
1 parent bb1662a commit 75de5c9
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 36 deletions.
73 changes: 60 additions & 13 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"math"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -73,6 +74,7 @@ func registerQuery(app *extkingpin.App) {
Default("20").Int()

lookbackDelta := cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations. PromQL always evaluates the query for the certain timestamp (query range timestamps are deduced by step). Since scrape intervals might be different, PromQL looks back for given amount of time to get latest sample. If it exceeds the maximum lookback delta it assumes series is stale and returns none (a gap). This is why lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m.").Duration()
dynamicLookbackDelta := cmd.Flag("query.dynamic-lookback-delta", "Allow for larger lookback duration for queries based on resolution.").Default("false").Bool()

maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()
Expand Down Expand Up @@ -181,6 +183,7 @@ func registerQuery(app *extkingpin.App) {
*maxConcurrentSelects,
time.Duration(*queryTimeout),
*lookbackDelta,
*dynamicLookbackDelta,
time.Duration(*defaultEvaluationInterval),
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
Expand Down Expand Up @@ -230,6 +233,7 @@ func runQuery(
maxConcurrentSelects int,
queryTimeout time.Duration,
lookbackDelta time.Duration,
dynamicLookbackDelta bool,
defaultEvaluationInterval time.Duration,
storeResponseTimeout time.Duration,
queryReplicaLabels []string,
Expand Down Expand Up @@ -317,20 +321,30 @@ func runQuery(
maxConcurrentSelects,
queryTimeout,
)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: reg,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 {
return defaultEvaluationInterval.Milliseconds()
subqueryIntervalFunc = func(rangeMillis int64) int64 {
return defaultEvaluationInterval.Milliseconds()
}
totalEngines int
newEngine = func(ld time.Duration) *promql.Engine {
var r prometheus.Registerer = reg
if dynamicLookbackDelta {
r = extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(totalEngines)}, reg)
totalEngines++
}
return promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: r,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
NoStepSubqueryIntervalFn: subqueryIntervalFunc,
LookbackDelta: ld,
},
LookbackDelta: lookbackDelta,
},
)
)
}
)

// Periodically update the store set with the addresses we see in our cluster.
{
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -440,7 +454,7 @@ func runQuery(
api := v1.NewQueryAPI(
logger,
stores,
engine,
engineFunc(newEngine, lookbackDelta, dynamicLookbackDelta),
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
Expand Down Expand Up @@ -536,3 +550,36 @@ func firstDuplicate(ss []string) string {

return ""
}

func engineFunc(newEngine func(time.Duration) *promql.Engine, lookbackDelta time.Duration, dynamicLookbackDelta bool) func(int64) *promql.Engine {
deltas := []int64{0}
if dynamicLookbackDelta {
deltas = []int64{0, 5 * time.Minute.Milliseconds(), 1 * time.Hour.Milliseconds()}
}
var (
engines = make([]*promql.Engine, len(deltas))
defaultEngine = newEngine(lookbackDelta)
ld = lookbackDelta.Milliseconds()
)
engines[0] = defaultEngine
for i, d := range deltas[1:] {
if ld < d {
// Convert delta from milliseconds to time.Duration (nanoseconds).
engines[i+1] = newEngine(time.Duration(d * 1_000_000))
} else {
engines[i+1] = defaultEngine
}
}
return func(maxSourceResolutionMillis int64) *promql.Engine {
for i := len(deltas) - 1; i >= 1; i-- {
left := deltas[i-1]
if deltas[i-1] < ld {
left = ld
}
if left < maxSourceResolutionMillis {
return engines[i]
}
}
return engines[0]
}
}
107 changes: 107 additions & 0 deletions cmd/thanos/query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package main

import (
"testing"
"time"

"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/thanos/pkg/testutil"
)

func TestEngineFunc(t *testing.T) {
var (
engineDefault = promql.NewEngine(promql.EngineOpts{LookbackDelta: 123})
engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute})
engine1h = promql.NewEngine(promql.EngineOpts{LookbackDelta: 1 * time.Hour})
)
fakeNewEngine := func(lookback time.Duration) *promql.Engine {
switch lookback {
case 1 * time.Hour:
return engine1h
case 5 * time.Minute:
return engine5m
default:
return engineDefault
}
}
type testCase struct {
stepMillis int64
expect *promql.Engine
}
var (
minute = time.Minute.Milliseconds()
hour = time.Hour.Milliseconds()
tData = []struct {
lookbackDelta time.Duration
dynamicLookbackDelta bool
tcs []testCase
}{
{
// Non-dynamic lookbackDelta should always return the same engine.
lookbackDelta: 0,
dynamicLookbackDelta: false,
tcs: []testCase{
{0, engineDefault},
{5 * minute, engineDefault},
{1 * hour, engineDefault},
},
},
{
lookbackDelta: 3 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{2 * minute, engineDefault},
{3 * minute, engineDefault},
{4 * minute, engine5m},
{5 * minute, engine5m},
{6 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
{
lookbackDelta: 5 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engine5m},
{5 * minute, engine5m},
{6 * minute, engine1h},
{59 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
{
lookbackDelta: 30 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engineDefault},
{5 * minute, engineDefault},
{30 * minute, engineDefault},
{31 * minute, engine1h},
{59 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
{
lookbackDelta: 1 * time.Hour,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engine1h},
{5 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
},
},
}
)
for _, td := range tData {
e := engineFunc(fakeNewEngine, td.lookbackDelta, td.dynamicLookbackDelta)
for _, tc := range td.tcs {
got := e(tc.stepMillis)
testutil.Equals(t, tc.expect, got)
}
}
}
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ Flags:
lookback delta should be set to at least 2
times of the slowest scrape interval. If unset
it will use the promql default of 5m.
--query.dynamic-lookback-delta
Allow for larger lookback duration for
queries based on resolution.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per a query.
Expand Down
15 changes: 10 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type QueryAPI struct {
logger log.Logger
gate gate.Gate
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
ruleGroups rules.UnaryClient
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine func(int64) *promql.Engine
ruleGroups rules.UnaryClient

enableAutodownsampling bool
enableQueryPartialResponse bool
Expand All @@ -82,7 +83,7 @@ type QueryAPI struct {
func NewQueryAPI(
logger log.Logger,
storeSet *query.StoreSet,
qe *promql.Engine,
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
enableAutodownsampling bool,
Expand Down Expand Up @@ -265,11 +266,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down Expand Up @@ -370,11 +373,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_range_query")
defer span.Finish()

qry, err := qapi.queryEngine.NewRangeQuery(
qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false),
r.FormValue("query"),
start,
Expand Down
39 changes: 21 additions & 18 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,20 @@ func TestQueryEndpoints(t *testing.T) {

now := time.Now()
timeout := 100 * time.Second
qe := promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
})
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}),
queryEngine: func(int64) *promql.Engine {
return qe
},
gate: gate.New(nil, 4),
}

Expand Down Expand Up @@ -657,30 +660,30 @@ func TestMetadataEndpoints(t *testing.T) {

now := time.Now()
timeout := 100 * time.Second
qe := promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
})
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}),
queryEngine: func(int64) *promql.Engine {
return qe
},
gate: gate.New(nil, 4),
}
apiWithLabelLookback := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}),
queryEngine: func(int64) *promql.Engine {
return qe
},
gate: gate.New(nil, 4),
defaultMetadataTimeRange: apiLookbackDelta,
}
Expand Down

0 comments on commit 75de5c9

Please sign in to comment.