diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fbfb7acf89..ad8ba788e3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7899 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series"}` #7989 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index b511a1f8a07..c6cb29906f3 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -1898,6 +1898,17 @@ "fieldType": "string", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "enable_promql_engine_fallback", + "required": false, + "desc": "If set to true and the streaming engine is in use, fall back to using the Prometheus PromQL engine for any queries not supported by the streaming engine.", + "fieldValue": null, + "fieldDefaultValue": true, + "fieldFlag": "querier.enable-promql-engine-fallback", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "max_concurrent", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index ce7613c0fbf..9d3d01d968f 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1671,6 +1671,8 @@ Usage of ./cmd/mimir/mimir: The default evaluation interval or step size for subqueries. This config option should be set on query-frontend too when query sharding is enabled. (default 1m0s) -querier.dns-lookup-period duration How often to query DNS for query-frontend or query-scheduler address. (default 10s) + -querier.enable-promql-engine-fallback + [experimental] If set to true and the streaming engine is in use, fall back to using the Prometheus PromQL engine for any queries not supported by the streaming engine. (default true) -querier.frontend-address string Address of the query-frontend component, in host:port format. If multiple query-frontends are running, the host should be a DNS resolving to all query-frontend instances. This option should be set only when query-scheduler component is not in use. -querier.frontend-client.backoff-max-period duration diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 4fa0a9a3276..b54e14c090e 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -128,7 +128,7 @@ The following features are currently experimental: - Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`) - Enable PromQL experimental functions (`-querier.promql-experimental-functions-enabled`) - Allow streaming of `/active_series` responses to the frontend (`-querier.response-streaming-enabled`) - - Streaming PromQL engine (`-querier.promql-engine=streaming`) + - Streaming PromQL engine (`-querier.promql-engine=streaming` and `-querier.enable-promql-engine-fallback`) - Query-frontend - `-query-frontend.querier-forget-delay` - Instant query splitting (`-query-frontend.split-instant-queries-by-interval`) diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 02cfc130edc..73b6422693e 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1335,6 +1335,12 @@ store_gateway_client: # CLI flag: -querier.promql-engine [promql_engine: | default = "standard"] +# (experimental) If set to true and the streaming engine is in use, fall back to +# using the Prometheus PromQL engine for any queries not supported by the +# streaming engine. +# CLI flag: -querier.enable-promql-engine-fallback +[enable_promql_engine_fallback: | default = true] + # The number of workers running in each querier process. This setting limits the # maximum number of concurrent queries in each querier. # CLI flag: -querier.max-concurrent diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index d8955d81a5f..c1a5d6f3a7f 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -56,7 +56,8 @@ type Config struct { MinimizeIngesterRequests bool `yaml:"minimize_ingester_requests" category:"advanced"` MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"minimize_ingester_requests_hedging_delay" category:"advanced"` - PromQLEngine string `yaml:"promql_engine" category:"experimental"` + PromQLEngine string `yaml:"promql_engine" category:"experimental"` + EnablePromQLEngineFallback bool `yaml:"enable_promql_engine_fallback" category:"experimental"` // PromQL engine config. EngineConfig engine.Config `yaml:",inline"` @@ -88,6 +89,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.Uint64Var(&cfg.StreamingChunksPerStoreGatewaySeriesBufferSize, "querier.streaming-chunks-per-store-gateway-buffer-size", 256, "Number of series to buffer per store-gateway when streaming chunks from store-gateways.") f.StringVar(&cfg.PromQLEngine, "querier.promql-engine", standardPromQLEngine, fmt.Sprintf("PromQL engine to use, either '%v' or '%v'", standardPromQLEngine, streamingPromQLEngine)) + f.BoolVar(&cfg.EnablePromQLEngineFallback, "querier.enable-promql-engine-fallback", true, "If set to true and the streaming engine is in use, fall back to using the Prometheus PromQL engine for any queries not supported by the streaming engine.") cfg.EngineConfig.RegisterFlags(f) } @@ -161,12 +163,17 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor case standardPromQLEngine: eng = promql.NewEngine(opts) case streamingPromQLEngine: - var err error - - eng, err = streamingpromql.NewEngine(opts) + streamingEngine, err := streamingpromql.NewEngine(opts) if err != nil { return nil, nil, nil, err } + + if cfg.EnablePromQLEngineFallback { + prometheusEngine := promql.NewEngine(opts) + eng = streamingpromql.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger) + } else { + eng = streamingEngine + } default: panic(fmt.Sprintf("invalid config not caught by validation: unknown PromQL engine '%s'", cfg.PromQLEngine)) } diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 31e7731d355..05bc841b8cd 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -39,13 +39,13 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { t.Run(expression, func(t *testing.T) { qry, err := engine.NewRangeQuery(ctx, nil, nil, expression, time.Now().Add(-time.Hour), time.Now(), time.Minute) require.Error(t, err) - require.ErrorIs(t, err, ErrNotSupported) + require.ErrorIs(t, err, NotSupportedError{}) require.EqualError(t, err, "not supported by streaming engine: "+expectedError) require.Nil(t, qry) qry, err = engine.NewInstantQuery(ctx, nil, nil, expression, time.Now()) require.Error(t, err) - require.ErrorIs(t, err, ErrNotSupported) + require.ErrorIs(t, err, NotSupportedError{}) require.EqualError(t, err, "not supported by streaming engine: "+expectedError) require.Nil(t, qry) }) @@ -66,7 +66,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { t.Run(expression, func(t *testing.T) { qry, err := engine.NewInstantQuery(ctx, nil, nil, expression, time.Now()) require.Error(t, err) - require.ErrorIs(t, err, ErrNotSupported) + require.ErrorIs(t, err, NotSupportedError{}) require.EqualError(t, err, "not supported by streaming engine: "+expectedError) require.Nil(t, qry) }) diff --git a/pkg/streamingpromql/errors.go b/pkg/streamingpromql/errors.go index 7e1255e643c..9cebc167048 100644 --- a/pkg/streamingpromql/errors.go +++ b/pkg/streamingpromql/errors.go @@ -7,8 +7,18 @@ import ( "fmt" ) -var ErrNotSupported = errors.New("not supported by streaming engine") +type NotSupportedError struct { + reason string +} + +func NewNotSupportedError(reason string) error { + return NotSupportedError{reason} +} + +func (e NotSupportedError) Error() string { + return fmt.Sprintf("not supported by streaming engine: %v", e.reason) +} -func NewNotSupportedError(detail string) error { - return fmt.Errorf("%w: %s", ErrNotSupported, detail) +func (e NotSupportedError) Is(target error) bool { + return errors.As(target, &NotSupportedError{}) } diff --git a/pkg/streamingpromql/fallback_engine.go b/pkg/streamingpromql/fallback_engine.go new file mode 100644 index 00000000000..28b9957a47a --- /dev/null +++ b/pkg/streamingpromql/fallback_engine.go @@ -0,0 +1,88 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package streamingpromql + +import ( + "context" + "errors" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +type EngineWithFallback struct { + preferred promql.QueryEngine + fallback promql.QueryEngine + + supportedQueries prometheus.Counter + unsupportedQueries *prometheus.CounterVec + + logger log.Logger +} + +func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheus.Registerer, logger log.Logger) promql.QueryEngine { + return &EngineWithFallback{ + preferred: preferred, + fallback: fallback, + + supportedQueries: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_streaming_promql_engine_supported_queries_total", + Help: "Total number of queries that were supported by the streaming engine.", + }), + unsupportedQueries: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_streaming_promql_engine_unsupported_queries_total", + Help: "Total number of queries that were not supported by the streaming engine and so fell back to Prometheus' engine.", + }, []string{"reason"}), + + logger: logger, + } +} + +func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { + query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts) + + if err == nil { + e.supportedQueries.Inc() + return query, nil + } + + notSupportedErr := NotSupportedError{} + if !errors.As(err, ¬SupportedErr) { + // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. + return nil, err + } + + logger := spanlogger.FromContext(ctx, e.logger) + level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs) + e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc() + + return e.fallback.NewInstantQuery(ctx, q, opts, qs, ts) +} + +func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { + query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval) + + if err == nil { + e.supportedQueries.Inc() + return query, nil + } + + notSupportedErr := NotSupportedError{} + if !errors.As(err, ¬SupportedErr) { + // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. + return nil, err + } + + logger := spanlogger.FromContext(ctx, e.logger) + level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs) + e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc() + + return e.fallback.NewRangeQuery(ctx, q, opts, qs, start, end, interval) +} diff --git a/pkg/streamingpromql/fallback_engine_test.go b/pkg/streamingpromql/fallback_engine_test.go new file mode 100644 index 00000000000..ad49747b98a --- /dev/null +++ b/pkg/streamingpromql/fallback_engine_test.go @@ -0,0 +1,166 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package streamingpromql + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/stats" + "github.com/stretchr/testify/require" +) + +func TestEngineWithFallback(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + + generators := map[string]func(engine promql.QueryEngine, expr string) (promql.Query, error){ + "instant query": func(engine promql.QueryEngine, expr string) (promql.Query, error) { + return engine.NewInstantQuery(ctx, nil, nil, expr, time.Now()) + }, + "range query": func(engine promql.QueryEngine, expr string) (promql.Query, error) { + return engine.NewRangeQuery(ctx, nil, nil, expr, time.Now(), time.Now().Add(-time.Minute), time.Second) + }, + } + + for name, createQuery := range generators { + t.Run(name, func(t *testing.T) { + t.Run("should not fall back for supported expressions", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + preferredEngine := newFakeEngineThatSupportsLimitedQueries() + fallbackEngine := newFakeEngineThatSupportsAllQueries() + engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) + + query, err := createQuery(engineWithFallback, "a_supported_expression") + require.NoError(t, err) + require.Equal(t, preferredEngine.query, query, "should return query from preferred engine") + require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if expression is supported by preferred engine") + + require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_streaming_promql_engine_supported_queries_total Total number of queries that were supported by the streaming engine. + # TYPE cortex_streaming_promql_engine_supported_queries_total counter + cortex_streaming_promql_engine_supported_queries_total 1 + `), "cortex_streaming_promql_engine_supported_queries_total", "cortex_streaming_promql_engine_unsupported_queries_total")) + }) + + t.Run("should fall back for unsupported expressions", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + preferredEngine := newFakeEngineThatSupportsLimitedQueries() + fallbackEngine := newFakeEngineThatSupportsAllQueries() + engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) + + query, err := createQuery(engineWithFallback, "a_non_supported_expression") + require.NoError(t, err) + require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine") + + require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_streaming_promql_engine_supported_queries_total Total number of queries that were supported by the streaming engine. + # TYPE cortex_streaming_promql_engine_supported_queries_total counter + cortex_streaming_promql_engine_supported_queries_total 0 + # HELP cortex_streaming_promql_engine_unsupported_queries_total Total number of queries that were not supported by the streaming engine and so fell back to Prometheus' engine. + # TYPE cortex_streaming_promql_engine_unsupported_queries_total counter + cortex_streaming_promql_engine_unsupported_queries_total{reason="this expression is not supported"} 1 + `), "cortex_streaming_promql_engine_supported_queries_total", "cortex_streaming_promql_engine_unsupported_queries_total")) + }) + + t.Run("should not fall back if creating query fails for another reason", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + preferredEngine := newFakeEngineThatSupportsLimitedQueries() + fallbackEngine := newFakeEngineThatSupportsAllQueries() + engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) + + _, err := createQuery(engineWithFallback, "an_invalid_expression") + require.EqualError(t, err, "the query is invalid") + require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if creating query fails for another reason") + }) + }) + } +} + +type fakeEngineThatSupportsAllQueries struct { + query *fakeQuery + wasCalled bool +} + +func newFakeEngineThatSupportsAllQueries() *fakeEngineThatSupportsAllQueries { + return &fakeEngineThatSupportsAllQueries{ + query: &fakeQuery{"query from fallback engine"}, + } +} + +func (f *fakeEngineThatSupportsAllQueries) NewInstantQuery(context.Context, storage.Queryable, promql.QueryOpts, string, time.Time) (promql.Query, error) { + f.wasCalled = true + return f.query, nil +} + +func (f *fakeEngineThatSupportsAllQueries) NewRangeQuery(context.Context, storage.Queryable, promql.QueryOpts, string, time.Time, time.Time, time.Duration) (promql.Query, error) { + f.wasCalled = true + return f.query, nil +} + +type fakeEngineThatSupportsLimitedQueries struct { + query *fakeQuery +} + +func newFakeEngineThatSupportsLimitedQueries() *fakeEngineThatSupportsLimitedQueries { + return &fakeEngineThatSupportsLimitedQueries{ + query: &fakeQuery{"query from preferred engine"}, + } +} + +func (f *fakeEngineThatSupportsLimitedQueries) NewInstantQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, qs string, _ time.Time) (promql.Query, error) { + if qs == "a_supported_expression" { + return f.query, nil + } else if qs == "an_invalid_expression" { + return nil, errors.New("the query is invalid") + } + + return nil, NewNotSupportedError("this expression is not supported") +} + +func (f *fakeEngineThatSupportsLimitedQueries) NewRangeQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, qs string, _, _ time.Time, _ time.Duration) (promql.Query, error) { + if qs == "a_supported_expression" { + return f.query, nil + } else if qs == "an_invalid_expression" { + return nil, errors.New("the query is invalid") + } + + return nil, NewNotSupportedError("this expression is not supported") +} + +type fakeQuery struct { + name string +} + +func (f fakeQuery) Exec(context.Context) *promql.Result { + panic("fakeQuery: Exec() not supported") +} + +func (f fakeQuery) Close() { + panic("fakeQuery: Close() not supported") +} + +func (f fakeQuery) Statement() parser.Statement { + panic("fakeQuery: Statement() not supported") +} + +func (f fakeQuery) Stats() *stats.Statistics { + panic("fakeQuery: Stats() not supported") +} + +func (f fakeQuery) Cancel() { + panic("fakeQuery: Cancel() not supported") +} + +func (f fakeQuery) String() string { + panic("fakeQuery: String() not supported") +}