Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First iteration of streaming PromQL engine #7693

Merged
merged 163 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
163 commits
Select commit Hold shift + click to select a range
70418a3
Use forked Prometheus module
charleskorn Aug 10, 2023
09c9f1e
Use promql.QueryEngine rather than *promql.Engine, add config option …
charleskorn Aug 10, 2023
e76edce
Add parallel query path with streaming engine.
charleskorn Aug 10, 2023
290cf6b
Add IntelliJ debugging configuration for streaming querier.
charleskorn Aug 10, 2023
204ae00
Add Grafana datasource for streaming querier
charleskorn Aug 10, 2023
a3a8b89
Use correct query-scheduler port and enable debugging
charleskorn Aug 10, 2023
86a7b11
Set up load test.
charleskorn Aug 10, 2023
9b1ebdc
Fix p99 latency panel
charleskorn Aug 10, 2023
0e08552
Commit settings used in previous test.
charleskorn Aug 10, 2023
676b2ec
Enable chunks streaming.
charleskorn Aug 11, 2023
9005d2f
Disable load generator.
charleskorn Sep 7, 2023
546762e
Create separate load testing query paths to ensure meta-monitoring qu…
charleskorn Sep 7, 2023
8e0fa53
Add initial k6-based load test script
charleskorn Sep 7, 2023
acff7ff
Add script to run load tests against both engines.
charleskorn Sep 7, 2023
fe93410
Add todo
charleskorn Sep 7, 2023
f0d562e
Extract friendly name to hostname logic to separate script
charleskorn Sep 8, 2023
046537c
Rename k6 output file
charleskorn Sep 8, 2023
55d1049
Check both engines produce the same result.
charleskorn Sep 8, 2023
94b54ed
Summarise results after each test run.
charleskorn Sep 8, 2023
7c8c136
Enable streaming chunks from store-gateways to queriers.
charleskorn Sep 8, 2023
909334d
Refactor test script, print max and average CPU and memory after test
charleskorn Sep 8, 2023
49b17c2
Reorder variables to match output order
charleskorn Sep 8, 2023
0ee3b7b
Don't include query name in k6 output file name.
charleskorn Sep 8, 2023
f7aa5b3
Add initial script to generate data.
charleskorn Sep 8, 2023
948ae97
Rename script
charleskorn Sep 8, 2023
80852f5
Add script to generate data generation config.
charleskorn Sep 8, 2023
cee0e1b
Make it easier to test different queries.
charleskorn Sep 8, 2023
231be86
Rename PromQL engine CLI flag.
charleskorn Sep 11, 2023
6a66d0f
Add Thanos' engine.
charleskorn Sep 11, 2023
53f789d
Add Thanos engine to benchmarking setup.
charleskorn Sep 11, 2023
b6462d1
Set default test duration to 5 minutes.
charleskorn Sep 11, 2023
bd06d1e
Rework data generation setup to work around limitation in prometheus-…
charleskorn Sep 12, 2023
16dcb15
Load test data faster.
charleskorn Sep 12, 2023
b0f8f61
Record test config for first test
charleskorn Sep 12, 2023
b103d6e
Record test config for second test
charleskorn Sep 12, 2023
0be5f14
Record test config for third test
charleskorn Sep 12, 2023
173a285
Disable gRPC compression for ingester client
charleskorn Sep 12, 2023
2f43ca6
Record test config for re-run of second test
charleskorn Sep 13, 2023
563ced1
Record test config for re-run of first test
charleskorn Sep 13, 2023
b2016d3
Record test config for re-run of third test
charleskorn Sep 13, 2023
ee065c2
Log URL used for test.
charleskorn Sep 13, 2023
9ec79a3
Record test config for fourth test
charleskorn Sep 13, 2023
4146014
Record test config for fifth test
charleskorn Sep 13, 2023
aeba5b3
Record test config for 10k series test
charleskorn Sep 13, 2023
c4fe6fc
Record test config for 1k series test
charleskorn Sep 13, 2023
c67895d
Record test config for 50k series test
charleskorn Sep 13, 2023
2c1934b
Fix configuration issue that does not affect outcome of testing.
charleskorn Sep 13, 2023
6578a48
Record test config for seventh test
charleskorn Sep 13, 2023
537ba98
Move production code to main Mimir code tree
charleskorn Sep 18, 2023
9aec0be
Copy test files into repo
charleskorn Sep 18, 2023
be52a92
Add license and provenance comments
charleskorn Sep 18, 2023
d533800
Add a readme.
charleskorn Sep 18, 2023
b023c31
Update package path in scripts and remove unnecessary testing script
charleskorn Sep 18, 2023
313c6c4
Add more notes to code
charleskorn Sep 18, 2023
c859e59
Explain benefit of streaming engine.
charleskorn Sep 18, 2023
27ace04
Mention benchmarking tools in readme.
charleskorn Sep 18, 2023
1086168
Remove references to Thanos' engine.
charleskorn Mar 6, 2024
1b375e9
Undo unnecessary go.mod changes
charleskorn Mar 6, 2024
d2e1cab
Remove benchmarking tool for now
charleskorn Mar 6, 2024
aff2b72
Remove docker-compose changes
charleskorn Mar 6, 2024
9ff2fd5
Reset whitespace changes
charleskorn Mar 6, 2024
a1f222f
Remove readme
charleskorn Mar 6, 2024
9c259f3
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Mar 6, 2024
24b35d3
Vendor in branch of mimir-prometheus with changes from https://github…
charleskorn Mar 6, 2024
aa04c8f
Fix issues introduced with recent Prometheus changes and fix linting …
charleskorn Mar 6, 2024
6c9f83d
Detect and reject configuration not supported by streaming engine
charleskorn Mar 13, 2024
a019a40
Remove TODOs tracked elsewhere
charleskorn Mar 13, 2024
723e5f2
Bring in latest changes to `QueryEngine` interface
charleskorn Mar 13, 2024
de36024
Remove more TODOs tracked elsewhere
charleskorn Mar 13, 2024
0dbbfcb
Change signature of `Next()` to return an error when the stream is ex…
charleskorn Mar 13, 2024
15645c9
Ignore warnings when comparing results from Prometheus and streaming …
charleskorn Mar 13, 2024
d7edb4a
Correct naming
charleskorn Mar 13, 2024
a6b76d5
Rename `Operator` to clarify that it produces instant vectors.
charleskorn Mar 13, 2024
42bbfb3
Rename test file
charleskorn Mar 14, 2024
761894c
Reorganise test file
charleskorn Mar 14, 2024
c7d46ea
Bring in latest benchmark changes from upstream.
charleskorn Mar 14, 2024
9bcf6f3
Run tests and benchmarks with isolation disabled, to mirror what Mimi…
charleskorn Mar 14, 2024
c6ce4a2
Fix script permissions
charleskorn Mar 14, 2024
ff23e39
Simplify test setup and don't bother testing 2000 series case to spee…
charleskorn Mar 14, 2024
9d7354e
Use assertion helpers in benchmark
charleskorn Mar 14, 2024
61d22a1
Improve engine benchmark comparison script: fix issue with package na…
charleskorn Mar 14, 2024
eae3a5e
Remove unnecessary files
charleskorn Mar 14, 2024
cffbf10
Add note to engine benchmark comparison script
charleskorn Mar 14, 2024
a2c8491
Make test names clearer
charleskorn Mar 15, 2024
c23ea43
Ensure both engines produce the same result before benchmarking
charleskorn Mar 15, 2024
0c9bb09
Add tests for unsupported expressions and improve error messages retu…
charleskorn Mar 15, 2024
ac995e1
Fix linting warnings
charleskorn Mar 15, 2024
a9d3959
Add notes for invalid cases that should be caught by the PromQL parser.
charleskorn Mar 15, 2024
179cf09
Implement `Query.Statement()`
charleskorn Mar 15, 2024
34fdd45
Give variable a better name
charleskorn Mar 15, 2024
944306c
Remove Pool interface
charleskorn Mar 15, 2024
7b09ed4
Remove unnecessary types
charleskorn Mar 15, 2024
687f889
Update comments
charleskorn Mar 15, 2024
2815a1b
Remove unused lookback delta for range vector selector
charleskorn Mar 15, 2024
6189f25
Extract selector logic to a shared type
charleskorn Mar 22, 2024
378a8e3
Add more TODOs
charleskorn Mar 22, 2024
8006939
Use Selector for range vector selectors.
charleskorn Mar 22, 2024
dc292e1
Compute values used on every call to Next() once in instant and range…
charleskorn Mar 22, 2024
94d29aa
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Mar 22, 2024
03d065a
Remove unnecessary script and update comment
charleskorn Mar 22, 2024
cbdc44c
Validate query time range
charleskorn Mar 22, 2024
017304a
Enforce that range query expression produces a vector or scalar result.
charleskorn Mar 22, 2024
e427ade
Add CLI flag to experimental features list in docs
charleskorn Mar 22, 2024
e8ace15
Add changelog entry
charleskorn Mar 22, 2024
333c8df
Fix linting warnings
charleskorn Mar 22, 2024
e3933a2
Move common PromQL engine configuration to its own package
charleskorn Mar 25, 2024
7d0182b
Don't bother with 10 series benchmarks.
charleskorn Mar 25, 2024
1bbea89
Sort imports
charleskorn Mar 25, 2024
0058fc5
Add missing license header.
charleskorn Mar 25, 2024
228548b
Use bucketed pool based on zeropool.
charleskorn Mar 25, 2024
91d1cb9
Fix import sorting
charleskorn Mar 25, 2024
6eaec63
Set upper limits for sizes of pooled slices.
charleskorn Mar 25, 2024
2034c55
Set size for SeriesBatch pool
charleskorn Mar 26, 2024
69a25d1
Add integration test.
charleskorn Mar 26, 2024
ebe368e
Move RingBuffer out of util package and pool slices used.
charleskorn Mar 26, 2024
8a1c27a
Add tests for ring buffer type.
charleskorn Mar 26, 2024
f58b52a
Return RingBuffer point slices to pool when no longer required.
charleskorn Mar 26, 2024
4b82b46
Fix linting
charleskorn Mar 26, 2024
5baaab3
Remove TODOs about pooling group slices and maps.
charleskorn Mar 26, 2024
d7f3433
Pool groups in aggregation operator.
charleskorn Mar 26, 2024
1a4dda3
Remove TODOs about pooling label builders.
charleskorn Mar 26, 2024
c5f7858
Remove more TODOs, clarify interface expectations
charleskorn Mar 26, 2024
37c958e
Release memory sooner after query execution
charleskorn Mar 26, 2024
c017273
Fix typo in comment
charleskorn Mar 27, 2024
c44074b
Remove unecessary context cancellation checks from operators.
charleskorn Mar 27, 2024
c367862
Fix linting warnings
charleskorn Mar 27, 2024
676070a
Address remaining TODOs in Selector
charleskorn Mar 27, 2024
c0dd40c
Fix integration test flakiness
charleskorn Mar 27, 2024
43929c9
Don't panic when encountering a native histogram
charleskorn Mar 27, 2024
458c366
Fix issue where bucketed pool could return a slice with capacity less…
charleskorn Mar 27, 2024
4aa8f69
Remove redundant checks
charleskorn Mar 27, 2024
2fe1b10
Add support for @ in instant and range vector selectors.
charleskorn Mar 27, 2024
fc6b340
Run Prometheus' test cases against streaming engine.
charleskorn Mar 27, 2024
52f4671
Fix failing staleness test: don't emit empty series for instant query
charleskorn Mar 28, 2024
e3f0055
Upgrade mimir-prometheus
charleskorn Mar 28, 2024
e4e8123
Add test case for stale markers in a range query.
charleskorn Mar 28, 2024
51ed802
Add test cases for @ modifier with range queries
charleskorn Mar 28, 2024
6a2623b
Check for errors while reading test script
charleskorn Mar 28, 2024
c51b047
Fix whitespace
charleskorn Mar 28, 2024
6bbc93e
Remove duplicate timestamp computation in Selector and operators
charleskorn Apr 1, 2024
1fd7ad5
Update `rate` calculation to match behaviour in https://github.com/pr…
charleskorn Apr 2, 2024
b23d086
Add an overview of the engine.
charleskorn Apr 2, 2024
30ec0c5
Fix linting issues and clarify docs.
charleskorn Apr 2, 2024
663ec17
Add test case for scenario where input to aggregation returns no resu…
charleskorn Apr 2, 2024
06ebc4d
Implement `Query.String()`, and remove stale TODOs.
charleskorn Apr 2, 2024
7bb3ca8
Clarify BucketedPool contract, and add test case for case where min s…
charleskorn Apr 2, 2024
9f9023a
Expand test coverage to include scenarios not covered by upstream tes…
charleskorn Apr 2, 2024
119160c
Elaborate on comment
charleskorn Apr 2, 2024
d5a9127
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Apr 2, 2024
65e323f
Expand example in docs and fix typo
charleskorn Apr 2, 2024
c86ecf9
Remove unnecessary sorting and use non-experimental slices package in…
charleskorn Apr 2, 2024
2174241
Add missing comments.
charleskorn Apr 2, 2024
2eadbf5
Clean up temporary files in benchmark comparison script, and improve …
charleskorn Apr 3, 2024
6a1af36
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Apr 5, 2024
4419c84
Add note explaining purpose of script.
charleskorn Apr 5, 2024
6e26ba3
Address PR feedback
charleskorn Apr 8, 2024
ba06a32
Replace use of `DurationMilliseconds` with `d.Milliseconds()`
charleskorn Apr 8, 2024
7ecc3fe
Rename `Series()` to `SeriesMetadata()`
charleskorn Apr 8, 2024
d2a9001
Include names of acceptable values in CLI flag description
charleskorn Apr 9, 2024
95699d6
Remove TODOs
charleskorn Apr 9, 2024
10bf1cd
Add explanation for RangeVectorSelectorWithTransformation
charleskorn Apr 9, 2024
1bdf6c6
Move to top-level `streamingpromql` package
charleskorn Apr 9, 2024
2dba89a
Move common query engine config back to original location (revert e39…
charleskorn Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,17 @@
"fieldType": "duration",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "promql_engine",
"required": false,
"desc": "PromQL engine to use, either 'standard' or 'streaming'",
"fieldValue": null,
"fieldDefaultValue": "standard",
"fieldFlag": "querier.promql-engine",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_concurrent",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,8 @@ Usage of ./cmd/mimir/mimir:
Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-querier.prefer-streaming-chunks-from-store-gateways
[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.
-querier.promql-engine string
[experimental] PromQL engine to use, either 'standard' or 'streaming' (default "standard")
-querier.promql-experimental-functions-enabled
[experimental] Enable experimental PromQL functions. This config option should be set on query-frontend too when query sharding is enabled.
-querier.query-ingesters-within duration
Expand Down
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ The following features are currently experimental:
- Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
- Enable PromQL experimental functions (`-querier.promql-experimental-functions-enabled`)
- Allow streaming of `/active_series` responses to the frontend (`-querier.response-streaming-enabled`)
- Streaming PromQL engine (`-querier.promql-engine=streaming`)
- Query-frontend
- `-query-frontend.querier-forget-delay`
- Instant query splitting (`-query-frontend.split-instant-queries-by-interval`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,10 @@ store_gateway_client:
# CLI flag: -querier.minimize-ingester-requests-hedging-delay
[minimize_ingester_requests_hedging_delay: <duration> | default = 3s]

# (experimental) PromQL engine to use, either 'standard' or 'streaming'
# CLI flag: -querier.promql-engine
[promql_engine: <string> | default = "standard"]

# The number of workers running in each querier process. This setting limits the
# maximum number of concurrent queries in each querier.
# CLI flag: -querier.max-concurrent
Expand Down
46 changes: 46 additions & 0 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,52 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
}
}

func TestStreamingPromQLEngine(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
"-querier.promql-engine": "streaming",
})

consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until the distributor and querier have updated the ring.
// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring,
// and the querier should have 512 tokens for the ingester ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push a series to Mimir.
writeClient, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

seriesName := "series_1"
seriesTimestamp := time.Now()
series, expectedVector, _ := generateFloatSeries(seriesName, seriesTimestamp, prompb.Label{Name: seriesName, Value: seriesName})

res, err := writeClient.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Query back the same series using the streaming PromQL engine.
c, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

result, err := c.Query(seriesName, seriesTimestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

func testMetadataQueriesWithBlocksStorage(
t *testing.T,
c *e2emimir.Client,
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func NewQuerierHandler(
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
metadataSupplier querier.MetadataSupplier,
engine *promql.Engine,
engine promql.QueryEngine,
distributor Distributor,
reg prometheus.Registerer,
logger log.Logger,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ type Mimir struct {
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
MetadataSupplier querier.MetadataSupplier
QuerierEngine *promql.Engine
QuerierEngine promql.QueryEngine
QueryFrontendTripperware querymiddleware.Tripperware
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
Expand Down
4 changes: 4 additions & 0 deletions pkg/mimir/mimir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/grafana/mimir/pkg/distributor"
"github.com/grafana/mimir/pkg/frontend/v1/frontendv1pb"
"github.com/grafana/mimir/pkg/ingester"
"github.com/grafana/mimir/pkg/querier"
"github.com/grafana/mimir/pkg/ruler"
"github.com/grafana/mimir/pkg/ruler/rulestore"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
Expand Down Expand Up @@ -161,6 +162,9 @@ func TestMimir(t *testing.T) {
ReplicationFactor: 1,
InstanceInterfaceNames: []string{"en0", "eth0", "lo0", "lo"},
}},
Querier: querier.Config{
PromQLEngine: "standard",
},
}
require.NoError(t, cfg.Server.LogLevel.Set("info"))

Expand Down
11 changes: 9 additions & 2 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,12 @@ func (t *Mimir) initQueryable() (serv services.Service, err error) {
registerer := prometheus.WrapRegistererWith(querierEngine, t.Registerer)

// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine, err = querier.New(
t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, registerer, util_log.Logger, t.ActivityTracker,
)
if err != nil {
return nil, fmt.Errorf("could not create queryable: %w", err)
}

// Use the distributor to return metric metadata by default
t.MetadataSupplier = t.Distributor
Expand Down Expand Up @@ -842,7 +845,11 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
// TODO: Consider wrapping logger to differentiate from querier module logger
rulerRegisterer := prometheus.WrapRegistererWith(rulerEngine, t.Registerer)

queryable, _, eng := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, rulerRegisterer, util_log.Logger, t.ActivityTracker)
queryable, _, eng, err := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, rulerRegisterer, util_log.Logger, t.ActivityTracker)
if err != nil {
return nil, fmt.Errorf("could not create queryable for ruler: %w", err)
}

