Skip to content

Commit

Permalink
Mimir query engine: track active queries (#8277)
Browse files Browse the repository at this point in the history
* Mimir query engine: track active queries

* Add changelog entry

* Refactor test to make it clearer

* Refactor test to reduce duplication

* Refactor TestMemoryConsumptionLimit to follow same pattern

* Ensure that call to active query tracker is done with a context that respects the configured query timeout

* Fix linting issues

* Fix issue in test 🤦
  • Loading branch information
charleskorn committed Jun 6, 2024
1 parent 5d557e7 commit d29d1f1
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 37 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* [FEATURE] Continuous-test: now runnable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures the bucket lookup style used to access buckets in S3-compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8277
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
14 changes: 8 additions & 6 deletions pkg/streamingpromql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider) (prom
}

return &Engine{
lookbackDelta: lookbackDelta,
timeout: opts.Timeout,
limitsProvider: limitsProvider,
lookbackDelta: lookbackDelta,
timeout: opts.Timeout,
limitsProvider: limitsProvider,
activeQueryTracker: opts.ActiveQueryTracker,
}, nil
}

type Engine struct {
lookbackDelta time.Duration
timeout time.Duration
limitsProvider QueryLimitsProvider
lookbackDelta time.Duration
timeout time.Duration
limitsProvider QueryLimitsProvider
activeQueryTracker promql.QueryTracker
}

func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand Down
229 changes: 199 additions & 30 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package streamingpromql
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
Expand Down Expand Up @@ -519,43 +520,211 @@ func TestMemoryConsumptionLimit(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
t.Run("range query", func(t *testing.T) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.rangeQueryLimit))
require.NoError(t, err)
queryTypes := map[string]func() (promql.Query, error){
"range query": func() (promql.Query, error) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.rangeQueryLimit))
if err != nil {
return nil, err
}

start := timestamp.Time(0)
return engine.NewRangeQuery(ctx, storage, nil, testCase.expr, start, start.Add(4*time.Minute), time.Minute)
},
"instant query": func() (promql.Query, error) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.instantQueryLimit))
if err != nil {
return nil, err
}

start := timestamp.Time(0)
return engine.NewInstantQuery(ctx, storage, nil, testCase.expr, start)
},
}

start := timestamp.Time(0)
q, err := engine.NewRangeQuery(ctx, storage, nil, testCase.expr, start, start.Add(4*time.Minute), time.Minute)
require.NoError(t, err)
defer q.Close()
for queryType, createQuery := range queryTypes {
t.Run(queryType, func(t *testing.T) {
q, err := createQuery()
require.NoError(t, err)
t.Cleanup(q.Close)

res := q.Exec(ctx)
res := q.Exec(ctx)

if testCase.shouldSucceed {
require.NoError(t, res.Err)
} else {
require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
}
})
if testCase.shouldSucceed {
require.NoError(t, res.Err)
} else {
require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
}
})
}
})
}
}

t.Run("instant query", func(t *testing.T) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.instantQueryLimit))
require.NoError(t, err)
func TestActiveQueryTracker(t *testing.T) {
for _, shouldSucceed := range []bool{true, false} {
t.Run(fmt.Sprintf("successful query = %v", shouldSucceed), func(t *testing.T) {
opts := NewTestEngineOpts()
tracker := &testQueryTracker{}
opts.ActiveQueryTracker = tracker
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0))
require.NoError(t, err)

start := timestamp.Time(0)
q, err := engine.NewInstantQuery(ctx, storage, nil, testCase.expr, start)
require.NoError(t, err)
defer q.Close()
innerStorage := promqltest.LoadedStorage(t, "")
t.Cleanup(func() { require.NoError(t, innerStorage.Close()) })

res := q.Exec(ctx)
// Use a fake queryable as a way to check that the query is recorded as active while the query is in progress.
queryTrackingTestingQueryable := &activeQueryTrackerQueryable{
innerStorage: innerStorage,
tracker: tracker,
}

if testCase.shouldSucceed {
require.NoError(t, res.Err)
} else {
require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
}
})
if !shouldSucceed {
queryTrackingTestingQueryable.err = errors.New("something went wrong inside the query")
}

queryTypes := map[string]func(expr string) (promql.Query, error){
"range": func(expr string) (promql.Query, error) {
return engine.NewRangeQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(time.Hour), time.Minute)
},
"instant": func(expr string) (promql.Query, error) {
return engine.NewInstantQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0))
},
}

for queryType, createQuery := range queryTypes {
t.Run(queryType+" query", func(t *testing.T) {
expr := "test_" + queryType + "_query"
queryTrackingTestingQueryable.activeQueryAtQueryTime = trackedQuery{}

q, err := createQuery(expr)
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())

if shouldSucceed {
require.NoError(t, res.Err)
} else {
require.EqualError(t, res.Err, "something went wrong inside the query")
}

// Check that the query was active in the query tracker while the query was executing.
require.Equal(t, expr, queryTrackingTestingQueryable.activeQueryAtQueryTime.expr)
require.False(t, queryTrackingTestingQueryable.activeQueryAtQueryTime.deleted)

// Check that the query has now been marked as deleted in the query tracker.
require.NotEmpty(t, tracker.queries)
trackedQuery := tracker.queries[len(tracker.queries)-1]
require.Equal(t, expr, trackedQuery.expr)
require.Equal(t, true, trackedQuery.deleted)
})
}
})
}
}

// trackedQuery is one entry recorded by testQueryTracker.
type trackedQuery struct {
	expr    string // query text that was passed to Insert
	deleted bool   // true once Delete has been called with this entry's index
}

// testQueryTracker is an in-memory query tracker fake for tests. It remembers
// every query passed to Insert, in insertion order, and whether Delete was
// later called for it.
type testQueryTracker struct {
	queries []trackedQuery
}

