Streaming PromQL engine: optionally fall back to Prometheus' engine if query is not supported (#7898)

* Initial commit.

* Log and add a trace event when a query falls back to Prometheus' engine.

* Add CLI flag to enable falling back to Prometheus' engine, and enable fallback by default.

* Add changelog and docs

* Address PR feedback

* Rename metrics

These metrics could be emitted by query-frontends too, so "querier" in
the name could be misleading.
charleskorn committed Apr 30, 2024
1 parent 37e2aa1 commit bce1bbc
Showing 10 changed files with 302 additions and 12 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@
 * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
 * [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7899
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899
 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
 * [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series"}` #7989
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1898,6 +1898,17 @@
   "fieldType": "string",
   "fieldCategory": "experimental"
 },
+{
+  "kind": "field",
+  "name": "enable_promql_engine_fallback",
+  "required": false,
+  "desc": "If set to true and the streaming engine is in use, fall back to using the Prometheus PromQL engine for any queries not supported by the streaming engine.",
+  "fieldValue": null,
+  "fieldDefaultValue": true,
+  "fieldFlag": "querier.enable-promql-engine-fallback",
+  "fieldType": "boolean",
+  "fieldCategory": "experimental"
+},
 {
   "kind": "field",
   "name": "max_concurrent",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1671,6 +1671,8 @@ Usage of ./cmd/mimir/mimir:
         The default evaluation interval or step size for subqueries. This config option should be set on query-frontend too when query sharding is enabled. (default 1m0s)
   -querier.dns-lookup-period duration
         How often to query DNS for query-frontend or query-scheduler address. (default 10s)
+  -querier.enable-promql-engine-fallback
+        [experimental] If set to true and the streaming engine is in use, fall back to using the Prometheus PromQL engine for any queries not supported by the streaming engine. (default true)
   -querier.frontend-address string
         Address of the query-frontend component, in host:port format. If multiple query-frontends are running, the host should be a DNS resolving to all query-frontend instances. This option should be set only when query-scheduler component is not in use.
   -querier.frontend-client.backoff-max-period duration
2 changes: 1 addition & 1 deletion docs/sources/mimir/configure/about-versioning.md
@@ -128,7 +128,7 @@ The following features are currently experimental:
   - Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
   - Enable PromQL experimental functions (`-querier.promql-experimental-functions-enabled`)
   - Allow streaming of `/active_series` responses to the frontend (`-querier.response-streaming-enabled`)
-  - Streaming PromQL engine (`-querier.promql-engine=streaming`)
+  - Streaming PromQL engine (`-querier.promql-engine=streaming` and `-querier.enable-promql-engine-fallback`)
 - Query-frontend
   - `-query-frontend.querier-forget-delay`
   - Instant query splitting (`-query-frontend.split-instant-queries-by-interval`)
@@ -1335,6 +1335,12 @@ store_gateway_client:
 # CLI flag: -querier.promql-engine
 [promql_engine: <string> | default = "standard"]

+# (experimental) If set to true and the streaming engine is in use, fall back to
+# using the Prometheus PromQL engine for any queries not supported by the
+# streaming engine.
+# CLI flag: -querier.enable-promql-engine-fallback
+[enable_promql_engine_fallback: <boolean> | default = true]
+
 # The number of workers running in each querier process. This setting limits the
 # maximum number of concurrent queries in each querier.
 # CLI flag: -querier.max-concurrent
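The flag's default can be illustrated with Go's standard `flag` package. This is a minimal sketch, not Mimir's code: the `Config` struct and the `parseConfig` helper are hypothetical stand-ins that mirror only the two flag names and defaults shown in the hunks above.

```go
package main

import (
	"flag"
	"fmt"
)

// Config is an illustrative stand-in holding only the two querier settings
// discussed here. It is not Mimir's actual querier.Config.
type Config struct {
	PromQLEngine               string
	EnablePromQLEngineFallback bool
}

// parseConfig (hypothetical helper) registers both flags on a fresh FlagSet
// with the defaults documented above, then parses the given arguments.
func parseConfig(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	fs.StringVar(&cfg.PromQLEngine, "querier.promql-engine", "standard", "PromQL engine to use.")
	fs.BoolVar(&cfg.EnablePromQLEngineFallback, "querier.enable-promql-engine-fallback", true, "Fall back to Prometheus' engine for unsupported queries.")
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	// Fallback is on unless it is explicitly disabled.
	byDefault, _ := parseConfig([]string{"-querier.promql-engine=streaming"})
	disabled, _ := parseConfig([]string{
		"-querier.promql-engine=streaming",
		"-querier.enable-promql-engine-fallback=false",
	})
	fmt.Println(byDefault.EnablePromQLEngineFallback, disabled.EnablePromQLEngineFallback)
}
```

Because the flag is a boolean that defaults to true, opting out requires the explicit `=false` form; per the flag description, the setting only has an effect when the streaming engine is selected.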
15 changes: 11 additions & 4 deletions pkg/querier/querier.go
@@ -56,7 +56,8 @@ type Config struct {
 	MinimizeIngesterRequests             bool          `yaml:"minimize_ingester_requests" category:"advanced"`
 	MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"minimize_ingester_requests_hedging_delay" category:"advanced"`

-	PromQLEngine string `yaml:"promql_engine" category:"experimental"`
+	PromQLEngine               string `yaml:"promql_engine" category:"experimental"`
+	EnablePromQLEngineFallback bool   `yaml:"enable_promql_engine_fallback" category:"experimental"`

 	// PromQL engine config.
 	EngineConfig engine.Config `yaml:",inline"`
@@ -88,6 +89,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.Uint64Var(&cfg.StreamingChunksPerStoreGatewaySeriesBufferSize, "querier.streaming-chunks-per-store-gateway-buffer-size", 256, "Number of series to buffer per store-gateway when streaming chunks from store-gateways.")

 	f.StringVar(&cfg.PromQLEngine, "querier.promql-engine", standardPromQLEngine, fmt.Sprintf("PromQL engine to use, either '%v' or '%v'", standardPromQLEngine, streamingPromQLEngine))
+	f.BoolVar(&cfg.EnablePromQLEngineFallback, "querier.enable-promql-engine-fallback", true, "If set to true and the streaming engine is in use, fall back to using the Prometheus PromQL engine for any queries not supported by the streaming engine.")

 	cfg.EngineConfig.RegisterFlags(f)
 }
@@ -161,12 +163,17 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
 	case standardPromQLEngine:
 		eng = promql.NewEngine(opts)
 	case streamingPromQLEngine:
-		var err error
-
-		eng, err = streamingpromql.NewEngine(opts)
+		streamingEngine, err := streamingpromql.NewEngine(opts)
 		if err != nil {
 			return nil, nil, nil, err
 		}
+
+		if cfg.EnablePromQLEngineFallback {
+			prometheusEngine := promql.NewEngine(opts)
+			eng = streamingpromql.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger)
+		} else {
+			eng = streamingEngine
+		}
 	default:
 		panic(fmt.Sprintf("invalid config not caught by validation: unknown PromQL engine '%s'", cfg.PromQLEngine))
 	}
6 changes: 3 additions & 3 deletions pkg/streamingpromql/engine_test.go
@@ -39,13 +39,13 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
 		t.Run(expression, func(t *testing.T) {
 			qry, err := engine.NewRangeQuery(ctx, nil, nil, expression, time.Now().Add(-time.Hour), time.Now(), time.Minute)
 			require.Error(t, err)
-			require.ErrorIs(t, err, ErrNotSupported)
+			require.ErrorIs(t, err, NotSupportedError{})
 			require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
 			require.Nil(t, qry)

 			qry, err = engine.NewInstantQuery(ctx, nil, nil, expression, time.Now())
 			require.Error(t, err)
-			require.ErrorIs(t, err, ErrNotSupported)
+			require.ErrorIs(t, err, NotSupportedError{})
 			require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
 			require.Nil(t, qry)
 		})
@@ -66,7 +66,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
 		t.Run(expression, func(t *testing.T) {
 			qry, err := engine.NewInstantQuery(ctx, nil, nil, expression, time.Now())
 			require.Error(t, err)
-			require.ErrorIs(t, err, ErrNotSupported)
+			require.ErrorIs(t, err, NotSupportedError{})
 			require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
 			require.Nil(t, qry)
 		})
16 changes: 13 additions & 3 deletions pkg/streamingpromql/errors.go
@@ -7,8 +7,18 @@ import (
 	"fmt"
 )

-var ErrNotSupported = errors.New("not supported by streaming engine")
+type NotSupportedError struct {
+	reason string
+}
+
+func NewNotSupportedError(reason string) error {
+	return NotSupportedError{reason}
+}
+
+func (e NotSupportedError) Error() string {
+	return fmt.Sprintf("not supported by streaming engine: %v", e.reason)
+}

-func NewNotSupportedError(detail string) error {
-	return fmt.Errorf("%w: %s", ErrNotSupported, detail)
+func (e NotSupportedError) Is(target error) bool {
+	return errors.As(target, &NotSupportedError{})
 }
88 changes: 88 additions & 0 deletions pkg/streamingpromql/fallback_engine.go
@@ -0,0 +1,88 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package streamingpromql
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/storage"
+
+	"github.com/grafana/mimir/pkg/util/spanlogger"
+)
+
+type EngineWithFallback struct {
+	preferred promql.QueryEngine
+	fallback  promql.QueryEngine
+
+	supportedQueries   prometheus.Counter
+	unsupportedQueries *prometheus.CounterVec
+
+	logger log.Logger
+}
+
+func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheus.Registerer, logger log.Logger) promql.QueryEngine {
+	return &EngineWithFallback{
+		preferred: preferred,
+		fallback:  fallback,
+
+		supportedQueries: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "cortex_streaming_promql_engine_supported_queries_total",
+			Help: "Total number of queries that were supported by the streaming engine.",
+		}),
+		unsupportedQueries: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "cortex_streaming_promql_engine_unsupported_queries_total",
+			Help: "Total number of queries that were not supported by the streaming engine and so fell back to Prometheus' engine.",
+		}, []string{"reason"}),
+
+		logger: logger,
+	}
+}
+
+func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
+	query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts)
+
+	if err == nil {
+		e.supportedQueries.Inc()
+		return query, nil
+	}
+
+	notSupportedErr := NotSupportedError{}
+	if !errors.As(err, &notSupportedErr) {
+		// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
+		return nil, err
+	}
+
+	logger := spanlogger.FromContext(ctx, e.logger)
+	level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs)
+	e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc()
+
+	return e.fallback.NewInstantQuery(ctx, q, opts, qs, ts)
+}
+
+func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
+	query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
+
+	if err == nil {
+		e.supportedQueries.Inc()
+		return query, nil
+	}
+
+	notSupportedErr := NotSupportedError{}
+	if !errors.As(err, &notSupportedErr) {
+		// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
+		return nil, err
+	}
+
+	logger := spanlogger.FromContext(ctx, e.logger)
+	level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs)
+	e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc()
+
+	return e.fallback.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
+}
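The control flow of the wrapper above can be reduced to a standard-library-only sketch. Everything here is a simplification under stated assumptions: `queryEngine`, `engineFunc`, `engineWithFallback` (lowercase), and `newSketchEngine` are hypothetical names, a "query" is just a string, and plain integer counters stand in for the Prometheus metrics, so unlike the real wrapper this version is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// NotSupportedError is a simplified copy of the error type in errors.go.
type NotSupportedError struct{ reason string }

func (e NotSupportedError) Error() string {
	return "not supported by streaming engine: " + e.reason
}

// queryEngine is a hypothetical, reduced stand-in for promql.QueryEngine:
// it "creates" a query by returning a description string.
type queryEngine interface {
	NewQuery(expr string) (string, error)
}

// engineFunc adapts a plain function to the queryEngine interface.
type engineFunc func(expr string) (string, error)

func (f engineFunc) NewQuery(expr string) (string, error) { return f(expr) }

// engineWithFallback tries preferred first and uses fallback only when the
// failure is a NotSupportedError. Plain counters replace the real metrics.
type engineWithFallback struct {
	preferred, fallback queryEngine
	supported           int
	unsupported         map[string]int // keyed by reason, like the "reason" label
}

func (e *engineWithFallback) NewQuery(expr string) (string, error) {
	q, err := e.preferred.NewQuery(expr)
	if err == nil {
		e.supported++
		return q, nil
	}

	var nse NotSupportedError
	if !errors.As(err, &nse) {
		// Any other failure (for example an invalid query) is returned as-is.
		return "", err
	}

	e.unsupported[nse.reason]++
	return e.fallback.NewQuery(expr)
}

// newSketchEngine wires up two fake engines for demonstration purposes.
func newSketchEngine() *engineWithFallback {
	preferred := engineFunc(func(expr string) (string, error) {
		switch expr {
		case "supported":
			return "preferred:" + expr, nil
		case "invalid":
			return "", errors.New("the query is invalid")
		default:
			return "", NotSupportedError{"this expression is not supported"}
		}
	})
	fallback := engineFunc(func(expr string) (string, error) {
		return "fallback:" + expr, nil
	})
	return &engineWithFallback{preferred: preferred, fallback: fallback, unsupported: map[string]int{}}
}

func main() {
	e := newSketchEngine()
	for _, expr := range []string{"supported", "unsupported", "invalid"} {
		q, err := e.NewQuery(expr)
		fmt.Printf("%q -> query=%q err=%v\n", expr, q, err)
	}
	fmt.Println("supported:", e.supported, "unsupported:", e.unsupported)
}
```

The point the sketch tries to make visible is that only `NotSupportedError` triggers the fallback. Errors such as an invalid expression are returned unchanged, since the fallback engine would presumably reject them too.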
166 changes: 166 additions & 0 deletions pkg/streamingpromql/fallback_engine_test.go
@@ -0,0 +1,166 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package streamingpromql
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/util/stats"
+	"github.com/stretchr/testify/require"
+)
+
+func TestEngineWithFallback(t *testing.T) {
+	ctx := context.Background()
+	logger := log.NewNopLogger()
+
+	generators := map[string]func(engine promql.QueryEngine, expr string) (promql.Query, error){
+		"instant query": func(engine promql.QueryEngine, expr string) (promql.Query, error) {
+			return engine.NewInstantQuery(ctx, nil, nil, expr, time.Now())
+		},
+		"range query": func(engine promql.QueryEngine, expr string) (promql.Query, error) {
+			return engine.NewRangeQuery(ctx, nil, nil, expr, time.Now(), time.Now().Add(-time.Minute), time.Second)
+		},
+	}
+
+	for name, createQuery := range generators {
+		t.Run(name, func(t *testing.T) {
+			t.Run("should not fall back for supported expressions", func(t *testing.T) {
+				reg := prometheus.NewPedanticRegistry()
+				preferredEngine := newFakeEngineThatSupportsLimitedQueries()
+				fallbackEngine := newFakeEngineThatSupportsAllQueries()
+				engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)
+
+				query, err := createQuery(engineWithFallback, "a_supported_expression")
+				require.NoError(t, err)
+				require.Equal(t, preferredEngine.query, query, "should return query from preferred engine")
+				require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if expression is supported by preferred engine")
+
+				require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+					# HELP cortex_streaming_promql_engine_supported_queries_total Total number of queries that were supported by the streaming engine.
+					# TYPE cortex_streaming_promql_engine_supported_queries_total counter
+					cortex_streaming_promql_engine_supported_queries_total 1
+				`), "cortex_streaming_promql_engine_supported_queries_total", "cortex_streaming_promql_engine_unsupported_queries_total"))
+			})
+
+			t.Run("should fall back for unsupported expressions", func(t *testing.T) {
+				reg := prometheus.NewPedanticRegistry()
+				preferredEngine := newFakeEngineThatSupportsLimitedQueries()
+				fallbackEngine := newFakeEngineThatSupportsAllQueries()
+				engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)
+
+				query, err := createQuery(engineWithFallback, "a_non_supported_expression")
+				require.NoError(t, err)
+				require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine")
+
+				require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+					# HELP cortex_streaming_promql_engine_supported_queries_total Total number of queries that were supported by the streaming engine.
+					# TYPE cortex_streaming_promql_engine_supported_queries_total counter
+					cortex_streaming_promql_engine_supported_queries_total 0
+					# HELP cortex_streaming_promql_engine_unsupported_queries_total Total number of queries that were not supported by the streaming engine and so fell back to Prometheus' engine.
+					# TYPE cortex_streaming_promql_engine_unsupported_queries_total counter
+					cortex_streaming_promql_engine_unsupported_queries_total{reason="this expression is not supported"} 1
+				`), "cortex_streaming_promql_engine_supported_queries_total", "cortex_streaming_promql_engine_unsupported_queries_total"))
+			})
+
+			t.Run("should not fall back if creating query fails for another reason", func(t *testing.T) {
+				reg := prometheus.NewPedanticRegistry()
+				preferredEngine := newFakeEngineThatSupportsLimitedQueries()
+				fallbackEngine := newFakeEngineThatSupportsAllQueries()
+				engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)
+
+				_, err := createQuery(engineWithFallback, "an_invalid_expression")
+				require.EqualError(t, err, "the query is invalid")
+				require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if creating query fails for another reason")
+			})
+		})
+	}
+}
+
+type fakeEngineThatSupportsAllQueries struct {
+	query     *fakeQuery
+	wasCalled bool
+}
+
+func newFakeEngineThatSupportsAllQueries() *fakeEngineThatSupportsAllQueries {
+	return &fakeEngineThatSupportsAllQueries{
+		query: &fakeQuery{"query from fallback engine"},
+	}
+}
+
+func (f *fakeEngineThatSupportsAllQueries) NewInstantQuery(context.Context, storage.Queryable, promql.QueryOpts, string, time.Time) (promql.Query, error) {
+	f.wasCalled = true
+	return f.query, nil
+}
+
+func (f *fakeEngineThatSupportsAllQueries) NewRangeQuery(context.Context, storage.Queryable, promql.QueryOpts, string, time.Time, time.Time, time.Duration) (promql.Query, error) {
+	f.wasCalled = true
+	return f.query, nil
+}
+
+type fakeEngineThatSupportsLimitedQueries struct {
+	query *fakeQuery
+}
+
+func newFakeEngineThatSupportsLimitedQueries() *fakeEngineThatSupportsLimitedQueries {
+	return &fakeEngineThatSupportsLimitedQueries{
+		query: &fakeQuery{"query from preferred engine"},
+	}
+}
+
+func (f *fakeEngineThatSupportsLimitedQueries) NewInstantQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, qs string, _ time.Time) (promql.Query, error) {
+	if qs == "a_supported_expression" {
+		return f.query, nil
+	} else if qs == "an_invalid_expression" {
+		return nil, errors.New("the query is invalid")
+	}
+
+	return nil, NewNotSupportedError("this expression is not supported")
+}
+
+func (f *fakeEngineThatSupportsLimitedQueries) NewRangeQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, qs string, _, _ time.Time, _ time.Duration) (promql.Query, error) {
+	if qs == "a_supported_expression" {
+		return f.query, nil
+	} else if qs == "an_invalid_expression" {
+		return nil, errors.New("the query is invalid")
+	}
+
+	return nil, NewNotSupportedError("this expression is not supported")
+}
+
+type fakeQuery struct {
+	name string
+}
+
+func (f fakeQuery) Exec(context.Context) *promql.Result {
+	panic("fakeQuery: Exec() not supported")
+}
+
+func (f fakeQuery) Close() {
+	panic("fakeQuery: Close() not supported")
+}
+
+func (f fakeQuery) Statement() parser.Statement {
+	panic("fakeQuery: Statement() not supported")
+}
+
+func (f fakeQuery) Stats() *stats.Statistics {
+	panic("fakeQuery: Stats() not supported")
+}
+
+func (f fakeQuery) Cancel() {
+	panic("fakeQuery: Cancel() not supported")
+}
+
+func (f fakeQuery) String() string {
+	panic("fakeQuery: String() not supported")
+}
