Mimir query engine: report queries rejected due to hitting the memory consumption limit in the `cortex_querier_queries_rejected_total` metric (#8303)

* Report queries rejected due to hitting the memory consumption limit in the `cortex_querier_queries_rejected_total` metric

* Update changelog entry

* Fix tests

* Add test to ensure metric behaves as expected with multiple queries to a single engine
charleskorn committed Jun 7, 2024
1 parent 1fcecee commit 7adf4e1
Showing 11 changed files with 180 additions and 53 deletions.
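
After this change, a query aborted by the per-query memory consumption limit is counted under a new `reason` label value on the querier's existing rejection counter. For illustration (series name and label value are taken from the diff below; the other `reason` series are elided), a querier that has rejected one such query would export:

    cortex_querier_queries_rejected_total{reason="max-estimated-memory-consumption-per-query"} 1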
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
pkg/querier/querier.go (2 changes: 1 addition & 1 deletion)
@@ -165,7 +165,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
eng = promql.NewEngine(opts)
case mimirPromQLEngine:
limitsProvider := &tenantQueryLimitsProvider{limits: limits}
-streamingEngine, err := streamingpromql.NewEngine(opts, limitsProvider, logger)
+streamingEngine, err := streamingpromql.NewEngine(opts, limitsProvider, queryMetrics, logger)
if err != nil {
return nil, nil, nil, err
}
pkg/querier/stats/query_metrics.go (11 changes: 6 additions & 5 deletions)
@@ -11,14 +11,15 @@ import (
)

const (
-RejectReasonMaxSeries = "max-fetched-series-per-query"
-RejectReasonMaxChunkBytes = "max-fetched-chunk-bytes-per-query"
-RejectReasonMaxChunks = "max-fetched-chunks-per-query"
-RejectReasonMaxEstimatedChunks = "max-estimated-fetched-chunks-per-query"
+RejectReasonMaxSeries                          = "max-fetched-series-per-query"
+RejectReasonMaxChunkBytes                      = "max-fetched-chunk-bytes-per-query"
+RejectReasonMaxChunks                          = "max-fetched-chunks-per-query"
+RejectReasonMaxEstimatedChunks                 = "max-estimated-fetched-chunks-per-query"
+RejectReasonMaxEstimatedQueryMemoryConsumption = "max-estimated-memory-consumption-per-query"
)

var (
-rejectReasons = []string{RejectReasonMaxSeries, RejectReasonMaxChunkBytes, RejectReasonMaxChunks, RejectReasonMaxEstimatedChunks}
+rejectReasons = []string{RejectReasonMaxSeries, RejectReasonMaxChunkBytes, RejectReasonMaxChunks, RejectReasonMaxEstimatedChunks, RejectReasonMaxEstimatedQueryMemoryConsumption}
)

// QueryMetrics collects metrics on the number of chunks used while serving queries.
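Appending the new reason to `rejectReasons` matters because that slice is what pre-creates one child of the counter per reason, so every reason is exported at zero before any rejection occurs; the test expectations later in this diff rely on that. A minimal sketch of the pattern, assuming the counter is registered with promauto as elsewhere in this commit (`reg` is a hypothetical registry; this is not the literal QueryMetrics code):

	// Sketch only. Imports assumed: github.com/prometheus/client_golang/prometheus
	// and github.com/prometheus/client_golang/prometheus/promauto.
	queriesRejected := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_querier_queries_rejected_total",
		Help: "Number of queries that were rejected, for example because they exceeded a limit.",
	}, []string{"reason"})

	for _, reason := range rejectReasons {
		queriesRejected.WithLabelValues(reason) // creates the child so it reports 0 immediately
	}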
pkg/streamingpromql/benchmarks/comparison_test.go (6 changes: 3 additions & 3 deletions)
@@ -44,7 +44,7 @@ func BenchmarkQuery(b *testing.B) {

opts := streamingpromql.NewTestEngineOpts()
prometheusEngine := promql.NewEngine(opts)
-mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(b, err)

// Important: the names below must remain in sync with the names used in tools/benchmark-query-engine.
@@ -96,7 +96,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) {

opts := streamingpromql.NewTestEngineOpts()
prometheusEngine := promql.NewEngine(opts)
-mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), UserID)
@@ -123,7 +123,7 @@ func TestBenchmarkSetup(t *testing.T) {
q := createBenchmarkQueryable(t, []int{1})

opts := streamingpromql.NewTestEngineOpts()
-mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), UserID)
pkg/streamingpromql/engine.go (10 changes: 7 additions & 3 deletions)
@@ -16,11 +16,13 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/mimir/pkg/querier/stats"
)

const defaultLookbackDelta = 5 * time.Minute // This should be the same value as github.com/prometheus/prometheus/promql.defaultLookbackDelta.

-func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider, logger log.Logger) (promql.QueryEngine, error) {
+func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider, metrics *stats.QueryMetrics, logger log.Logger) (promql.QueryEngine, error) {
lookbackDelta := opts.LookbackDelta
if lookbackDelta == 0 {
lookbackDelta = defaultLookbackDelta
@@ -50,6 +52,7 @@ func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider, logge
Help: "Estimated peak memory consumption of each query (in bytes)",
NativeHistogramBucketFactor: 1.1,
}),
+queriesRejectedDueToPeakMemoryConsumption: metrics.QueriesRejectedTotal.WithLabelValues(stats.RejectReasonMaxEstimatedQueryMemoryConsumption),
}, nil
}

@@ -59,8 +62,9 @@ type Engine struct {
limitsProvider QueryLimitsProvider
activeQueryTracker promql.QueryTracker

-logger log.Logger
-estimatedPeakMemoryConsumption prometheus.Histogram
+logger                                    log.Logger
+estimatedPeakMemoryConsumption            prometheus.Histogram
+queriesRejectedDueToPeakMemoryConsumption prometheus.Counter
}

func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
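With the signature change above, every `NewEngine` caller now supplies a `*stats.QueryMetrics`, and passing the same registry to both `opts.Reg` and `stats.NewQueryMetrics` puts the histogram and the rejection counter side by side. A minimal construction sketch using only names that appear in this diff (imports as in the test files below; error handling elided):

	reg := prometheus.NewPedanticRegistry()
	opts := streamingpromql.NewTestEngineOpts()
	opts.Reg = reg

	engine, err := streamingpromql.NewEngine(
		opts,
		streamingpromql.NewStaticQueryLimitsProvider(0), // 0 disables the memory consumption limit
		stats.NewQueryMetrics(reg),                      // exports cortex_querier_queries_rejected_total via reg
		log.NewNopLogger(),
	)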
pkg/streamingpromql/engine_test.go (92 changes: 79 additions & 13 deletions)
@@ -9,13 +9,15 @@ import (
"io"
"io/fs"
"os"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
@@ -26,14 +28,15 @@
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"

"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/streamingpromql/pooling"
"github.com/grafana/mimir/pkg/util/globalerror"
)

func TestUnsupportedPromQLFeatures(t *testing.T) {
opts := NewTestEngineOpts()
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)
ctx := context.Background()

@@ -93,7 +96,7 @@

func TestNewRangeQuery_InvalidQueryTime(t *testing.T) {
opts := NewTestEngineOpts()
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)
ctx := context.Background()

@@ -107,7 +110,7 @@

func TestNewRangeQuery_InvalidExpressionTypes(t *testing.T) {
opts := NewTestEngineOpts()
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)
ctx := context.Background()

@@ -123,7 +126,7 @@
// Once the streaming engine supports all PromQL features exercised by Prometheus' test cases, we can remove these files and instead call promql.RunBuiltinTests here instead.
func TestUpstreamTestCases(t *testing.T) {
opts := NewTestEngineOpts()
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

testdataFS := os.DirFS("./testdata")
@@ -146,7 +149,7 @@

func TestOurTestCases(t *testing.T) {
opts := NewTestEngineOpts()
-mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts)
@@ -184,7 +187,7 @@
// So instead, we test these few cases here instead.
func TestRangeVectorSelectors(t *testing.T) {
opts := NewTestEngineOpts()
-mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts)
@@ -297,7 +300,7 @@

func TestQueryCancellation(t *testing.T) {
opts := NewTestEngineOpts()
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

// Simulate the query being cancelled by another goroutine by waiting for the Select() call to be made,
@@ -325,7 +328,7 @@
func TestQueryTimeout(t *testing.T) {
opts := NewTestEngineOpts()
opts.Timeout = 20 * time.Millisecond
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

// Simulate the query doing some work and check that the query context has been cancelled.
@@ -391,7 +394,7 @@ func (w cancellationQuerier) waitForCancellation(ctx context.Context) error {

func TestQueryContextCancelledOnceQueryFinished(t *testing.T) {
opts := NewTestEngineOpts()
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

storage := promqltest.LoadedStorage(t, `
@@ -459,7 +462,7 @@ func (q *contextCapturingQuerier) Close() error {
return q.inner.Close()
}

-func TestMemoryConsumptionLimit(t *testing.T) {
+func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
storage := promqltest.LoadedStorage(t, `
load 1m
some_metric{idx="1"} 0+1x5
@@ -553,7 +556,7 @@
opts := NewTestEngineOpts()
opts.Reg = reg

-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(limit), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(limit), stats.NewQueryMetrics(reg), log.NewNopLogger())
require.NoError(t, err)

tracer, closer := jaeger.NewTracer("test", jaeger.NewConstSampler(true), jaeger.NewNullReporter())
@@ -607,8 +610,10 @@

if testCase.shouldSucceed {
require.NoError(t, res.Err)
+require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(0)), "cortex_querier_queries_rejected_total"))
} else {
require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
+require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(1)), "cortex_querier_queries_rejected_total"))
}

assertEstimatedPeakMemoryConsumption(t, reg, span, expectedPeakMemoryConsumption)
@@ -618,6 +623,67 @@
}
}

+func TestMemoryConsumptionLimit_MultipleQueries(t *testing.T) {
+storage := promqltest.LoadedStorage(t, `
+load 1m
+some_metric{idx="1"} 0+1x5
+some_metric{idx="2"} 0+1x5
+some_metric{idx="3"} 0+1x5
+some_metric{idx="4"} 0+1x5
+some_metric{idx="5"} 0+1x5
+`)
+t.Cleanup(func() { require.NoError(t, storage.Close()) })
+
+reg := prometheus.NewPedanticRegistry()
+opts := NewTestEngineOpts()
+opts.Reg = reg
+
+limit := 3 * 8 * pooling.FPointSize // Allow up to three series with five points (which will be rounded up to 8, the nearest power of 2)
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(limit), stats.NewQueryMetrics(reg), log.NewNopLogger())
+require.NoError(t, err)
+
+runQuery := func(expr string, shouldSucceed bool) {
+q, err := engine.NewRangeQuery(context.Background(), storage, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(4*time.Minute), time.Minute)
+require.NoError(t, err)
+defer q.Close()
+
+res := q.Exec(context.Background())
+
+if shouldSucceed {
+require.NoError(t, res.Err)
+} else {
+require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
+}
+}
+
+runQuery(`some_metric{idx=~"1"}`, true)
+require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(0)), "cortex_querier_queries_rejected_total"))
+
+runQuery(`some_metric{idx=~"1|2|3"}`, true)
+require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(0)), "cortex_querier_queries_rejected_total"))
+
+runQuery(`some_metric{idx=~"1|2|3|4"}`, false)
+require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(1)), "cortex_querier_queries_rejected_total"))
+
+runQuery(`some_metric{idx=~"1|2|3|4"}`, false)
+require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(2)), "cortex_querier_queries_rejected_total"))
+
+runQuery(`some_metric{idx=~"1|2|3"}`, true)
+require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(2)), "cortex_querier_queries_rejected_total"))
+}
+
+func rejectedMetrics(rejectedDueToMemoryConsumption int) string {
+return fmt.Sprintf(`
+# HELP cortex_querier_queries_rejected_total Number of queries that were rejected, for example because they exceeded a limit.
+# TYPE cortex_querier_queries_rejected_total counter
+cortex_querier_queries_rejected_total{reason="max-estimated-fetched-chunks-per-query"} 0
+cortex_querier_queries_rejected_total{reason="max-estimated-memory-consumption-per-query"} %v
+cortex_querier_queries_rejected_total{reason="max-fetched-chunk-bytes-per-query"} 0
+cortex_querier_queries_rejected_total{reason="max-fetched-chunks-per-query"} 0
+cortex_querier_queries_rejected_total{reason="max-fetched-series-per-query"} 0
+`, rejectedDueToMemoryConsumption)
+}

func getHistogram(t *testing.T, reg *prometheus.Registry, name string) *dto.Histogram {
metrics, err := reg.Gather()
require.NoError(t, err)
@@ -640,7 +706,7 @@ func TestActiveQueryTracker(t *testing.T) {
opts := NewTestEngineOpts()
tracker := &testQueryTracker{}
opts.ActiveQueryTracker = tracker
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

innerStorage := promqltest.LoadedStorage(t, "")
@@ -747,7 +813,7 @@ func TestActiveQueryTracker_WaitingForTrackerIncludesQueryTimeout(t *testing.T)
opts := NewTestEngineOpts()
opts.Timeout = 10 * time.Millisecond
opts.ActiveQueryTracker = tracker
-engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

queryTypes := map[string]func() (promql.Query, error){
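The assertions above compare the full exposition text with `testutil.GatherAndCompare`. Where only the new counter matters, a lighter-weight check is also possible; this is a hypothetical alternative, not part of this commit, assuming `metrics` is the `*stats.QueryMetrics` passed to `NewEngine`:

	// Hypothetical: read the single child counter directly instead of diffing exposition text.
	rejected := metrics.QueriesRejectedTotal.WithLabelValues(stats.RejectReasonMaxEstimatedQueryMemoryConsumption)
	require.Equal(t, 2.0, testutil.ToFloat64(rejected))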
pkg/streamingpromql/operators/binary_operation_test.go (4 changes: 2 additions & 2 deletions)
@@ -254,7 +254,7 @@ func TestBinaryOperation_SeriesMerging(t *testing.T) {
On: true,
MatchingLabels: []string{"env"},
},
-Pool: pooling.NewLimitingPool(0),
+Pool: pooling.NewLimitingPool(0, nil),
}

result, err := o.mergeOneSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right")
@@ -526,7 +526,7 @@ func TestBinaryOperationSeriesBuffer(t *testing.T) {
}

seriesUsed := []bool{true, false, true, true, true}
-buffer := newBinaryOperationSeriesBuffer(inner, seriesUsed, pooling.NewLimitingPool(0))
+buffer := newBinaryOperationSeriesBuffer(inner, seriesUsed, pooling.NewLimitingPool(0, nil))
ctx := context.Background()

// Read first series.
pkg/streamingpromql/pooling/limiting_pool.go (14 changes: 13 additions & 1 deletion)
@@ -5,6 +5,7 @@ package pooling
import (
"unsafe"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/types"
@@ -61,11 +62,16 @@ type LimitingPool struct {
MaxEstimatedMemoryConsumptionBytes uint64
CurrentEstimatedMemoryConsumptionBytes uint64
PeakEstimatedMemoryConsumptionBytes uint64

+rejectionCount prometheus.Counter
+haveRecordedRejection bool
}

-func NewLimitingPool(maxEstimatedMemoryConsumptionBytes uint64) *LimitingPool {
+func NewLimitingPool(maxEstimatedMemoryConsumptionBytes uint64, rejectionCount prometheus.Counter) *LimitingPool {
return &LimitingPool{
MaxEstimatedMemoryConsumptionBytes: maxEstimatedMemoryConsumptionBytes,

+rejectionCount: rejectionCount,
}
}

@@ -84,6 +90,12 @@ func getWithElementSize[E any, S ~[]E](p *LimitingPool, pool *pool.BucketedPool[

if p.MaxEstimatedMemoryConsumptionBytes > 0 && p.CurrentEstimatedMemoryConsumptionBytes+estimatedBytes > p.MaxEstimatedMemoryConsumptionBytes {
pool.Put(s)

+if !p.haveRecordedRejection {
+p.haveRecordedRejection = true
+p.rejectionCount.Inc()
+}

return nil, limiter.NewMaxEstimatedMemoryConsumptionPerQueryLimitError(p.MaxEstimatedMemoryConsumptionBytes)
}

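The `haveRecordedRejection` flag above makes the counter count rejected queries rather than rejected allocations: once a query first exceeds the limit, further failed allocations in the same query do not increment it again. A sketch of the wiring (hypothetical call site with `metrics` and `maxBytes` assumed; the engine presumably does the equivalent when it builds each query's pool):

	// Hypothetical wiring: one shared child counter, one pool per query.
	counter := metrics.QueriesRejectedTotal.WithLabelValues(stats.RejectReasonMaxEstimatedQueryMemoryConsumption)
	pool := pooling.NewLimitingPool(maxBytes, counter)
	_ = pool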