// GetMaxConcurrent always reports 0.
func (qt *testQueryTracker) GetMaxConcurrent() int { return 0 }

// Insert records query as a new, not-yet-deleted entry and returns its index
// in qt.queries. The context is ignored and the returned error is always nil.
func (qt *testQueryTracker) Insert(_ context.Context, query string) (int, error) {
	index := len(qt.queries)
	qt.queries = append(qt.queries, trackedQuery{expr: query})

	return index, nil
}

// Delete marks the entry previously returned by Insert as deleted. The entry
// itself is kept so that tests can inspect it afterwards.
func (qt *testQueryTracker) Delete(insertIndex int) {
	entry := &qt.queries[insertIndex]
	entry.deleted = true
}

// activeQueryTrackerQueryable is a fake storage.Queryable used to observe the
// state of a testQueryTracker at the moment a query asks for a Querier, i.e.
// while the query is still executing.
type activeQueryTrackerQueryable struct {
	// tracker is the query tracker whose most recent entry is snapshotted.
	tracker *testQueryTracker

	// activeQueryAtQueryTime is a copy of the tracker's most recent entry taken
	// during the last call to Querier, or the zero value if the tracker had no
	// entries at that time.
	activeQueryAtQueryTime trackedQuery

	// innerStorage serves the Querier when err is nil.
	innerStorage storage.Queryable
	// err, if non-nil, is returned from Querier instead of delegating.
	err error
}

// Querier snapshots the most recently inserted query from the tracker, then
// either fails with a.err (if set) or delegates to the inner storage.
func (a *activeQueryTrackerQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
	// If nothing has been inserted yet, record the zero value instead of
	// indexing at -1: this lets the caller's assertions fail with a readable
	// message rather than the fake panicking with an index out of range error.
	if n := len(a.tracker.queries); n > 0 {
		a.activeQueryAtQueryTime = a.tracker.queries[n-1]
	} else {
		a.activeQueryAtQueryTime = trackedQuery{}
	}

	if a.err != nil {
		return nil, a.err
	}

	return a.innerStorage.Querier(mint, maxt)
}

// TestActiveQueryTracker_WaitingForTrackerIncludesQueryTimeout checks, for both
// range and instant queries, that the context passed to the active query
// tracker is subject to the engine's configured query timeout. The tracker
// used here only returns once its context is done (or after giving up), so the
// query is expected to fail with a timeout error and produce no value.
func TestActiveQueryTracker_WaitingForTrackerIncludesQueryTimeout(t *testing.T) {
	const expr = "some_test_query"

	queryTracker := &timeoutTestingQueryTracker{}

	engineOpts := NewTestEngineOpts()
	engineOpts.ActiveQueryTracker = queryTracker
	engineOpts.Timeout = 10 * time.Millisecond

	eng, err := NewEngine(engineOpts, NewStaticQueryLimitsProvider(0))
	require.NoError(t, err)

	start := timestamp.Time(0)
	factories := map[string]func() (promql.Query, error){
		"instant": func() (promql.Query, error) {
			return eng.NewInstantQuery(context.Background(), nil, nil, expr, start)
		},
		"range": func() (promql.Query, error) {
			return eng.NewRangeQuery(context.Background(), nil, nil, expr, start, start.Add(time.Hour), time.Minute)
		},
	}

	for name, newQuery := range factories {
		t.Run(name+" query", func(t *testing.T) {
			// The tracker is shared between subtests, so clear its flag first.
			queryTracker.sawTimeout = false

			query, err := newQuery()
			require.NoError(t, err)
			defer query.Close()

			result := query.Exec(context.Background())

			require.True(t, queryTracker.sawTimeout, "query tracker was not called with a context that timed out")

			require.Error(t, result.Err)
			require.ErrorIs(t, result.Err, context.DeadlineExceeded)
			require.EqualError(t, result.Err, "context deadline exceeded: query timed out")
			require.Nil(t, result.Value)
		})
	}
}

// timeoutTestingQueryTracker is a query tracker fake whose Insert never
// succeeds: it waits for the supplied context to finish and records whether
// that happened.
type timeoutTestingQueryTracker struct {
	// sawTimeout is set when Insert observed its context being done.
	sawTimeout bool
}

// GetMaxConcurrent always reports 0.
func (qt *timeoutTestingQueryTracker) GetMaxConcurrent() int { return 0 }

// Insert blocks until ctx is done, then sets sawTimeout and returns the
// context's cause. If ctx is still not done after one second it gives up and
// returns a different error, so a missing timeout cannot hang the test.
func (qt *timeoutTestingQueryTracker) Insert(ctx context.Context, _ string) (int, error) {
	giveUp := time.NewTimer(time.Second)
	defer giveUp.Stop()

	select {
	case <-giveUp.C:
		return 0, errors.New("gave up waiting for query to time out")
	case <-ctx.Done():
		qt.sawTimeout = true
		return 0, context.Cause(ctx)
	}
}

// Delete must never be reached, because Insert always fails; it panics to
// make any such call obvious.
func (qt *timeoutTestingQueryTracker) Delete(_ int) {
	panic("should not be called")
}
9 changes: 9 additions & 0 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,15 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
// (so that it runs before the cancellation of the context with timeout created above).
defer cancel(errQueryFinished)

if q.engine.activeQueryTracker != nil {
queryID, err := q.engine.activeQueryTracker.Insert(ctx, q.qs)
if err != nil {
return &promql.Result{Err: err}
}

defer q.engine.activeQueryTracker.Delete(queryID)
}

series, err := q.root.SeriesMetadata(ctx)
if err != nil {
return &promql.Result{Err: err}
Expand Down

0 comments on commit d29d1f1

Please sign in to comment.