Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming PromQL engine: optionally fall back to Prometheus' engine if query is not supported #7898

Merged
merged 8 commits into from
Apr 30, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,17 @@
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_promql_engine_fallback",
"required": false,
"desc": "If true, fall back to Prometheus' PromQL engine if the streaming engine is in use and it does not support a query.",
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.enable-promql-engine-fallback",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_concurrent",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1643,6 +1643,8 @@ Usage of ./cmd/mimir/mimir:
The default evaluation interval or step size for subqueries. This config option should be set on query-frontend too when query sharding is enabled. (default 1m0s)
-querier.dns-lookup-period duration
How often to query DNS for query-frontend or query-scheduler address. (default 10s)
-querier.enable-promql-engine-fallback
[experimental] If true, fall back to Prometheus' PromQL engine if the streaming engine is in use and it does not support a query. (default true)
-querier.frontend-address string
Address of the query-frontend component, in host:port format. If multiple query-frontends are running, the host should be a DNS resolving to all query-frontend instances. This option should be set only when query-scheduler component is not in use.
-querier.frontend-client.backoff-max-period duration
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ The following features are currently experimental:
- Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
- Enable PromQL experimental functions (`-querier.promql-experimental-functions-enabled`)
- Allow streaming of `/active_series` responses to the frontend (`-querier.response-streaming-enabled`)
- Streaming PromQL engine (`-querier.promql-engine=streaming`)
- Streaming PromQL engine (`-querier.promql-engine=streaming` and `-querier.enable-promql-engine-fallback`)
- Query-frontend
- `-query-frontend.querier-forget-delay`
- Instant query splitting (`-query-frontend.split-instant-queries-by-interval`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,11 @@ store_gateway_client:
# CLI flag: -querier.promql-engine
[promql_engine: <string> | default = "standard"]

# (experimental) If true, fall back to Prometheus' PromQL engine if the
# streaming engine is in use and it does not support a query.
# CLI flag: -querier.enable-promql-engine-fallback
[enable_promql_engine_fallback: <boolean> | default = true]

# The number of workers running in each querier process. This setting limits the
# maximum number of concurrent queries in each querier.
# CLI flag: -querier.max-concurrent
Expand Down
15 changes: 11 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ type Config struct {
MinimizeIngesterRequests bool `yaml:"minimize_ingester_requests" category:"advanced"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"minimize_ingester_requests_hedging_delay" category:"advanced"`

PromQLEngine string `yaml:"promql_engine" category:"experimental"`
PromQLEngine string `yaml:"promql_engine" category:"experimental"`
EnablePromQLEngineFallback bool `yaml:"enable_promql_engine_fallback" category:"experimental"`

// PromQL engine config.
EngineConfig engine.Config `yaml:",inline"`
Expand Down Expand Up @@ -88,6 +89,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Uint64Var(&cfg.StreamingChunksPerStoreGatewaySeriesBufferSize, "querier.streaming-chunks-per-store-gateway-buffer-size", 256, "Number of series to buffer per store-gateway when streaming chunks from store-gateways.")

f.StringVar(&cfg.PromQLEngine, "querier.promql-engine", standardPromQLEngine, fmt.Sprintf("PromQL engine to use, either '%v' or '%v'", standardPromQLEngine, streamingPromQLEngine))
f.BoolVar(&cfg.EnablePromQLEngineFallback, "querier.enable-promql-engine-fallback", true, "If true, fall back to Prometheus' PromQL engine if the streaming engine is in use and it does not support a query.")

cfg.EngineConfig.RegisterFlags(f)
}
Expand Down Expand Up @@ -161,12 +163,17 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
case standardPromQLEngine:
eng = promql.NewEngine(opts)
case streamingPromQLEngine:
var err error

eng, err = streamingpromql.NewEngine(opts)
streamingEngine, err := streamingpromql.NewEngine(opts)
if err != nil {
return nil, nil, nil, err
}

if cfg.EnablePromQLEngineFallback {
prometheusEngine := promql.NewEngine(opts)
eng = streamingpromql.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger)
} else {
eng = streamingEngine
}
default:
panic(fmt.Sprintf("invalid config not caught by validation: unknown PromQL engine '%s'", cfg.PromQLEngine))
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
t.Run(expression, func(t *testing.T) {
qry, err := engine.NewRangeQuery(ctx, db, nil, expression, time.Now().Add(-time.Hour), time.Now(), time.Minute)
require.Error(t, err)
require.ErrorIs(t, err, ErrNotSupported)
require.ErrorIs(t, err, NotSupportedError{})
require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
require.Nil(t, qry)

qry, err = engine.NewInstantQuery(ctx, db, nil, expression, time.Now())
require.Error(t, err)
require.ErrorIs(t, err, ErrNotSupported)
require.ErrorIs(t, err, NotSupportedError{})
require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
require.Nil(t, qry)
})
Expand All @@ -67,7 +67,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
t.Run(expression, func(t *testing.T) {
qry, err := engine.NewInstantQuery(ctx, db, nil, expression, time.Now())
require.Error(t, err)
require.ErrorIs(t, err, ErrNotSupported)
require.ErrorIs(t, err, NotSupportedError{})
require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
require.Nil(t, qry)
})
Expand Down
16 changes: 13 additions & 3 deletions pkg/streamingpromql/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,18 @@ import (
"fmt"
)

