Skip to content

Commit

Permalink
Mimir query engine: support query cancellation and timeout (#8197)
Browse files Browse the repository at this point in the history
* Add support for cancelling queries.

* Respect the configured query timeout.

* Update changelog entry

* Use `storage.ErrSeriesSet`

* Ensure we don't leak contexts

* Check for context cancellation in `Selector.Next`

* Add test case to ensure that query execution context is cancelled after evaluation finishes
  • Loading branch information
charleskorn committed May 30, 2024
1 parent 74628a9 commit de0f9c5
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* [FEATURE] Continuous-test: now runnable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ func NewEngine(opts promql.EngineOpts) (promql.QueryEngine, error) {

return &Engine{
lookbackDelta: lookbackDelta,
timeout: opts.Timeout,
}, nil
}

// Engine is the query engine returned by NewEngine.
type Engine struct {
	// lookbackDelta is the lookback delta resolved by NewEngine.
	lookbackDelta time.Duration
	// timeout is copied from promql.EngineOpts.Timeout by NewEngine.
	// Query.Exec only applies a deadline when this value is non-zero.
	timeout time.Duration
}

func (e *Engine) NewInstantQuery(_ context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand Down
167 changes: 167 additions & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package streamingpromql

import (
"context"
"errors"
"io"
"io/fs"
"os"
Expand All @@ -14,6 +15,8 @@ import (
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/streamingpromql/compat"
Expand Down Expand Up @@ -282,3 +285,167 @@ func TestRangeVectorSelectors(t *testing.T) {
})
}
}

func TestQueryCancellation(t *testing.T) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts)
require.NoError(t, err)

// Simulate the query being cancelled by another goroutine by waiting for the Select() call to be made,
// then cancel the query and wait for the query context to be cancelled.
//
// In both this test and production, we rely on the underlying storage responding to the context cancellation -
// we don't explicitly check for context cancellation in the query engine.
var q promql.Query
queryable := cancellationQueryable{func() {
q.Cancel()
}}

q, err = engine.NewInstantQuery(context.Background(), queryable, nil, "some_metric", timestamp.Time(0))
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())

require.Error(t, res.Err)
require.ErrorIs(t, res.Err, context.Canceled)
require.EqualError(t, res.Err, "context canceled: query execution cancelled")
require.Nil(t, res.Value)
}

// TestQueryTimeout verifies that a query running longer than the engine's configured timeout
// fails with a context.DeadlineExceeded error carrying the "query timed out" cause.
func TestQueryTimeout(t *testing.T) {
	const timeout = 20 * time.Millisecond

	opts := NewTestEngineOpts()
	opts.Timeout = timeout
	engine, err := NewEngine(opts)
	require.NoError(t, err)

	// The fake storage sleeps for well over the timeout before checking the query context, which
	// must have expired by then. The test relies on the storage observing the expired context
	// and reporting its cause.
	slowQueryable := cancellationQueryable{onQueried: func() {
		time.Sleep(timeout * 10)
	}}

	query, err := engine.NewInstantQuery(context.Background(), slowQueryable, nil, "some_metric", timestamp.Time(0))
	require.NoError(t, err)
	defer query.Close()

	result := query.Exec(context.Background())

	require.Error(t, result.Err)
	require.ErrorIs(t, result.Err, context.DeadlineExceeded)
	require.EqualError(t, result.Err, "context deadline exceeded: query timed out")
	require.Nil(t, result.Value)
}

// cancellationQueryable is a test storage.Queryable whose queriers run a hook and then wait
// for the query context to be cancelled.
type cancellationQueryable struct {
	// onQueried is handed to every querier created by Querier and invoked at the start of
	// each storage call, before waiting for cancellation.
	onQueried func()
}

// Querier returns a cancellationQuerier sharing this queryable's onQueried hook.
// The time range is ignored and the call never fails.
func (w cancellationQueryable) Querier(_, _ int64) (storage.Querier, error) {
	// nolint:gosimple
	querier := cancellationQuerier{onQueried: w.onQueried}

	return querier, nil
}

// cancellationQuerier is a test storage.Querier that never returns data: every read method
// calls onQueried and then waits for the supplied context to be cancelled.
type cancellationQuerier struct {
	// onQueried is invoked by waitForCancellation before it starts waiting on the context.
	onQueried func()
}

// LabelValues returns no values and no annotations; its error is whatever
// waitForCancellation reports for ctx.
func (w cancellationQuerier) LabelValues(ctx context.Context, _ string, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	err := w.waitForCancellation(ctx)

	return nil, nil, err
}

// LabelNames returns no names and no annotations; its error is whatever
// waitForCancellation reports for ctx.
func (w cancellationQuerier) LabelNames(ctx context.Context, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	err := w.waitForCancellation(ctx)

	return nil, nil, err
}

