diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2f43d93ba66..9f6adf4c6b0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,7 +19,7 @@
 * [FEATURE] Continuous-test: now runnable as a module with `mimir -target=continuous-test`. #7747
 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
 * [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303
 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 0724c64edf3..eddcf07aa12 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -165,7 +165,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
 		eng = promql.NewEngine(opts)
 	case mimirPromQLEngine:
 		limitsProvider := &tenantQueryLimitsProvider{limits: limits}
-		streamingEngine, err := streamingpromql.NewEngine(opts, limitsProvider, logger)
+		streamingEngine, err := streamingpromql.NewEngine(opts, limitsProvider, queryMetrics, logger)
 		if err != nil {
 			return nil, nil, nil, err
 		}
diff --git a/pkg/querier/stats/query_metrics.go b/pkg/querier/stats/query_metrics.go
index 0db1448fda6..a2d95f8227c 100644
--- a/pkg/querier/stats/query_metrics.go
+++ b/pkg/querier/stats/query_metrics.go
@@ -11,14 +11,15 @@ import (
 )
 
 const (
-	RejectReasonMaxSeries          = "max-fetched-series-per-query"
-	RejectReasonMaxChunkBytes      = "max-fetched-chunk-bytes-per-query"
-	RejectReasonMaxChunks          = "max-fetched-chunks-per-query"
-	RejectReasonMaxEstimatedChunks = "max-estimated-fetched-chunks-per-query"
+	RejectReasonMaxSeries                          = "max-fetched-series-per-query"
+	RejectReasonMaxChunkBytes                      = "max-fetched-chunk-bytes-per-query"
+	RejectReasonMaxChunks                          = "max-fetched-chunks-per-query"
+	RejectReasonMaxEstimatedChunks                 = "max-estimated-fetched-chunks-per-query"
+	RejectReasonMaxEstimatedQueryMemoryConsumption = "max-estimated-memory-consumption-per-query"
 )
 
 var (
-	rejectReasons = []string{RejectReasonMaxSeries, RejectReasonMaxChunkBytes, RejectReasonMaxChunks, RejectReasonMaxEstimatedChunks}
+	rejectReasons = []string{RejectReasonMaxSeries, RejectReasonMaxChunkBytes, RejectReasonMaxChunks, RejectReasonMaxEstimatedChunks, RejectReasonMaxEstimatedQueryMemoryConsumption}
 )
 
 // QueryMetrics collects metrics on the number of chunks used while serving queries.
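Reviewer note: `rejectReasons` is what lets `NewQueryMetrics` pre-register a zero-valued `cortex_querier_queries_rejected_total` series for every reason, including the new one, so the series is exported as `0` before the first rejection ever happens (the test expectations later in this diff assume exactly that). `NewQueryMetrics` itself is not part of this diff, so the sketch below is an assumption about its shape; only `QueriesRejectedTotal`, `rejectReasons`, and the metric name and help text are taken from the surrounding code.

```go
// A minimal sketch of how NewQueryMetrics can pre-register one series per
// reject reason. Everything here other than QueriesRejectedTotal,
// rejectReasons, and the metric name/help text is assumed, not taken from
// the actual Mimir implementation.
package stats

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type QueryMetrics struct {
	QueriesRejectedTotal *prometheus.CounterVec
}

func NewQueryMetrics(reg prometheus.Registerer) *QueryMetrics {
	m := &QueryMetrics{
		QueriesRejectedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_querier_queries_rejected_total",
			Help: "Number of queries that were rejected, for example because they exceeded a limit.",
		}, []string{"reason"}),
	}

	// Touching each label value creates the series at 0, so appending a reason
	// to rejectReasons is enough to make it visible to dashboards immediately.
	for _, reason := range rejectReasons {
		m.QueriesRejectedTotal.WithLabelValues(reason)
	}

	return m
}
```

Passing `nil` as the registerer, as the tests below do with `stats.NewQueryMetrics(nil)`, still returns usable (unregistered) metrics, because `promauto.With(nil)` only skips registration.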
diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go
index e9d1418d803..144a4294f2f 100644
--- a/pkg/streamingpromql/benchmarks/comparison_test.go
+++ b/pkg/streamingpromql/benchmarks/comparison_test.go
@@ -44,7 +44,7 @@ func BenchmarkQuery(b *testing.B) {
 	opts := streamingpromql.NewTestEngineOpts()
 	prometheusEngine := promql.NewEngine(opts)
 
-	mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(b, err)
 
 	// Important: the names below must remain in sync with the names used in tools/benchmark-query-engine.
@@ -96,7 +96,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) {
 	opts := streamingpromql.NewTestEngineOpts()
 	prometheusEngine := promql.NewEngine(opts)
 
-	mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	ctx := user.InjectOrgID(context.Background(), UserID)
@@ -123,7 +123,7 @@ func TestBenchmarkSetup(t *testing.T) {
 	q := createBenchmarkQueryable(t, []int{1})
 
 	opts := streamingpromql.NewTestEngineOpts()
-	mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	ctx := user.InjectOrgID(context.Background(), UserID)
diff --git a/pkg/streamingpromql/engine.go b/pkg/streamingpromql/engine.go
index ad35d922c90..9786e518e0f 100644
--- a/pkg/streamingpromql/engine.go
+++ b/pkg/streamingpromql/engine.go
@@ -16,11 +16,13 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/storage"
+
+	"github.com/grafana/mimir/pkg/querier/stats"
 )
 
 const defaultLookbackDelta = 5 * time.Minute // This should be the same value as github.com/prometheus/prometheus/promql.defaultLookbackDelta.
 
-func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider, logger log.Logger) (promql.QueryEngine, error) {
+func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider, metrics *stats.QueryMetrics, logger log.Logger) (promql.QueryEngine, error) {
 	lookbackDelta := opts.LookbackDelta
 	if lookbackDelta == 0 {
 		lookbackDelta = defaultLookbackDelta
@@ -50,6 +52,7 @@ func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider, logge
 			Help:                        "Estimated peak memory consumption of each query (in bytes)",
 			NativeHistogramBucketFactor: 1.1,
 		}),
+		queriesRejectedDueToPeakMemoryConsumption: metrics.QueriesRejectedTotal.WithLabelValues(stats.RejectReasonMaxEstimatedQueryMemoryConsumption),
 	}, nil
 }
 
@@ -59,8 +62,9 @@ type Engine struct {
 	limitsProvider     QueryLimitsProvider
 	activeQueryTracker promql.QueryTracker
 
-	logger                         log.Logger
-	estimatedPeakMemoryConsumption prometheus.Histogram
+	logger                                    log.Logger
+	estimatedPeakMemoryConsumption            prometheus.Histogram
+	queriesRejectedDueToPeakMemoryConsumption prometheus.Counter
 }
 
 func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go
index ac965aca31c..70735231105 100644
--- a/pkg/streamingpromql/engine_test.go
+++ b/pkg/streamingpromql/engine_test.go
@@ -9,6 +9,7 @@ import (
 	"io"
 	"io/fs"
 	"os"
+	"strings"
 	"testing"
 	"time"
 
@@ -16,6 +17,7 @@ import (
 	"github.com/opentracing/opentracing-go"
 	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/timestamp"
@@ -26,6 +28,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/uber/jaeger-client-go"
 
+	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/streamingpromql/compat"
 	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
 	"github.com/grafana/mimir/pkg/util/globalerror"
@@ -33,7 +36,7 @@ import (
 
 func TestUnsupportedPromQLFeatures(t *testing.T) {
 	opts := NewTestEngineOpts()
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	ctx := context.Background()
@@ -93,7 +96,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
 
 func TestNewRangeQuery_InvalidQueryTime(t *testing.T) {
 	opts := NewTestEngineOpts()
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	ctx := context.Background()
@@ -107,7 +110,7 @@ func TestNewRangeQuery_InvalidQueryTime(t *testing.T) {
 
 func TestNewRangeQuery_InvalidExpressionTypes(t *testing.T) {
 	opts := NewTestEngineOpts()
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	ctx := context.Background()
@@ -123,7 +126,7 @@ func TestNewRangeQuery_InvalidExpressionTypes(t *testing.T) {
 // Once the streaming engine supports all PromQL features exercised by Prometheus' test cases, we can remove these files and
 // call promql.RunBuiltinTests here instead.
 func TestUpstreamTestCases(t *testing.T) {
 	opts := NewTestEngineOpts()
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	testdataFS := os.DirFS("./testdata")
@@ -146,7 +149,7 @@ func TestUpstreamTestCases(t *testing.T) {
 
 func TestOurTestCases(t *testing.T) {
 	opts := NewTestEngineOpts()
-	mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	prometheusEngine := promql.NewEngine(opts)
@@ -184,7 +187,7 @@ func TestOurTestCases(t *testing.T) {
 // So instead, we test these few cases here.
 func TestRangeVectorSelectors(t *testing.T) {
 	opts := NewTestEngineOpts()
-	mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	prometheusEngine := promql.NewEngine(opts)
@@ -297,7 +300,7 @@ func TestRangeVectorSelectors(t *testing.T) {
 
 func TestQueryCancellation(t *testing.T) {
 	opts := NewTestEngineOpts()
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	// Simulate the query being cancelled by another goroutine by waiting for the Select() call to be made,
@@ -325,7 +328,7 @@ func TestQueryCancellation(t *testing.T) {
 func TestQueryTimeout(t *testing.T) {
 	opts := NewTestEngineOpts()
 	opts.Timeout = 20 * time.Millisecond
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	// Simulate the query doing some work and check that the query context has been cancelled.
@@ -391,7 +394,7 @@ func (w cancellationQuerier) waitForCancellation(ctx context.Context) error {
 
 func TestQueryContextCancelledOnceQueryFinished(t *testing.T) {
 	opts := NewTestEngineOpts()
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	storage := promqltest.LoadedStorage(t, `
@@ -459,7 +462,7 @@ func (q *contextCapturingQuerier) Close() error {
 	return q.inner.Close()
 }
 
-func TestMemoryConsumptionLimit(t *testing.T) {
+func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
 	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			some_metric{idx="1"} 0+1x5
@@ -553,7 +556,7 @@ func TestMemoryConsumptionLimit(t *testing.T) {
 			opts := NewTestEngineOpts()
 			opts.Reg = reg
 
-			engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(limit), log.NewNopLogger())
+			engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(limit), stats.NewQueryMetrics(reg), log.NewNopLogger())
 			require.NoError(t, err)
 
 			tracer, closer := jaeger.NewTracer("test", jaeger.NewConstSampler(true), jaeger.NewNullReporter())
@@ -607,8 +610,10 @@ func TestMemoryConsumptionLimit(t *testing.T) {
 
 			if testCase.shouldSucceed {
 				require.NoError(t, res.Err)
+				require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(0)), "cortex_querier_queries_rejected_total"))
 			} else {
 				require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
+				require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(1)), "cortex_querier_queries_rejected_total"))
 			}
 
 			assertEstimatedPeakMemoryConsumption(t, reg, span, expectedPeakMemoryConsumption)
@@ -618,6 +623,67 @@ func TestMemoryConsumptionLimit(t *testing.T) {
 	}
 }
 
+func TestMemoryConsumptionLimit_MultipleQueries(t *testing.T) {
+	storage := promqltest.LoadedStorage(t, `
+		load 1m
+			some_metric{idx="1"} 0+1x5
+			some_metric{idx="2"} 0+1x5
+			some_metric{idx="3"} 0+1x5
+			some_metric{idx="4"} 0+1x5
+			some_metric{idx="5"} 0+1x5
+	`)
+	t.Cleanup(func() { require.NoError(t, storage.Close()) })
+
+	reg := prometheus.NewPedanticRegistry()
+	opts := NewTestEngineOpts()
+	opts.Reg = reg
+
+	limit := 3 * 8 * pooling.FPointSize // Allow up to three series with five points (which will be rounded up to 8, the nearest power of 2)
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(limit), stats.NewQueryMetrics(reg), log.NewNopLogger())
+	require.NoError(t, err)
+
+	runQuery := func(expr string, shouldSucceed bool) {
+		q, err := engine.NewRangeQuery(context.Background(), storage, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(4*time.Minute), time.Minute)
+		require.NoError(t, err)
+		defer q.Close()
+
+		res := q.Exec(context.Background())
+
+		if shouldSucceed {
+			require.NoError(t, res.Err)
+		} else {
+			require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
+		}
+	}
+
+	runQuery(`some_metric{idx=~"1"}`, true)
+	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(0)), "cortex_querier_queries_rejected_total"))
+
+	runQuery(`some_metric{idx=~"1|2|3"}`, true)
+	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(0)), "cortex_querier_queries_rejected_total"))
+
+	runQuery(`some_metric{idx=~"1|2|3|4"}`, false)
+	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(1)), "cortex_querier_queries_rejected_total"))
+
+	runQuery(`some_metric{idx=~"1|2|3|4"}`, false)
+	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(2)), "cortex_querier_queries_rejected_total"))
+
+	runQuery(`some_metric{idx=~"1|2|3"}`, true)
+	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(2)), "cortex_querier_queries_rejected_total"))
+}
+
+func rejectedMetrics(rejectedDueToMemoryConsumption int) string {
+	return fmt.Sprintf(`
+		# HELP cortex_querier_queries_rejected_total Number of queries that were rejected, for example because they exceeded a limit.
+		# TYPE cortex_querier_queries_rejected_total counter
+		cortex_querier_queries_rejected_total{reason="max-estimated-fetched-chunks-per-query"} 0
+		cortex_querier_queries_rejected_total{reason="max-estimated-memory-consumption-per-query"} %v
+		cortex_querier_queries_rejected_total{reason="max-fetched-chunk-bytes-per-query"} 0
+		cortex_querier_queries_rejected_total{reason="max-fetched-chunks-per-query"} 0
+		cortex_querier_queries_rejected_total{reason="max-fetched-series-per-query"} 0
+	`, rejectedDueToMemoryConsumption)
+}
+
 func getHistogram(t *testing.T, reg *prometheus.Registry, name string) *dto.Histogram {
 	metrics, err := reg.Gather()
 	require.NoError(t, err)
@@ -640,7 +706,7 @@ func TestActiveQueryTracker(t *testing.T) {
 	opts := NewTestEngineOpts()
 	tracker := &testQueryTracker{}
 	opts.ActiveQueryTracker = tracker
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	innerStorage := promqltest.LoadedStorage(t, "")
@@ -747,7 +813,7 @@ func TestActiveQueryTracker_WaitingForTrackerIncludesQueryTimeout(t *testing.T)
 	opts := NewTestEngineOpts()
 	opts.Timeout = 10 * time.Millisecond
 	opts.ActiveQueryTracker = tracker
-	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), log.NewNopLogger())
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
 	require.NoError(t, err)
 
 	queryTypes := map[string]func() (promql.Query, error){
diff --git a/pkg/streamingpromql/operators/binary_operation_test.go b/pkg/streamingpromql/operators/binary_operation_test.go
index 413c4162597..ca372660b2b 100644
--- a/pkg/streamingpromql/operators/binary_operation_test.go
+++ b/pkg/streamingpromql/operators/binary_operation_test.go
@@ -254,7 +254,7 @@ func TestBinaryOperation_SeriesMerging(t *testing.T) {
 				On:             true,
 				MatchingLabels: []string{"env"},
 			},
-			Pool: pooling.NewLimitingPool(0),
+			Pool: pooling.NewLimitingPool(0, nil),
 		}
 
 		result, err := o.mergeOneSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right")
@@ -526,7 +526,7 @@ func TestBinaryOperationSeriesBuffer(t *testing.T) {
 	}
 
 	seriesUsed := []bool{true, false, true, true, true}
-	buffer := newBinaryOperationSeriesBuffer(inner, seriesUsed, pooling.NewLimitingPool(0))
+	buffer := newBinaryOperationSeriesBuffer(inner, seriesUsed, pooling.NewLimitingPool(0, nil))
 	ctx := context.Background()
 
 	// Read first series.
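Reviewer note: the limits in the tests above (and in the `LimitingPool` tests below) are easier to follow once the pool's power-of-two rounding is written out: the pool charges for the rounded-up capacity it hands back, not the length requested. A standalone sketch follows, where `nextPowerOfTwo` is a hypothetical stand-in for the `BucketedPool` bucketing and the 16-byte `fPointSize` (one float64 value plus one int64 timestamp) is an assumption, not a value taken from this diff.

```go
// Demonstrates why a limit of 3 * 8 * FPointSize admits a query touching
// three series of five points each, but rejects one touching four: five
// points are rounded up to a capacity of 8, and the pool charges for
// capacity. nextPowerOfTwo and fPointSize are assumptions for illustration.
package main

import "fmt"

const fPointSize = 16 // assumed size of one promql.FPoint: float64 value + int64 timestamp

func nextPowerOfTwo(n int) int {
	c := 1
	for c < n {
		c *= 2
	}
	return c
}

func main() {
	limit := uint64(3 * 8 * fPointSize) // the limit used by TestMemoryConsumptionLimit_MultipleQueries

	for _, series := range []int{1, 3, 4} {
		cost := uint64(series * nextPowerOfTwo(5) * fPointSize)
		fmt.Printf("%d series: %d bytes (limit %d, rejected: %v)\n", series, cost, limit, cost > limit)
	}
}
```

Note the comparison is strict: three series cost exactly the limit and succeed, while four cost 512 bytes against 384 and fail. That is also why the repeated four-series query above increments the rejection counter twice, once per query, thanks to the per-query `haveRecordedRejection` guard added below.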
diff --git a/pkg/streamingpromql/pooling/limiting_pool.go b/pkg/streamingpromql/pooling/limiting_pool.go
index da0420d65b9..a7a79d5e927 100644
--- a/pkg/streamingpromql/pooling/limiting_pool.go
+++ b/pkg/streamingpromql/pooling/limiting_pool.go
@@ -5,6 +5,7 @@ package pooling
 import (
 	"unsafe"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/promql"
 
 	"github.com/grafana/mimir/pkg/streamingpromql/types"
@@ -61,11 +62,16 @@ type LimitingPool struct {
 	MaxEstimatedMemoryConsumptionBytes     uint64
 	CurrentEstimatedMemoryConsumptionBytes uint64
 	PeakEstimatedMemoryConsumptionBytes    uint64
+
+	rejectionCount        prometheus.Counter
+	haveRecordedRejection bool
 }
 
-func NewLimitingPool(maxEstimatedMemoryConsumptionBytes uint64) *LimitingPool {
+func NewLimitingPool(maxEstimatedMemoryConsumptionBytes uint64, rejectionCount prometheus.Counter) *LimitingPool {
 	return &LimitingPool{
 		MaxEstimatedMemoryConsumptionBytes: maxEstimatedMemoryConsumptionBytes,
+
+		rejectionCount: rejectionCount,
 	}
 }
 
@@ -84,6 +90,12 @@ func getWithElementSize[E any, S ~[]E](p *LimitingPool, pool *pool.BucketedPool[
 
 	if p.MaxEstimatedMemoryConsumptionBytes > 0 && p.CurrentEstimatedMemoryConsumptionBytes+estimatedBytes > p.MaxEstimatedMemoryConsumptionBytes {
 		pool.Put(s)
+
+		if !p.haveRecordedRejection {
+			p.haveRecordedRejection = true
+			p.rejectionCount.Inc()
+		}
+
 		return nil, limiter.NewMaxEstimatedMemoryConsumptionPerQueryLimitError(p.MaxEstimatedMemoryConsumptionBytes)
 	}
 
diff --git a/pkg/streamingpromql/pooling/limiting_pool_test.go b/pkg/streamingpromql/pooling/limiting_pool_test.go
index 549c5d67e13..c4ce94e7347 100644
--- a/pkg/streamingpromql/pooling/limiting_pool_test.go
+++ b/pkg/streamingpromql/pooling/limiting_pool_test.go
@@ -4,39 +4,50 @@ package pooling
 
 import (
 	"fmt"
+	"strings"
 	"testing"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
 )
 
+const rejectedQueryName = "rejected_queries"
+
 func TestLimitingPool_Unlimited(t *testing.T) {
 	t.Run("[]promql.FPoint", func(t *testing.T) {
-		pool := NewLimitingPool(0)
-		testUnlimitedPool(t, pool.GetFPointSlice, pool.PutFPointSlice, pool, FPointSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(0, metric)
+		testUnlimitedPool(t, pool.GetFPointSlice, pool.PutFPointSlice, pool, FPointSize, reg)
 	})
 
 	t.Run("[]promql.HPoint", func(t *testing.T) {
-		pool := NewLimitingPool(0)
-		testUnlimitedPool(t, pool.GetHPointSlice, pool.PutHPointSlice, pool, HPointSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(0, metric)
+		testUnlimitedPool(t, pool.GetHPointSlice, pool.PutHPointSlice, pool, HPointSize, reg)
 	})
 
 	t.Run("promql.Vector", func(t *testing.T) {
-		pool := NewLimitingPool(0)
-		testUnlimitedPool(t, pool.GetVector, pool.PutVector, pool, VectorSampleSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(0, metric)
+		testUnlimitedPool(t, pool.GetVector, pool.PutVector, pool, VectorSampleSize, reg)
 	})
 
 	t.Run("[]float64", func(t *testing.T) {
-		pool := NewLimitingPool(0)
-		testUnlimitedPool(t, pool.GetFloatSlice, pool.PutFloatSlice, pool, Float64Size)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(0, metric)
+		testUnlimitedPool(t, pool.GetFloatSlice, pool.PutFloatSlice, pool, Float64Size, reg)
 	})
 
 	t.Run("[]bool", func(t *testing.T) {
-		pool := NewLimitingPool(0)
-		testUnlimitedPool(t, pool.GetBoolSlice, pool.PutBoolSlice, pool, BoolSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(0, metric)
+		testUnlimitedPool(t, pool.GetBoolSlice, pool.PutBoolSlice, pool, BoolSize, reg)
 	})
 }
 
-func testUnlimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put func(S), pool *LimitingPool, elementSize uint64) {
+func testUnlimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put func(S), pool *LimitingPool, elementSize uint64, reg *prometheus.Registry) {
 	// Get a slice from the pool, the current and peak stats should be updated based on the capacity of the slice returned, not the size requested.
 	s100, err := get(100)
 	require.NoError(t, err)
@@ -74,36 +85,43 @@ func testUnlimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), pu
 	put(nil)
 	require.Equal(t, 266*elementSize, pool.CurrentEstimatedMemoryConsumptionBytes)
 	require.Equal(t, 266*elementSize, pool.PeakEstimatedMemoryConsumptionBytes)
+
+	assertRejectedQueryCount(t, reg, 0)
 }
 
 func TestLimitingPool_Limited(t *testing.T) {
 	t.Run("[]promql.FPoint", func(t *testing.T) {
-		pool := NewLimitingPool(11 * FPointSize)
-		testLimitedPool(t, pool.GetFPointSlice, pool.PutFPointSlice, pool, FPointSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(11*FPointSize, metric)
+		testLimitedPool(t, pool.GetFPointSlice, pool.PutFPointSlice, pool, FPointSize, reg)
 	})
 
 	t.Run("[]promql.HPoint", func(t *testing.T) {
-		pool := NewLimitingPool(11 * HPointSize)
-		testLimitedPool(t, pool.GetHPointSlice, pool.PutHPointSlice, pool, HPointSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(11*HPointSize, metric)
+		testLimitedPool(t, pool.GetHPointSlice, pool.PutHPointSlice, pool, HPointSize, reg)
 	})
 
 	t.Run("promql.Vector", func(t *testing.T) {
-		pool := NewLimitingPool(11 * VectorSampleSize)
-		testLimitedPool(t, pool.GetVector, pool.PutVector, pool, VectorSampleSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(11*VectorSampleSize, metric)
+		testLimitedPool(t, pool.GetVector, pool.PutVector, pool, VectorSampleSize, reg)
 	})
 
 	t.Run("[]float64", func(t *testing.T) {
-		pool := NewLimitingPool(11 * Float64Size)
-		testLimitedPool(t, pool.GetFloatSlice, pool.PutFloatSlice, pool, Float64Size)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(11*Float64Size, metric)
+		testLimitedPool(t, pool.GetFloatSlice, pool.PutFloatSlice, pool, Float64Size, reg)
 	})
 
 	t.Run("[]bool", func(t *testing.T) {
-		pool := NewLimitingPool(11 * BoolSize)
-		testLimitedPool(t, pool.GetBoolSlice, pool.PutBoolSlice, pool, BoolSize)
+		reg, metric := createRejectedMetric()
+		pool := NewLimitingPool(11*BoolSize, metric)
+		testLimitedPool(t, pool.GetBoolSlice, pool.PutBoolSlice, pool, BoolSize, reg)
 	})
 }
 
-func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put func(S), pool *LimitingPool, elementSize uint64) {
+func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put func(S), pool *LimitingPool, elementSize uint64, reg *prometheus.Registry) {
 	// This method assumes that pool has been created with a limit set to 11x elementSize
 
 	// Get a slice from the pool beneath the limit.
@@ -112,6 +130,7 @@ func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put
 	require.Equal(t, 8, cap(s7))
 	require.Equal(t, 8*elementSize, pool.CurrentEstimatedMemoryConsumptionBytes)
 	require.Equal(t, 8*elementSize, pool.PeakEstimatedMemoryConsumptionBytes)
+	assertRejectedQueryCount(t, reg, 0)
 
 	// Get another slice from the pool beneath the limit.
 	s1, err := get(1)
@@ -119,11 +138,13 @@ func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put
 	require.Equal(t, 1, cap(s1))
 	require.Equal(t, 9*elementSize, pool.CurrentEstimatedMemoryConsumptionBytes)
 	require.Equal(t, 9*elementSize, pool.PeakEstimatedMemoryConsumptionBytes)
+	assertRejectedQueryCount(t, reg, 0)
 
 	// Return a slice to the pool.
 	put(s1)
 	require.Equal(t, 8*elementSize, pool.CurrentEstimatedMemoryConsumptionBytes)
 	require.Equal(t, 9*elementSize, pool.PeakEstimatedMemoryConsumptionBytes)
+	assertRejectedQueryCount(t, reg, 0)
 
 	// Try to get a slice where the requested size would push us over the limit.
 	_, err = get(4)
@@ -131,6 +152,7 @@ func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put
 	require.ErrorContains(t, err, expectedError)
 	require.Equal(t, 8*elementSize, pool.CurrentEstimatedMemoryConsumptionBytes)
 	require.Equal(t, 9*elementSize, pool.PeakEstimatedMemoryConsumptionBytes)
+	assertRejectedQueryCount(t, reg, 1)
 
 	// Try to get a slice where the requested size is under the limit, but the capacity of the slice returned by the pool is over the limit.
 	// (We expect the pool to be configured with a factor of 2, so a slice of size 3 will be rounded up to 4 elements.)
@@ -139,6 +161,9 @@ func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put
 	require.Equal(t, 8*elementSize, pool.CurrentEstimatedMemoryConsumptionBytes)
 	require.Equal(t, 9*elementSize, pool.PeakEstimatedMemoryConsumptionBytes)
 
+	// Make sure we don't increment the rejection count a second time for the same query.
+	assertRejectedQueryCount(t, reg, 1)
+
 	// Keep getting more slices from the pool up to the limit of 11 to make sure the failed allocations weren't counted.
 	for i := 0; i < 3; i++ {
 		s1, err = get(1)
@@ -153,10 +178,19 @@ func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put
 	require.ErrorContains(t, err, expectedError)
 	require.Equal(t, 11*elementSize, pool.CurrentEstimatedMemoryConsumptionBytes)
 	require.Equal(t, 11*elementSize, pool.PeakEstimatedMemoryConsumptionBytes)
+	assertRejectedQueryCount(t, reg, 1)
+}
+
+func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
+	expected := fmt.Sprintf(`
+		# TYPE %s counter
+		%s %v
+	`, rejectedQueryName, rejectedQueryName, expectedRejectionCount)
+	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expected), rejectedQueryName))
 }
 
 func TestLimitingPool_ClearsReturnedSlices(t *testing.T) {
-	pool := NewLimitingPool(0)
+	pool := NewLimitingPool(0, nil)
 
 	// Get a slice, put it back in the pool and get it back again.
 	// Make sure all elements are zero or false when we get it back.
@@ -190,3 +224,12 @@ func TestLimitingPool_ClearsReturnedSlices(t *testing.T) {
 		require.Equal(t, []bool{false, false}, boolSlice)
 	})
 }
+
+func createRejectedMetric() (*prometheus.Registry, prometheus.Counter) {
+	reg := prometheus.NewPedanticRegistry()
+	metric := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: rejectedQueryName,
+	})
+
+	return reg, metric
+}
diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go
index 97aff8e2a3f..3fbdceb5f5f 100644
--- a/pkg/streamingpromql/query.go
+++ b/pkg/streamingpromql/query.go
@@ -67,7 +67,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer
 		opts:      opts,
 		engine:    engine,
 		qs:        qs,
-		pool:      pooling.NewLimitingPool(maxInMemorySamples),
+		pool:      pooling.NewLimitingPool(maxInMemorySamples, engine.queriesRejectedDueToPeakMemoryConsumption),
 		statement: &parser.EvalStmt{
 			Expr:  expr,
 			Start: start,
diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go
index 163eb72517f..0d8041e4e4d 100644
--- a/pkg/util/limiter/query_limiter_test.go
+++ b/pkg/util/limiter/query_limiter_test.go
@@ -201,6 +201,7 @@ func assertRejectedQueriesMetricValue(t *testing.T, c prometheus.Collector, expe
 		cortex_querier_queries_rejected_total{reason="max-fetched-chunk-bytes-per-query"} %v
 		cortex_querier_queries_rejected_total{reason="max-fetched-chunks-per-query"} %v
 		cortex_querier_queries_rejected_total{reason="max-estimated-fetched-chunks-per-query"} %v
+		cortex_querier_queries_rejected_total{reason="max-estimated-memory-consumption-per-query"} 0
 	`, expectedMaxSeries,
 		expectedMaxChunkBytes,