var ErrNotSupported = errors.New("not supported by streaming engine")
type NotSupportedError struct {
reason string
}

func NewNotSupportedError(reason string) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason passed here is quite high cardinality (because generated dinamically), and it's used as a label value. Have you considered introducing few classes of pre-defined reasons, and then keeping the current dynamic string as "details"? The reason will be used in metrics, the reason + details in the info log.

Examples of pre-defined reasons:

  • unsupported aggregation (any aggregation)
  • unsupported function (any function)
  • unsupported function argument (any function, any argument)
  • unsupported expression (any expression)
  • offset
  • grouping using without

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maximum cardinality of this should be 109 series:

  • There are 16 binary operators that we don't support, although at present these will be reported as one "binary operators not supported" series
  • There are 11 aggregation functions that we don't support
  • There are 64 functions that we don't support
  • There are 10 aggregation over time functions that we don't support
  • There are 8 particular cases we don't support that will have their own series:
    • instant vector selector with offset
    • range vector selector with offset
    • grouping with without
    • number literals as the top-level expression
    • string literals as the top-level expression
    • unary expressions
    • range vector selectors as the top-level expression
    • subqueries

I'm open to other opinions, but I'm inclined to keep this as-is to keep things simple and make analysis easier. As we implement support for each of these, the associated series will disappear, and we can use Adaptive Metrics to aggregate away unnecessary labels like pod etc. in the meantime. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to merge this as-is, but if you have any opinions on this @pracucci let me know and I can change the behaviour in a follow-up PR.

return NotSupportedError{reason}
}

func (e NotSupportedError) Error() string {
return fmt.Sprintf("not supported by streaming engine: %v", e.reason)
}

func NewNotSupportedError(detail string) error {
return fmt.Errorf("%w: %s", ErrNotSupported, detail)
func (e NotSupportedError) Is(target error) bool {
return errors.As(target, &NotSupportedError{})
}
88 changes: 88 additions & 0 deletions pkg/streamingpromql/fallback_engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// SPDX-License-Identifier: AGPL-3.0-only

package streamingpromql

import (
"context"
"errors"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/mimir/pkg/util/spanlogger"
)

type EngineWithFallback struct {
preferred promql.QueryEngine
fallback promql.QueryEngine

supportedQueries prometheus.Counter
unsupportedQueries *prometheus.CounterVec

logger log.Logger
}

func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheus.Registerer, logger log.Logger) promql.QueryEngine {
return &EngineWithFallback{
preferred: preferred,
fallback: fallback,

supportedQueries: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_querier_streaming_promql_engine_supported_queries_total",
Help: "Total number of queries that were supported by the streaming engine.",
}),
unsupportedQueries: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_querier_streaming_promql_engine_unsupported_queries_total",
Help: "Total number of queries that were not supported by the streaming engine and so fell back to Prometheus' engine.",
}, []string{"reason"}),

logger: logger,
}
}

func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts)

if err == nil {
e.supportedQueries.Inc()
return query, nil
}

notSupportedErr := NotSupportedError{}
if !errors.As(err, &notSupportedErr) {
// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
return nil, err
}

logger := spanlogger.FromContext(ctx, e.logger)
level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs)
e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc()

return e.fallback.NewInstantQuery(ctx, q, opts, qs, ts)
}

func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval)

if err == nil {
e.supportedQueries.Inc()
return query, nil
}

notSupportedErr := NotSupportedError{}
if !errors.As(err, &notSupportedErr) {
// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
return nil, err
}

logger := spanlogger.FromContext(ctx, e.logger)
level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs)
e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc()

return e.fallback.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
}
166 changes: 166 additions & 0 deletions pkg/streamingpromql/fallback_engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// SPDX-License-Identifier: AGPL-3.0-only

package streamingpromql

import (
"context"
"errors"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
"github.com/stretchr/testify/require"
)