// Select waits for ctx to be cancelled and returns a series set that contains only the
// resulting error. Sorting, hints and matchers are ignored.
func (w cancellationQuerier) Select(ctx context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet {
	err := w.waitForCancellation(ctx)

	return storage.ErrSeriesSet(err)
}

// Close is a no-op: the querier holds no resources, so it always returns nil.
func (w cancellationQuerier) Close() error {
	return nil
}

// waitForCancellation runs the onQueried hook and then blocks until ctx is done, returning the
// cause of the cancellation. If ctx is still live after one second, it returns an error saying
// the expected cancellation never happened.
func (w cancellationQuerier) waitForCancellation(ctx context.Context) error {
	w.onQueried()

	// Upper bound on how long the test is willing to wait for the context to be cancelled.
	gaveUp := time.After(time.Second)

	select {
	case <-gaveUp:
		return errors.New("expected query context to be cancelled after 1 second, but it was not")
	case <-ctx.Done():
		return context.Cause(ctx)
	}
}

// TestQueryContextCancelledOnceQueryFinished checks that the context passed to the storage layer
// during evaluation has been cancelled, with the "query execution finished" cause, by the time a
// successful Exec call returns. This guards against leaking the query's context.
func TestQueryContextCancelledOnceQueryFinished(t *testing.T) {
	opts := NewTestEngineOpts()
	engine, err := NewEngine(opts)
	require.NoError(t, err)

	// Named testStorage (not storage) so the imported storage package is not shadowed.
	testStorage := promqltest.LoadedStorage(t, `
load 1m
some_metric 0+1x4
`)
	t.Cleanup(func() { require.NoError(t, testStorage.Close()) })

	queryable := &contextCapturingQueryable{inner: testStorage}

	q, err := engine.NewInstantQuery(context.Background(), queryable, nil, "some_metric", timestamp.Time(0))
	require.NoError(t, err)
	defer q.Close()

	res := q.Exec(context.Background())
	require.NoError(t, res.Err)
	require.NotNil(t, res.Value)

	// Fail with a readable message rather than a nil-pointer panic if the engine never
	// called into the storage layer and so no context was captured.
	require.NotNil(t, queryable.capturedContext, "expected the query to call the storage layer")

	contextErr := queryable.capturedContext.Err()
	require.Equal(t, context.Canceled, contextErr)

	contextCause := context.Cause(queryable.capturedContext)
	require.ErrorIs(t, contextCause, context.Canceled)
	require.EqualError(t, contextCause, "context canceled: query execution finished")
}

// contextCapturingQueryable wraps a storage.Queryable and records the context used for the
// most recent call made through one of its queriers, so tests can inspect it afterwards.
type contextCapturingQueryable struct {
	// capturedContext is overwritten by each LabelValues, LabelNames or Select call on a
	// querier created by this queryable. It stays nil until the first such call.
	capturedContext context.Context
	// inner is the real queryable that calls are delegated to.
	inner storage.Queryable
}

// Querier opens a querier on the wrapped queryable for [mint, maxt] and returns it wrapped in a
// contextCapturingQuerier that reports back to q. Errors from the wrapped queryable are
// returned unchanged.
func (q *contextCapturingQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
	wrapped, err := q.inner.Querier(mint, maxt)
	if err != nil {
		return nil, err
	}

	capturing := &contextCapturingQuerier{inner: wrapped, queryable: q}

	return capturing, nil
}

// contextCapturingQuerier is a storage.Querier that stores the context of every read call on
// its parent contextCapturingQueryable before delegating to the wrapped querier.
type contextCapturingQuerier struct {
	// queryable is the parent whose capturedContext field is updated on each call.
	queryable *contextCapturingQueryable
	// inner is the real querier that performs the work.
	inner storage.Querier
}

// LabelValues records ctx on the parent queryable and then forwards the call to the wrapped
// querier, returning its results unchanged.
func (q *contextCapturingQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	q.queryable.capturedContext = ctx

	values, warnings, err := q.inner.LabelValues(ctx, name, matchers...)
	return values, warnings, err
}

// LabelNames records ctx on the parent queryable and then forwards the call to the wrapped
// querier, returning its results unchanged.
func (q *contextCapturingQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	q.queryable.capturedContext = ctx

	names, warnings, err := q.inner.LabelNames(ctx, matchers...)
	return names, warnings, err
}

// Select records ctx on the parent queryable and then forwards the call to the wrapped
// querier, returning its series set unchanged.
func (q *contextCapturingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	q.queryable.capturedContext = ctx

	seriesSet := q.inner.Select(ctx, sortSeries, hints, matchers...)
	return seriesSet
}

// Close closes the wrapped querier and returns its error, if any.
func (q *contextCapturingQuerier) Close() error {
	err := q.inner.Close()
	return err
}
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operator/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]SeriesMet
return v.Selector.SeriesMetadata(ctx)
}

