First iteration of streaming PromQL engine #7693

Merged
merged 163 commits into from
Apr 9, 2024
Changes from 161 commits
Commits
70418a3
Use forked Prometheus module
charleskorn Aug 10, 2023
09c9f1e
Use promql.QueryEngine rather than *promql.Engine, add config option …
charleskorn Aug 10, 2023
e76edce
Add parallel query path with streaming engine.
charleskorn Aug 10, 2023
290cf6b
Add IntelliJ debugging configuration for streaming querier.
charleskorn Aug 10, 2023
204ae00
Add Grafana datasource for streaming querier
charleskorn Aug 10, 2023
a3a8b89
Use correct query-scheduler port and enable debugging
charleskorn Aug 10, 2023
86a7b11
Set up load test.
charleskorn Aug 10, 2023
9b1ebdc
Fix p99 latency panel
charleskorn Aug 10, 2023
0e08552
Commit settings used in previous test.
charleskorn Aug 10, 2023
676b2ec
Enable chunks streaming.
charleskorn Aug 11, 2023
9005d2f
Disable load generator.
charleskorn Sep 7, 2023
546762e
Create separate load testing query paths to ensure meta-monitoring qu…
charleskorn Sep 7, 2023
8e0fa53
Add initial k6-based load test script
charleskorn Sep 7, 2023
acff7ff
Add script to run load tests against both engines.
charleskorn Sep 7, 2023
fe93410
Add todo
charleskorn Sep 7, 2023
f0d562e
Extract friendly name to hostname logic to separate script
charleskorn Sep 8, 2023
046537c
Rename k6 output file
charleskorn Sep 8, 2023
55d1049
Check both engines produce the same result.
charleskorn Sep 8, 2023
94b54ed
Summarise results after each test run.
charleskorn Sep 8, 2023
7c8c136
Enable streaming chunks from store-gateways to queriers.
charleskorn Sep 8, 2023
909334d
Refactor test script, print max and average CPU and memory after test
charleskorn Sep 8, 2023
49b17c2
Reorder variables to match output order
charleskorn Sep 8, 2023
0ee3b7b
Don't include query name in k6 output file name.
charleskorn Sep 8, 2023
f7aa5b3
Add initial script to generate data.
charleskorn Sep 8, 2023
948ae97
Rename script
charleskorn Sep 8, 2023
80852f5
Add script to generate data generation config.
charleskorn Sep 8, 2023
cee0e1b
Make it easier to test different queries.
charleskorn Sep 8, 2023
231be86
Rename PromQL engine CLI flag.
charleskorn Sep 11, 2023
6a66d0f
Add Thanos' engine.
charleskorn Sep 11, 2023
53f789d
Add Thanos engine to benchmarking setup.
charleskorn Sep 11, 2023
b6462d1
Set default test duration to 5 minutes.
charleskorn Sep 11, 2023
bd06d1e
Rework data generation setup to work around limitation in prometheus-…
charleskorn Sep 12, 2023
16dcb15
Load test data faster.
charleskorn Sep 12, 2023
b0f8f61
Record test config for first test
charleskorn Sep 12, 2023
b103d6e
Record test config for second test
charleskorn Sep 12, 2023
0be5f14
Record test config for third test
charleskorn Sep 12, 2023
173a285
Disable gRPC compression for ingester client
charleskorn Sep 12, 2023
2f43ca6
Record test config for re-run of second test
charleskorn Sep 13, 2023
563ced1
Record test config for re-run of first test
charleskorn Sep 13, 2023
b2016d3
Record test config for re-run of third test
charleskorn Sep 13, 2023
ee065c2
Log URL used for test.
charleskorn Sep 13, 2023
9ec79a3
Record test config for fourth test
charleskorn Sep 13, 2023
4146014
Record test config for fifth test
charleskorn Sep 13, 2023
aeba5b3
Record test config for 10k series test
charleskorn Sep 13, 2023
c4fe6fc
Record test config for 1k series test
charleskorn Sep 13, 2023
c67895d
Record test config for 50k series test
charleskorn Sep 13, 2023
2c1934b
Fix configuration issue that does not affect outcome of testing.
charleskorn Sep 13, 2023
6578a48
Record test config for seventh test
charleskorn Sep 13, 2023
537ba98
Move production code to main Mimir code tree
charleskorn Sep 18, 2023
9aec0be
Copy test files into repo
charleskorn Sep 18, 2023
be52a92
Add license and provenance comments
charleskorn Sep 18, 2023
d533800
Add a readme.
charleskorn Sep 18, 2023
b023c31
Update package path in scripts and remove unnecessary testing script
charleskorn Sep 18, 2023
313c6c4
Add more notes to code
charleskorn Sep 18, 2023
c859e59
Explain benefit of streaming engine.
charleskorn Sep 18, 2023
27ace04
Mention benchmarking tools in readme.
charleskorn Sep 18, 2023
1086168
Remove references to Thanos' engine.
charleskorn Mar 6, 2024
1b375e9
Undo unnecessary go.mod changes
charleskorn Mar 6, 2024
d2e1cab
Remove benchmarking tool for now
charleskorn Mar 6, 2024
aff2b72
Remove docker-compose changes
charleskorn Mar 6, 2024
9ff2fd5
Reset whitespace changes
charleskorn Mar 6, 2024
a1f222f
Remove readme
charleskorn Mar 6, 2024
9c259f3
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Mar 6, 2024
24b35d3
Vendor in branch of mimir-prometheus with changes from https://github…
charleskorn Mar 6, 2024
aa04c8f
Fix issues introduced with recent Prometheus changes and fix linting …
charleskorn Mar 6, 2024
6c9f83d
Detect and reject configuration not supported by streaming engine
charleskorn Mar 13, 2024
a019a40
Remove TODOs tracked elsewhere
charleskorn Mar 13, 2024
723e5f2
Bring in latest changes to `QueryEngine` interface
charleskorn Mar 13, 2024
de36024
Remove more TODOs tracked elsewhere
charleskorn Mar 13, 2024
0dbbfcb
Change signature of `Next()` to return an error when the stream is ex…
charleskorn Mar 13, 2024
15645c9
Ignore warnings when comparing results from Prometheus and streaming …
charleskorn Mar 13, 2024
d7edb4a
Correct naming
charleskorn Mar 13, 2024
a6b76d5
Rename `Operator` to clarify that it produces instant vectors.
charleskorn Mar 13, 2024
42bbfb3
Rename test file
charleskorn Mar 14, 2024
761894c
Reorganise test file
charleskorn Mar 14, 2024
c7d46ea
Bring in latest benchmark changes from upstream.
charleskorn Mar 14, 2024
9bcf6f3
Run tests and benchmarks with isolation disabled, to mirror what Mimi…
charleskorn Mar 14, 2024
c6ce4a2
Fix script permissions
charleskorn Mar 14, 2024
ff23e39
Simplify test setup and don't bother testing 2000 series case to spee…
charleskorn Mar 14, 2024
9d7354e
Use assertion helpers in benchmark
charleskorn Mar 14, 2024
61d22a1
Improve engine benchmark comparison script: fix issue with package na…
charleskorn Mar 14, 2024
eae3a5e
Remove unnecessary files
charleskorn Mar 14, 2024
cffbf10
Add note to engine benchmark comparison script
charleskorn Mar 14, 2024
a2c8491
Make test names clearer
charleskorn Mar 15, 2024
c23ea43
Ensure both engines produce the same result before benchmarking
charleskorn Mar 15, 2024
0c9bb09
Add tests for unsupported expressions and improve error messages retu…
charleskorn Mar 15, 2024
ac995e1
Fix linting warnings
charleskorn Mar 15, 2024
a9d3959
Add notes for invalid cases that should be caught by the PromQL parser.
charleskorn Mar 15, 2024
179cf09
Implement `Query.Statement()`
charleskorn Mar 15, 2024
34fdd45
Give variable a better name
charleskorn Mar 15, 2024
944306c
Remove Pool interface
charleskorn Mar 15, 2024
7b09ed4
Remove unnecessary types
charleskorn Mar 15, 2024
687f889
Update comments
charleskorn Mar 15, 2024
2815a1b
Remove unused lookback delta for range vector selector
charleskorn Mar 15, 2024
6189f25
Extract selector logic to a shared type
charleskorn Mar 22, 2024
378a8e3
Add more TODOs
charleskorn Mar 22, 2024
8006939
Use Selector for range vector selectors.
charleskorn Mar 22, 2024
dc292e1
Compute values used on every call to Next() once in instant and range…
charleskorn Mar 22, 2024
94d29aa
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Mar 22, 2024
03d065a
Remove unnecessary script and update comment
charleskorn Mar 22, 2024
cbdc44c
Validate query time range
charleskorn Mar 22, 2024
017304a
Enforce that range query expression produces a vector or scalar result.
charleskorn Mar 22, 2024
e427ade
Add CLI flag to experimental features list in docs
charleskorn Mar 22, 2024
e8ace15
Add changelog entry
charleskorn Mar 22, 2024
333c8df
Fix linting warnings
charleskorn Mar 22, 2024
e3933a2
Move common PromQL engine configuration to its own package
charleskorn Mar 25, 2024
7d0182b
Don't bother with 10 series benchmarks.
charleskorn Mar 25, 2024
1bbea89
Sort imports
charleskorn Mar 25, 2024
0058fc5
Add missing license header.
charleskorn Mar 25, 2024
228548b
Use bucketed pool based on zeropool.
charleskorn Mar 25, 2024
91d1cb9
Fix import sorting
charleskorn Mar 25, 2024
6eaec63
Set upper limits for sizes of pooled slices.
charleskorn Mar 25, 2024
2034c55
Set size for SeriesBatch pool
charleskorn Mar 26, 2024
69a25d1
Add integration test.
charleskorn Mar 26, 2024
ebe368e
Move RingBuffer out of util package and pool slices used.
charleskorn Mar 26, 2024
8a1c27a
Add tests for ring buffer type.
charleskorn Mar 26, 2024
f58b52a
Return RingBuffer point slices to pool when no longer required.
charleskorn Mar 26, 2024
4b82b46
Fix linting
charleskorn Mar 26, 2024
5baaab3
Remove TODOs about pooling group slices and maps.
charleskorn Mar 26, 2024
d7f3433
Pool groups in aggregation operator.
charleskorn Mar 26, 2024
1a4dda3
Remove TODOs about pooling label builders.
charleskorn Mar 26, 2024
c5f7858
Remove more TODOs, clarify interface expectations
charleskorn Mar 26, 2024
37c958e
Release memory sooner after query execution
charleskorn Mar 26, 2024
c017273
Fix typo in comment
charleskorn Mar 27, 2024
c44074b
Remove unnecessary context cancellation checks from operators.
charleskorn Mar 27, 2024
c367862
Fix linting warnings
charleskorn Mar 27, 2024
676070a
Address remaining TODOs in Selector
charleskorn Mar 27, 2024
c0dd40c
Fix integration test flakiness
charleskorn Mar 27, 2024
43929c9
Don't panic when encountering a native histogram
charleskorn Mar 27, 2024
458c366
Fix issue where bucketed pool could return a slice with capacity less…
charleskorn Mar 27, 2024
4aa8f69
Remove redundant checks
charleskorn Mar 27, 2024
2fe1b10
Add support for @ in instant and range vector selectors.
charleskorn Mar 27, 2024
fc6b340
Run Prometheus' test cases against streaming engine.
charleskorn Mar 27, 2024
52f4671
Fix failing staleness test: don't emit empty series for instant query
charleskorn Mar 28, 2024
e3f0055
Upgrade mimir-prometheus
charleskorn Mar 28, 2024
e4e8123
Add test case for stale markers in a range query.
charleskorn Mar 28, 2024
51ed802
Add test cases for @ modifier with range queries
charleskorn Mar 28, 2024
6a2623b
Check for errors while reading test script
charleskorn Mar 28, 2024
c51b047
Fix whitespace
charleskorn Mar 28, 2024
6bbc93e
Remove duplicate timestamp computation in Selector and operators
charleskorn Apr 1, 2024
1fd7ad5
Update `rate` calculation to match behaviour in https://github.com/pr…
charleskorn Apr 2, 2024
b23d086
Add an overview of the engine.
charleskorn Apr 2, 2024
30ec0c5
Fix linting issues and clarify docs.
charleskorn Apr 2, 2024
663ec17
Add test case for scenario where input to aggregation returns no resu…
charleskorn Apr 2, 2024
06ebc4d
Implement `Query.String()`, and remove stale TODOs.
charleskorn Apr 2, 2024
7bb3ca8
Clarify BucketedPool contract, and add test case for case where min s…
charleskorn Apr 2, 2024
9f9023a
Expand test coverage to include scenarios not covered by upstream tes…
charleskorn Apr 2, 2024
119160c
Elaborate on comment
charleskorn Apr 2, 2024
d5a9127
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Apr 2, 2024
65e323f
Expand example in docs and fix typo
charleskorn Apr 2, 2024
c86ecf9
Remove unnecessary sorting and use non-experimental slices package in…
charleskorn Apr 2, 2024
2174241
Add missing comments.
charleskorn Apr 2, 2024
2eadbf5
Clean up temporary files in benchmark comparison script, and improve …
charleskorn Apr 3, 2024
6a1af36
Merge branch 'main' into charleskorn/streaming-promql-engine-for-merging
charleskorn Apr 5, 2024
4419c84
Add note explaining purpose of script.
charleskorn Apr 5, 2024
6e26ba3
Address PR feedback
charleskorn Apr 8, 2024
ba06a32
Replace use of `DurationMilliseconds` with `d.Milliseconds()`
charleskorn Apr 8, 2024
7ecc3fe
Rename `Series()` to `SeriesMetadata()`
charleskorn Apr 8, 2024
d2a9001
Include names of acceptable values in CLI flag description
charleskorn Apr 9, 2024
95699d6
Remove TODOs
charleskorn Apr 9, 2024
10bf1cd
Add explanation for RangeVectorSelectorWithTransformation
charleskorn Apr 9, 2024
1bdf6c6
Move to top-level `streamingpromql` package
charleskorn Apr 9, 2024
2dba89a
Move common query engine config back to original location (revert e39…
charleskorn Apr 9, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -346,7 +346,7 @@ lint: check-makefiles
faillint -paths "github.com/grafana/mimir/pkg/..." ./pkg/alertmanager/alertspb/...
faillint -paths "github.com/grafana/mimir/pkg/..." ./pkg/ruler/rulespb/...
faillint -paths "github.com/grafana/mimir/pkg/..." ./pkg/storage/sharding/...
- faillint -paths "github.com/grafana/mimir/pkg/..." ./pkg/querier/engine/...
+ faillint -paths "github.com/grafana/mimir/pkg/..." ./pkg/querier/engine/common/...
faillint -paths "github.com/grafana/mimir/pkg/..." ./pkg/querier/api/...
faillint -paths "github.com/grafana/mimir/pkg/..." ./pkg/util/globalerror

11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Expand Up @@ -1876,6 +1876,17 @@
"fieldType": "duration",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "promql_engine",
"required": false,
"desc": "PromQL engine to use, either 'standard' or 'streaming'",
"fieldValue": null,
"fieldDefaultValue": "standard",
"fieldFlag": "querier.promql-engine",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_concurrent",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Expand Up @@ -1725,6 +1725,8 @@ Usage of ./cmd/mimir/mimir:
Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-querier.prefer-streaming-chunks-from-store-gateways
[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.
-querier.promql-engine string
[experimental] PromQL engine to use, either 'standard' or 'streaming' (default "standard")
-querier.promql-experimental-functions-enabled
[experimental] Enable experimental PromQL functions. This config option should be set on query-frontend too when query sharding is enabled.
-querier.query-ingesters-within duration
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
Expand Up @@ -126,6 +126,7 @@ The following features are currently experimental:
- Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
- Enable PromQL experimental functions (`-querier.promql-experimental-functions-enabled`)
- Allow streaming of `/active_series` responses to the frontend (`-querier.response-streaming-enabled`)
- Streaming PromQL engine (`-querier.promql-engine=streaming`)
- Query-frontend
- `-query-frontend.querier-forget-delay`
- Instant query splitting (`-query-frontend.split-instant-queries-by-interval`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,10 @@ store_gateway_client:
# CLI flag: -querier.minimize-ingester-requests-hedging-delay
[minimize_ingester_requests_hedging_delay: <duration> | default = 3s]

# (experimental) PromQL engine to use, either 'standard' or 'streaming'
# CLI flag: -querier.promql-engine
[promql_engine: <string> | default = "standard"]

# The number of workers running in each querier process. This setting limits the
# maximum number of concurrent queries in each querier.
# CLI flag: -querier.max-concurrent
46 changes: 46 additions & 0 deletions integration/querier_test.go
Expand Up @@ -514,6 +514,52 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
}
}

func TestStreamingPromQLEngine(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
		"-querier.promql-engine": "streaming",
	})

	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
	ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
	querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

	// Wait until the distributor and querier have updated the ring.
	// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring,
	// and the querier should have 512 tokens for the ingester ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))
	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Push a series to Mimir.
	writeClient, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
	require.NoError(t, err)

	seriesName := "series_1"
	seriesTimestamp := time.Now()
	series, expectedVector, _ := generateFloatSeries(seriesName, seriesTimestamp, prompb.Label{Name: seriesName, Value: seriesName})

	res, err := writeClient.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Query back the same series using the streaming PromQL engine.
	c, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	result, err := c.Query(seriesName, seriesTimestamp)
	require.NoError(t, err)
	require.Equal(t, model.ValVector, result.Type())
	assert.Equal(t, expectedVector, result.(model.Vector))
}

func testMetadataQueriesWithBlocksStorage(
t *testing.T,
c *e2emimir.Client,
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Expand Up @@ -212,7 +212,7 @@ func NewQuerierHandler(
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
metadataSupplier querier.MetadataSupplier,
- engine *promql.Engine,
+ engine promql.QueryEngine,
distributor Distributor,
reg prometheus.Registerer,
logger log.Logger,
2 changes: 1 addition & 1 deletion pkg/mimir/mimir.go
Expand Up @@ -709,7 +709,7 @@ type Mimir struct {
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
MetadataSupplier querier.MetadataSupplier
- QuerierEngine *promql.Engine
+ QuerierEngine promql.QueryEngine
QueryFrontendTripperware querymiddleware.Tripperware
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
4 changes: 4 additions & 0 deletions pkg/mimir/mimir_test.go
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/grafana/mimir/pkg/distributor"
"github.com/grafana/mimir/pkg/frontend/v1/frontendv1pb"
"github.com/grafana/mimir/pkg/ingester"
"github.com/grafana/mimir/pkg/querier"
"github.com/grafana/mimir/pkg/ruler"
"github.com/grafana/mimir/pkg/ruler/rulestore"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
Expand Down Expand Up @@ -161,6 +162,9 @@ func TestMimir(t *testing.T) {
ReplicationFactor: 1,
InstanceInterfaceNames: []string{"en0", "eth0", "lo0", "lo"},
}},
Querier: querier.Config{
PromQLEngine: "standard",
},
}
require.NoError(t, cfg.Server.LogLevel.Set("info"))

15 changes: 11 additions & 4 deletions pkg/mimir/modules.go
Expand Up @@ -48,7 +48,7 @@ import (
"github.com/grafana/mimir/pkg/frontend/transport"
"github.com/grafana/mimir/pkg/ingester"
"github.com/grafana/mimir/pkg/querier"
- "github.com/grafana/mimir/pkg/querier/engine"
+ "github.com/grafana/mimir/pkg/querier/engine/common"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
querier_worker "github.com/grafana/mimir/pkg/querier/worker"
"github.com/grafana/mimir/pkg/ruler"
Expand Down Expand Up @@ -500,9 +500,12 @@ func (t *Mimir) initQueryable() (serv services.Service, err error) {
registerer := prometheus.WrapRegistererWith(querierEngine, t.Registerer)

// Create a querier queryable and PromQL engine
- t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(
+ t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine, err = querier.New(
t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, registerer, util_log.Logger, t.ActivityTracker,
)
+ if err != nil {
+ return nil, fmt.Errorf("could not create queryable: %w", err)
+ }

// Use the distributor to return metric metadata by default
t.MetadataSupplier = t.Distributor
Expand Down Expand Up @@ -718,7 +721,7 @@ func (t *Mimir) initQueryFrontendCodec() (services.Service, error) {
func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error) {
promqlEngineRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "query-frontend"}, t.Registerer)

- engineOpts, engineExperimentalFunctionsEnabled := engine.NewPromQLEngineOptions(t.Cfg.Querier.EngineConfig, t.ActivityTracker, util_log.Logger, promqlEngineRegisterer)
+ engineOpts, engineExperimentalFunctionsEnabled := common.NewPromQLEngineOptions(t.Cfg.Querier.EngineConfig, t.ActivityTracker, util_log.Logger, promqlEngineRegisterer)

tripperware, err := querymiddleware.NewTripperware(
t.Cfg.Frontend.QueryMiddleware,
Expand Down Expand Up @@ -842,7 +845,11 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
// TODO: Consider wrapping logger to differentiate from querier module logger
rulerRegisterer := prometheus.WrapRegistererWith(rulerEngine, t.Registerer)

- queryable, _, eng := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, rulerRegisterer, util_log.Logger, t.ActivityTracker)
+ queryable, _, eng, err := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, rulerRegisterer, util_log.Logger, t.ActivityTracker)
+ if err != nil {
+ return nil, fmt.Errorf("could not create queryable for ruler: %w", err)
+ }
+
queryable = querier.NewErrorTranslateQueryableWithFn(queryable, ruler.WrapQueryableErrors)

if t.Cfg.Ruler.TenantFederation.Enabled {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

- package engine
+ package common

import (
"flag"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

- package engine
+ package common

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

- package engine
+ package common

import (
"context"
97 changes: 97 additions & 0 deletions pkg/querier/engine/streaming/README.md
@@ -0,0 +1,97 @@
# Streaming PromQL engine

This file contains a brief overview of the internals of the streaming PromQL engine.

For an introduction to the engine itself and the problems it tries to solve, check out [this PromCon 2023 talk](https://www.youtube.com/watch?v=3kM2Asj6hcg).

The goal of the engine is to allow evaluating queries over millions of series in a safe, performant and cost-effective way.
To allow this, the engine aims to ensure that peak memory consumption of queriers is not proportional to the number of series selected.
This will make it safe for operators to loosen the various query-related limits without risking the stability of their Mimir cluster or needing to devote enormous amounts of compute resources to queriers.

The key way the engine achieves this is by not loading all the input series into memory at once, and instead streaming them into memory when needed.

For example, let's say we're evaluating the query `sum by (environment) (some_metric{cluster="cluster-1"})`.

Prometheus' PromQL engine will first load all samples for all series selected by `some_metric{cluster="cluster-1"}` into memory.
It will then compute the sum for each unique value of `environment`.
At its peak, Prometheus' PromQL engine will hold all samples for all input series (from `some_metric{cluster="cluster-1"}`) and all samples for all output series in memory at once.

The streaming engine here will instead execute the selector `some_metric{cluster="cluster-1"}` and gather the labels of all series returned.
With these labels, it will then compute all the possible output series for the `sum by (environment)` operation (i.e. one output series per unique value of `environment`).
Having computed the output series, it will then begin reading series from the selector, one at a time, and update the running total for the appropriate output series.
At its peak, the streaming engine in this example will hold all samples for one input series and all samples for all output series in memory at once[^1],
a significant reduction compared to Prometheus' PromQL engine, particularly when the selector selects many series.
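
The memory behaviour described above can be illustrated with a minimal Go sketch. This is not the engine's code: the `series` type, the `streamingSumBy` function and the `next` callback are hypothetical names used only for illustration, and the sketch creates output groups lazily rather than computing them up front from the labels. It assumes every series has exactly `steps` samples.

```go
package main

import "fmt"

// series is a hypothetical in-memory stand-in for one input series:
// its labels plus one sample per query step.
type series struct {
	labels  map[string]string
	samples []float64
}

// streamingSumBy pulls input series one at a time from next and keeps only
// the running totals for each output group in memory.
// It assumes each series carries exactly steps samples.
func streamingSumBy(label string, steps int, next func() (series, bool)) map[string][]float64 {
	totals := map[string][]float64{}
	for {
		s, ok := next()
		if !ok {
			return totals
		}
		group := s.labels[label]
		if totals[group] == nil {
			totals[group] = make([]float64, steps)
		}
		for i, v := range s.samples {
			totals[group][i] += v
		}
		// s is no longer referenced after this iteration, so only one
		// input series needs to be held in memory at a time.
	}
}

func main() {
	input := []series{
		{labels: map[string]string{"environment": "prod"}, samples: []float64{1, 2}},
		{labels: map[string]string{"environment": "prod"}, samples: []float64{3, 4}},
		{labels: map[string]string{"environment": "dev"}, samples: []float64{10, 20}},
	}
	i := 0
	next := func() (series, bool) {
		if i >= len(input) {
			return series{}, false
		}
		i++
		return input[i-1], true
	}
	totals := streamingSumBy("environment", 2, next)
	fmt.Println(totals["prod"], totals["dev"]) // prints [4 6] [10 20]
}
```

In this sketch the peak footprint is one input series plus one slice of totals per group, regardless of how many series `next` yields.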

This idea of streaming can be applied to multiple levels as well. Imagine we're evaluating the query `max(sum by (environment) (some_metric{cluster="cluster-1"}))`.
In the streaming engine, once the result of each group series produced by `sum` is complete, it is passed to `max`, which can update its running maximum seen so far across all groups.
At its peak, the streaming engine will hold all samples for one input series, all samples for all incomplete `sum` group series, and the single incomplete `max` output series in memory at once.

## Internals

Within the streaming engine, a query is represented by a set of linked operators (one for each operation) that together form the query plan.

For example, the `max(sum by (environment) (some_metric{cluster="cluster-1"}))` example from before would have a query plan made up of three operators:

- The instant vector selector operator (`some_metric{cluster="cluster-1"}`)
- The `sum` aggregation operator (`sum by (environment) (...)`), which consumes series from the instant vector selector operator
- The `max` aggregation operator (`max (...)`), which consumes series from the `sum` aggregation operator

Visually, the plan looks like this:

```mermaid
flowchart TB
IVS["`**instant vector selector**
some_metric#123;cluster=#quot;cluster-1#quot;#125;`"]
sum["`**sum aggregation**
sum by (environment) (...)`"]
max["`**max aggregation**
max (...)`"]
output((output))
IVS --> sum
sum --> max
max --> output
```

Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./operator/operator.go).
The two key methods of this interface are `SeriesMetadata()` and `Next()`:

`SeriesMetadata()` returns the list of all series' labels that will be returned by the operator[^2].
In our example, the instant vector selector operator would return all the matching `some_metric` series, and the `sum` aggregation operator would return one series for each unique value of `environment`.

`Next()` is then called by the consuming operator to read each series' data, one series at a time.
In our example, the `sum` aggregation operator would call `Next()` on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on.

Elaborating on the example from before, the overall query would proceed like this, assuming the request is received over HTTP:

1. query HTTP API handler calls `Engine.NewInstantQuery()` or `Engine.NewRangeQuery()` as appropriate ([source](./engine.go))
1. engine parses PromQL expression using Prometheus' PromQL parser, producing an abstract syntax tree (AST) ([source](./query.go))
1. engine converts AST produced by PromQL parser to query plan ([source](./query.go))
1. engine returns created `Query` instance
1. query HTTP API handler calls `Query.Exec()`
1. `Query.Exec()` calls `SeriesMetadata()` on `max` aggregation operator
1. `max` aggregation operator calls `SeriesMetadata()` on `sum` aggregation operator
1. `sum` aggregation operator calls `SeriesMetadata()` on instant vector selector operator
- instant vector selector operator issues `Select()` call, which retrieves labels from ingesters and store-gateways
1. `sum` aggregation operator computes output series (one per unique value of `environment`) based on input series from instant vector selector
1. `max` aggregation operator computes output series based on input series from `sum` aggregation operator
- in this case, there's just one output series, given no grouping is being performed
1. root of the query calls `Next()` on `max` aggregation operator until all series have been returned
1. `max` aggregation operator calls `Next()` on `sum` aggregation operator
1. `sum` aggregation operator calls `Next()` on instant vector selector operator
- instant vector selector returns samples for next series
1. `sum` aggregation operator updates its running totals for the relevant output series
1. if all input series have now been seen for the output series just updated, `sum` aggregation operator returns that output series and removes it from its internal state
1. otherwise, it calls `Next()` again and repeats
1. `max` aggregation operator updates its running maximum based on the series returned
1. if all input series have been seen, `max` aggregation operator returns
1. otherwise, it calls `Next()` again and repeats
1. query HTTP API handler converts returned result to wire format (either JSON or Protobuf) and sends to caller
1. query HTTP API handler calls `Query.Close()` to release remaining resources
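
The pull-based part of the steps above (the `SeriesMetadata()` chain followed by the `Next()` chain) can be sketched in Go as follows. This is a toy model under several assumptions, not the engine's implementation: the `selector`, `sumBy` and `maxOver` names are hypothetical, each series is identified only by the value of its grouping label, and `Next()` returns a boolean instead of an error to keep the sketch short.

```go
package main

import "fmt"

// selector plays the role of the instant vector selector operator.
type selector struct {
	groups  []string // grouping label value of each series
	samples [][]float64
	pos     int
}

func (s *selector) SeriesMetadata() []string { return s.groups }

func (s *selector) Next() ([]float64, bool) {
	if s.pos >= len(s.samples) {
		return nil, false
	}
	s.pos++
	return s.samples[s.pos-1], true
}

// sumBy requires SeriesMetadata to be called before Next, as in the steps above.
type sumBy struct {
	inner     *selector
	input     []string             // group of each input series, in input order
	remaining map[string]int       // input series not yet seen, per group
	totals    map[string][]float64 // running totals for incomplete groups
	pos       int
}

func (a *sumBy) SeriesMetadata() []string {
	a.input = a.inner.SeriesMetadata()
	a.remaining, a.totals = map[string]int{}, map[string][]float64{}
	var out []string
	// Walk backwards so each group is recorded at its last input series.
	for i := len(a.input) - 1; i >= 0; i-- {
		if a.remaining[a.input[i]] == 0 {
			out = append(out, a.input[i])
		}
		a.remaining[a.input[i]]++
	}
	// Reverse, so groups are listed in the order they will complete.
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return out
}

func (a *sumBy) Next() ([]float64, bool) {
	for {
		samples, ok := a.inner.Next()
		if !ok {
			return nil, false // every group has already been emitted
		}
		g := a.input[a.pos]
		a.pos++
		if a.totals[g] == nil {
			a.totals[g] = make([]float64, len(samples))
		}
		for i, v := range samples {
			a.totals[g][i] += v
		}
		a.remaining[g]--
		if a.remaining[g] == 0 {
			out := a.totals[g]
			delete(a.totals, g) // completed groups leave the internal state
			return out, true
		}
	}
}

// maxOver plays the role of max: it folds each completed group into one running maximum.
func maxOver(inner *sumBy) []float64 {
	inner.SeriesMetadata()
	var result []float64
	for {
		samples, ok := inner.Next()
		if !ok {
			return result
		}
		if result == nil {
			result = samples
			continue
		}
		for i, v := range samples {
			if v > result[i] {
				result[i] = v
			}
		}
	}
}

func main() {
	sel := &selector{
		groups:  []string{"prod", "dev", "prod"},
		samples: [][]float64{{1, 30}, {10, 20}, {3, 4}},
	}
	fmt.Println(maxOver(&sumBy{inner: sel})) // prints [10 34]
}
```

Counting the remaining input series per group is one way for `sumBy` to know when a group is complete, so it can hand the group to its consumer and drop it from its own state before the whole input has been read.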

[^1]:
This isn't strictly correct, as chunks streaming will buffer chunks for some series in memory as they're received over the network, and it ignores the initial memory consumption caused by the non-streaming calls to `SeriesMetadata()`.
But this applies equally to both engines when used in Mimir.

[^2]:
This isn't done in a streaming fashion: all series' labels are loaded into memory at once.
In a future iteration of the engine, `SeriesMetadata()` could be made streaming as well, but this is out of scope for now.
17 changes: 17 additions & 0 deletions pkg/querier/engine/streaming/compare.sh
@@ -0,0 +1,17 @@
#! /usr/bin/env bash
# SPDX-License-Identifier: AGPL-3.0-only
# This script compares benchmark results for the two engines.

set -euo pipefail

RESULTS_FILE="$1" # Should be the path to a file produced by a command like `go test -run=XXX -bench="BenchmarkQuery" -count=6 -benchmem -timeout=1h .`

STANDARD_RESULTS_FILE=$(mktemp /tmp/standard.XXXX)
STREAMING_RESULTS_FILE=$(mktemp /tmp/streaming.XXXX)

grep --invert-match "streaming-" "$RESULTS_FILE" | sed -E 's#/standard-[0-9]+##g' > "$STANDARD_RESULTS_FILE"
grep --invert-match "standard-" "$RESULTS_FILE" | sed -E 's#/streaming-[0-9]+##g' > "$STREAMING_RESULTS_FILE"

benchstat "$STANDARD_RESULTS_FILE" "$STREAMING_RESULTS_FILE" | sed "s#$STANDARD_RESULTS_FILE# standard #g" | sed "s#$STREAMING_RESULTS_FILE# streaming #g"

rm "$STANDARD_RESULTS_FILE" "$STREAMING_RESULTS_FILE"