queryable = querier.NewErrorTranslateQueryableWithFn(queryable, ruler.WrapQueryableErrors)

if t.Cfg.Ruler.TenantFederation.Enabled {
Expand Down
36 changes: 31 additions & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/lazyquery"
"github.com/grafana/mimir/pkg/streamingpromql"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/activitytracker"
"github.com/grafana/mimir/pkg/util/limiter"
Expand All @@ -55,12 +56,16 @@ type Config struct {
MinimizeIngesterRequests bool `yaml:"minimize_ingester_requests" category:"advanced"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"minimize_ingester_requests_hedging_delay" category:"advanced"`

PromQLEngine string `yaml:"promql_engine" category:"experimental"`

// PromQL engine config.
EngineConfig engine.Config `yaml:",inline"`
}

const (
queryStoreAfterFlag = "querier.query-store-after"
queryStoreAfterFlag = "querier.query-store-after"
standardPromQLEngine = "standard"
streamingPromQLEngine = "streaming"
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand All @@ -82,10 +87,16 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Uint64Var(&cfg.StreamingChunksPerIngesterSeriesBufferSize, "querier.streaming-chunks-per-ingester-buffer-size", 256, "Number of series to buffer per ingester when streaming chunks from ingesters.")
f.Uint64Var(&cfg.StreamingChunksPerStoreGatewaySeriesBufferSize, "querier.streaming-chunks-per-store-gateway-buffer-size", 256, "Number of series to buffer per store-gateway when streaming chunks from store-gateways.")

f.StringVar(&cfg.PromQLEngine, "querier.promql-engine", standardPromQLEngine, fmt.Sprintf("PromQL engine to use, either '%v' or '%v'", standardPromQLEngine, streamingPromQLEngine))

cfg.EngineConfig.RegisterFlags(f)
}

func (cfg *Config) Validate() error {
if cfg.PromQLEngine != standardPromQLEngine && cfg.PromQLEngine != streamingPromQLEngine {
return fmt.Errorf("unknown PromQL engine '%s'", cfg.PromQLEngine)
}

return nil
}

Expand Down Expand Up @@ -123,7 +134,7 @@ func ShouldQueryBlockStore(queryStoreAfter time.Duration, now time.Time, queryMi
}

// New builds a queryable and promql engine.
func New(cfg Config, limits *validation.Overrides, distributor Distributor, storeQueryable storage.Queryable, reg prometheus.Registerer, logger log.Logger, tracker *activitytracker.ActivityTracker) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, *promql.Engine) {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, storeQueryable storage.Queryable, reg prometheus.Registerer, logger log.Logger, tracker *activitytracker.ActivityTracker) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, promql.QueryEngine, error) {
queryMetrics := stats.NewQueryMetrics(reg)

distributorQueryable := newDistributorQueryable(distributor, limits, queryMetrics, logger)
Expand All @@ -139,13 +150,28 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
return lazyquery.NewLazyQuerier(querier), nil
})

engineOpts, engineExperimentalFunctionsEnabled := engine.NewPromQLEngineOptions(cfg.EngineConfig, tracker, logger, reg)
engine := promql.NewEngine(engineOpts)
opts, engineExperimentalFunctionsEnabled := engine.NewPromQLEngineOptions(cfg.EngineConfig, tracker, logger, reg)

// Experimental functions can only be enabled globally, and not on a per-engine basis.
parser.EnableExperimentalFunctions = engineExperimentalFunctionsEnabled

return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, engine
var eng promql.QueryEngine

switch cfg.PromQLEngine {
case standardPromQLEngine:
eng = promql.NewEngine(opts)
case streamingPromQLEngine:
var err error

eng, err = streamingpromql.NewEngine(opts)
if err != nil {
return nil, nil, nil, err
}
default:
panic(fmt.Sprintf("invalid config not caught by validation: unknown PromQL engine '%s'", cfg.PromQLEngine))
}

return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, eng, nil
}

// NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a Queryable.
Expand Down
Loading
Loading