func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeriesData, error) {
func (v *InstantVectorSelector) NextSeries(ctx context.Context) (InstantVectorSeriesData, error) {
if v.memoizedIterator == nil {
v.memoizedIterator = storage.NewMemoizedEmptyIterator(v.Selector.LookbackDelta.Milliseconds())
}

var err error
v.chunkIterator, err = v.Selector.Next(v.chunkIterator)
v.chunkIterator, err = v.Selector.Next(ctx, v.chunkIterator)
if err != nil {
return InstantVectorSeriesData{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operator/range_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (m *RangeVectorSelector) Range() time.Duration {
return m.Selector.Range
}

func (m *RangeVectorSelector) NextSeries(_ context.Context) error {
func (m *RangeVectorSelector) NextSeries(ctx context.Context) error {
var err error
m.chunkIterator, err = m.Selector.Next(m.chunkIterator)
m.chunkIterator, err = m.Selector.Next(ctx, m.chunkIterator)
if err != nil {
return err
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/streamingpromql/operator/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Selector struct {

querier storage.Querier
series *seriesList

seriesIdx int
}

func (s *Selector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) {
Expand Down Expand Up @@ -83,11 +85,20 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error)
return s.series.ToSeriesMetadata(), ss.Err()
}

func (s *Selector) Next(existing chunkenc.Iterator) (chunkenc.Iterator, error) {
// Next pops the next series from the selector and returns an iterator over it, passing
// existing through to the series' Iterator method. It returns EOS when no series remain.
//
// Every 128th call also checks ctx, and if the context has an error the call returns the
// context's cause instead of an iterator.
func (s *Selector) Next(ctx context.Context, existing chunkenc.Iterator) (chunkenc.Iterator, error) {
	if s.series.Len() == 0 {
		return nil, EOS
	}

	s.seriesIdx++

	// Checking the context on every series is (relatively) expensive, so only do it once every
	// 128 series: that is still frequent enough to abort a cancelled query quickly.
	// See https://github.com/prometheus/prometheus/pull/14118 for more explanation of why we use 128 (rather than say 100).
	if s.seriesIdx%128 == 0 {
		if ctx.Err() != nil {
			return nil, context.Cause(ctx)
		}
	}

	nextSeries := s.series.Pop()

	return nextSeries.Iterator(existing), nil
}

Expand Down
28 changes: 27 additions & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"time"

"github.com/grafana/dskit/cancellation"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
Expand All @@ -23,13 +24,18 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/operator"
)

// Causes attached to the query context when it is cancelled, so that whoever observes the
// cancelled context can tell why it happened.
var (
	// errQueryCancelled is the cause used by Query.Cancel.
	errQueryCancelled = cancellation.NewErrorf("query execution cancelled")
	// errQueryClosed is the cause used by Query.Close.
	errQueryClosed = cancellation.NewErrorf("Query.Close() called")
	// errQueryFinished is the cause used when Query.Exec returns.
	errQueryFinished = cancellation.NewErrorf("query execution finished")
)

// Query is a single query prepared by an Engine. It can be executed with Exec, aborted with
// Cancel, and released with Close.
type Query struct {
	queryable storage.Queryable
	opts      promql.QueryOpts
	statement *parser.EvalStmt
	// root is the operator that Exec evaluates; Exec closes it when it returns.
	root operator.Operator
	// engine supplies engine-wide settings, such as the timeout applied by Exec.
	engine *Engine
	qs     string
	// cancel cancels the context created by Exec with a given cause. It is nil until Exec has
	// been called, which is why Cancel and Close check it before use.
	cancel context.CancelCauseFunc

	// result is checked by Close, which returns early when it is nil.
	result *promql.Result
}
Expand Down Expand Up @@ -240,6 +246,20 @@ func (q *Query) IsInstant() bool {
func (q *Query) Exec(ctx context.Context) *promql.Result {
defer q.root.Close()

ctx, cancel := context.WithCancelCause(ctx)
q.cancel = cancel

if q.engine.timeout != 0 {
var cancelTimeoutCtx context.CancelFunc
ctx, cancelTimeoutCtx = context.WithTimeoutCause(ctx, q.engine.timeout, fmt.Errorf("%w: query timed out", context.DeadlineExceeded))

defer cancelTimeoutCtx()
}

// The order of the deferred cancellations is important: we want to cancel with errQueryFinished first, so we must defer this cancellation last
// (so that it runs before the cancellation of the context with timeout created above).
defer cancel(errQueryFinished)

series, err := q.root.SeriesMetadata(ctx)
if err != nil {
return &promql.Result{Err: err}
Expand Down Expand Up @@ -392,6 +412,10 @@ func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o ope
}

func (q *Query) Close() {
if q.cancel != nil {
q.cancel(errQueryClosed)
}

if q.result == nil {
return
}
Expand Down Expand Up @@ -421,7 +445,9 @@ func (q *Query) Stats() *stats.Statistics {
}

// Cancel aborts the query by cancelling its execution context with errQueryCancelled as the
// cause. It does nothing if no cancel function has been set on the query yet.
func (q *Query) Cancel() {
	if q.cancel == nil {
		return
	}

	q.cancel(errQueryCancelled)
}

func (q *Query) String() string {
Expand Down

0 comments on commit de0f9c5

Please sign in to comment.