diff --git a/CHANGELOG.md b/CHANGELOG.md index 4de16749af6..f4a773c3d8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123 diff --git a/pkg/streamingpromql/engine.go b/pkg/streamingpromql/engine.go index d3e7c455d73..78a7bf5d41c 100644 --- a/pkg/streamingpromql/engine.go +++ b/pkg/streamingpromql/engine.go @@ -37,11 +37,13 @@ func NewEngine(opts promql.EngineOpts) (promql.QueryEngine, error) { return &Engine{ lookbackDelta: lookbackDelta, + timeout: opts.Timeout, }, nil } type Engine struct { lookbackDelta time.Duration + timeout time.Duration } func (e *Engine) NewInstantQuery(_ context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 9b3f342313a..550cfb33ba1 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -4,6 +4,7 @@ package streamingpromql import ( "context" + "errors" "io" "io/fs" "os" @@ -14,6 +15,8 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/promqltest" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" "github.com/stretchr/testify/require" "github.com/grafana/mimir/pkg/streamingpromql/compat" @@ -282,3 +285,167 @@ func TestRangeVectorSelectors(t *testing.T) { }) } } + +func TestQueryCancellation(t *testing.T) { + opts := NewTestEngineOpts() + engine, err := NewEngine(opts) + require.NoError(t, err) + + // Simulate the query being cancelled by another goroutine by waiting for the Select() call to be made, + // then cancel the query and wait for the query context to be cancelled. + // + // In both this test and production, we rely on the underlying storage responding to the context cancellation - + // we don't explicitly check for context cancellation in the query engine. + var q promql.Query + queryable := cancellationQueryable{func() { + q.Cancel() + }} + + q, err = engine.NewInstantQuery(context.Background(), queryable, nil, "some_metric", timestamp.Time(0)) + require.NoError(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + + require.Error(t, res.Err) + require.ErrorIs(t, res.Err, context.Canceled) + require.EqualError(t, res.Err, "context canceled: query execution cancelled") + require.Nil(t, res.Value) +} + +func TestQueryTimeout(t *testing.T) { + opts := NewTestEngineOpts() + opts.Timeout = 20 * time.Millisecond + engine, err := NewEngine(opts) + require.NoError(t, err) + + // Simulate the query doing some work and check that the query context has been cancelled. + // + // In both this test and production, we rely on the underlying storage responding to the context cancellation - + // we don't explicitly check for context cancellation in the query engine. + var q promql.Query + queryable := cancellationQueryable{func() { + time.Sleep(opts.Timeout * 10) + }} + + q, err = engine.NewInstantQuery(context.Background(), queryable, nil, "some_metric", timestamp.Time(0)) + require.NoError(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + + require.Error(t, res.Err) + require.ErrorIs(t, res.Err, context.DeadlineExceeded) + require.EqualError(t, res.Err, "context deadline exceeded: query timed out") + require.Nil(t, res.Value) +} + +type cancellationQueryable struct { + onQueried func() +} + +func (w cancellationQueryable) Querier(_, _ int64) (storage.Querier, error) { + // nolint:gosimple + return cancellationQuerier{onQueried: w.onQueried}, nil +} + +type cancellationQuerier struct { + onQueried func() +} + +func (w cancellationQuerier) LabelValues(ctx context.Context, _ string, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, w.waitForCancellation(ctx) +} + +func (w cancellationQuerier) LabelNames(ctx context.Context, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, w.waitForCancellation(ctx) +} + +func (w cancellationQuerier) Select(ctx context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet { + return storage.ErrSeriesSet(w.waitForCancellation(ctx)) +} + +func (w cancellationQuerier) Close() error { + return nil +} + +func (w cancellationQuerier) waitForCancellation(ctx context.Context) error { + w.onQueried() + + select { + case <-ctx.Done(): + return context.Cause(ctx) + case <-time.After(time.Second): + return errors.New("expected query context to be cancelled after 1 second, but it was not") + } +} + +func TestQueryContextCancelledOnceQueryFinished(t *testing.T) { + opts := NewTestEngineOpts() + engine, err := NewEngine(opts) + require.NoError(t, err) + + storage := promqltest.LoadedStorage(t, ` + load 1m + some_metric 0+1x4 + `) + t.Cleanup(func() { require.NoError(t, storage.Close()) }) + + queryable := &contextCapturingQueryable{inner: storage} + + q, err := engine.NewInstantQuery(context.Background(), queryable, nil, "some_metric", timestamp.Time(0)) + require.NoError(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + require.NoError(t, res.Err) + require.NotNil(t, res.Value) + + contextErr := queryable.capturedContext.Err() + require.Equal(t, context.Canceled, contextErr) + + contextCause := context.Cause(queryable.capturedContext) + require.ErrorIs(t, contextCause, context.Canceled) + require.EqualError(t, contextCause, "context canceled: query execution finished") +} + +type contextCapturingQueryable struct { + capturedContext context.Context + inner storage.Queryable +} + +func (q *contextCapturingQueryable) Querier(mint, maxt int64) (storage.Querier, error) { + innerQuerier, err := q.inner.Querier(mint, maxt) + if err != nil { + return nil, err + } + + return &contextCapturingQuerier{ + queryable: q, + inner: innerQuerier, + }, nil +} + +type contextCapturingQuerier struct { + queryable *contextCapturingQueryable + inner storage.Querier +} + +func (q *contextCapturingQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + q.queryable.capturedContext = ctx + return q.inner.LabelValues(ctx, name, matchers...) +} + +func (q *contextCapturingQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + q.queryable.capturedContext = ctx + return q.inner.LabelNames(ctx, matchers...) +} + +func (q *contextCapturingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + q.queryable.capturedContext = ctx + return q.inner.Select(ctx, sortSeries, hints, matchers...) +} + +func (q *contextCapturingQuerier) Close() error { + return q.inner.Close() +} diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operator/instant_vector_selector.go index d4903272bb4..63b9ec25791 100644 --- a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operator/instant_vector_selector.go @@ -34,13 +34,13 @@ func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]SeriesMet return v.Selector.SeriesMetadata(ctx) } -func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeriesData, error) { +func (v *InstantVectorSelector) NextSeries(ctx context.Context) (InstantVectorSeriesData, error) { if v.memoizedIterator == nil { v.memoizedIterator = storage.NewMemoizedEmptyIterator(v.Selector.LookbackDelta.Milliseconds()) } var err error - v.chunkIterator, err = v.Selector.Next(v.chunkIterator) + v.chunkIterator, err = v.Selector.Next(ctx, v.chunkIterator) if err != nil { return InstantVectorSeriesData{}, err } diff --git a/pkg/streamingpromql/operator/range_vector_selector.go b/pkg/streamingpromql/operator/range_vector_selector.go index 9d80eba9097..9285660dbc0 100644 --- a/pkg/streamingpromql/operator/range_vector_selector.go +++ b/pkg/streamingpromql/operator/range_vector_selector.go @@ -43,9 +43,9 @@ func (m *RangeVectorSelector) Range() time.Duration { return m.Selector.Range } -func (m *RangeVectorSelector) NextSeries(_ context.Context) error { +func (m *RangeVectorSelector) NextSeries(ctx context.Context) error { var err error - m.chunkIterator, err = m.Selector.Next(m.chunkIterator) + m.chunkIterator, err = m.Selector.Next(ctx, m.chunkIterator) if err != nil { return err } diff --git a/pkg/streamingpromql/operator/selector.go b/pkg/streamingpromql/operator/selector.go index 8225daa0992..4d306785397 100644 --- a/pkg/streamingpromql/operator/selector.go +++ b/pkg/streamingpromql/operator/selector.go @@ -29,6 +29,8 @@ type Selector struct { querier storage.Querier series *seriesList + + seriesIdx int } func (s *Selector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) { @@ -83,11 +85,20 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) return s.series.ToSeriesMetadata(), ss.Err() } -func (s *Selector) Next(existing chunkenc.Iterator) (chunkenc.Iterator, error) { +func (s *Selector) Next(ctx context.Context, existing chunkenc.Iterator) (chunkenc.Iterator, error) { if s.series.Len() == 0 { return nil, EOS } + s.seriesIdx++ + + // Only check for cancellation every 128 series. This avoids a (relatively) expensive check on every iteration, but aborts + // queries quickly enough when cancelled. + // See https://github.com/prometheus/prometheus/pull/14118 for more explanation of why we use 128 (rather than say 100). + if s.seriesIdx%128 == 0 && ctx.Err() != nil { + return nil, context.Cause(ctx) + } + return s.series.Pop().Iterator(existing), nil } diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 93f424049b9..e2a7623e0c3 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -11,6 +11,7 @@ import ( "fmt" "time" + "github.com/grafana/dskit/cancellation" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" @@ -23,6 +24,10 @@ import ( "github.com/grafana/mimir/pkg/streamingpromql/operator" ) +var errQueryCancelled = cancellation.NewErrorf("query execution cancelled") +var errQueryClosed = cancellation.NewErrorf("Query.Close() called") +var errQueryFinished = cancellation.NewErrorf("query execution finished") + type Query struct { queryable storage.Queryable opts promql.QueryOpts @@ -30,6 +35,7 @@ type Query struct { root operator.Operator engine *Engine qs string + cancel context.CancelCauseFunc result *promql.Result } @@ -240,6 +246,20 @@ func (q *Query) IsInstant() bool { func (q *Query) Exec(ctx context.Context) *promql.Result { defer q.root.Close() + ctx, cancel := context.WithCancelCause(ctx) + q.cancel = cancel + + if q.engine.timeout != 0 { + var cancelTimeoutCtx context.CancelFunc + ctx, cancelTimeoutCtx = context.WithTimeoutCause(ctx, q.engine.timeout, fmt.Errorf("%w: query timed out", context.DeadlineExceeded)) + + defer cancelTimeoutCtx() + } + + // The order of the deferred cancellations is important: we want to cancel with errQueryFinished first, so we must defer this cancellation last + // (so that it runs before the cancellation of the context with timeout created above). + defer cancel(errQueryFinished) + series, err := q.root.SeriesMetadata(ctx) if err != nil { return &promql.Result{Err: err} @@ -392,6 +412,10 @@ func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o ope } func (q *Query) Close() { + if q.cancel != nil { + q.cancel(errQueryClosed) + } + if q.result == nil { return } @@ -421,7 +445,9 @@ func (q *Query) Stats() *stats.Statistics { } func (q *Query) Cancel() { - // Not yet supported. + if q.cancel != nil { + q.cancel(errQueryCancelled) + } } func (q *Query) String() string {