func TestEngineWithFallback(t *testing.T) {
ctx := context.Background()
logger := log.NewNopLogger()

generators := map[string]func(engine promql.QueryEngine, expr string) (promql.Query, error){
"instant query": func(engine promql.QueryEngine, expr string) (promql.Query, error) {
return engine.NewInstantQuery(ctx, nil, nil, expr, time.Now())
},
"range query": func(engine promql.QueryEngine, expr string) (promql.Query, error) {
return engine.NewRangeQuery(ctx, nil, nil, expr, time.Now(), time.Now().Add(-time.Minute), time.Second)
},
}

for name, createQuery := range generators {
t.Run(name, func(t *testing.T) {
t.Run("should not fall back for supported expressions", func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
preferredEngine := newFakeEngineThatSupportsLimitedQueries()
fallbackEngine := newFakeEngineThatSupportsAllQueries()
engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)

query, err := createQuery(engineWithFallback, "a_supported_expression")
require.NoError(t, err)
require.Equal(t, preferredEngine.query, query, "should return query from preferred engine")
require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if expression is supported by preferred engine")

require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_querier_streaming_promql_engine_supported_queries_total Total number of queries that were supported by the streaming engine.
# TYPE cortex_querier_streaming_promql_engine_supported_queries_total counter
cortex_querier_streaming_promql_engine_supported_queries_total 1
`), "cortex_querier_streaming_promql_engine_supported_queries_total", "cortex_querier_streaming_promql_engine_unsupported_queries_total"))
})

t.Run("should fall back for unsupported expressions", func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
preferredEngine := newFakeEngineThatSupportsLimitedQueries()
fallbackEngine := newFakeEngineThatSupportsAllQueries()
engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)

query, err := createQuery(engineWithFallback, "a_non_supported_expression")
require.NoError(t, err)
require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine")

require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_querier_streaming_promql_engine_supported_queries_total Total number of queries that were supported by the streaming engine.
# TYPE cortex_querier_streaming_promql_engine_supported_queries_total counter
cortex_querier_streaming_promql_engine_supported_queries_total 0
# HELP cortex_querier_streaming_promql_engine_unsupported_queries_total Total number of queries that were not supported by the streaming engine and so fell back to Prometheus' engine.
# TYPE cortex_querier_streaming_promql_engine_unsupported_queries_total counter
cortex_querier_streaming_promql_engine_unsupported_queries_total{reason="this expression is not supported"} 1
`), "cortex_querier_streaming_promql_engine_supported_queries_total", "cortex_querier_streaming_promql_engine_unsupported_queries_total"))
})

t.Run("should not fall back if creating query fails for another reason", func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
preferredEngine := newFakeEngineThatSupportsLimitedQueries()
fallbackEngine := newFakeEngineThatSupportsAllQueries()
engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)

_, err := createQuery(engineWithFallback, "an_invalid_expression")
require.EqualError(t, err, "the query is invalid")
require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if creating query fails for another reason")
})
})
}
}

type fakeEngineThatSupportsAllQueries struct {
query *fakeQuery
wasCalled bool
}

func newFakeEngineThatSupportsAllQueries() *fakeEngineThatSupportsAllQueries {
return &fakeEngineThatSupportsAllQueries{
query: &fakeQuery{"query from fallback engine"},
}
}

func (f *fakeEngineThatSupportsAllQueries) NewInstantQuery(context.Context, storage.Queryable, promql.QueryOpts, string, time.Time) (promql.Query, error) {
f.wasCalled = true
return f.query, nil
}

func (f *fakeEngineThatSupportsAllQueries) NewRangeQuery(context.Context, storage.Queryable, promql.QueryOpts, string, time.Time, time.Time, time.Duration) (promql.Query, error) {
f.wasCalled = true
return f.query, nil
}

type fakeEngineThatSupportsLimitedQueries struct {
query *fakeQuery
}

func newFakeEngineThatSupportsLimitedQueries() *fakeEngineThatSupportsLimitedQueries {
return &fakeEngineThatSupportsLimitedQueries{
query: &fakeQuery{"query from preferred engine"},
}
}

func (f *fakeEngineThatSupportsLimitedQueries) NewInstantQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, qs string, _ time.Time) (promql.Query, error) {
if qs == "a_supported_expression" {
return f.query, nil
} else if qs == "an_invalid_expression" {
return nil, errors.New("the query is invalid")
}

return nil, NewNotSupportedError("this expression is not supported")
}

func (f *fakeEngineThatSupportsLimitedQueries) NewRangeQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, qs string, _, _ time.Time, _ time.Duration) (promql.Query, error) {
if qs == "a_supported_expression" {
return f.query, nil
} else if qs == "an_invalid_expression" {
return nil, errors.New("the query is invalid")
}

return nil, NewNotSupportedError("this expression is not supported")
}

type fakeQuery struct {
name string
}

func (f fakeQuery) Exec(context.Context) *promql.Result {
panic("fakeQuery: Exec() not supported")
}

func (f fakeQuery) Close() {
panic("fakeQuery: Close() not supported")
}

func (f fakeQuery) Statement() parser.Statement {
panic("fakeQuery: Statement() not supported")
}

func (f fakeQuery) Stats() *stats.Statistics {
panic("fakeQuery: Stats() not supported")
}

func (f fakeQuery) Cancel() {
panic("fakeQuery: Cancel() not supported")
}

func (f fakeQuery) String() string {
panic("fakeQuery: String() not supported")
}
Loading