Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming PromQL engine: add support for range vector selectors as the top-level expression #8023

Merged
merged 22 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c61dc12
Introduce RangeVectorOperator interface and split RangeVectorSelector…
charleskorn May 2, 2024
ccc00c9
Add more explanation for RangeVectorStepData.
charleskorn May 2, 2024
07000f5
Improve performance of rate() calculations by exposing range vector o…
charleskorn May 2, 2024
6f08073
Add changelog entry
charleskorn May 2, 2024
2935a5f
Rename RingBuffer.Points() to HeadAndTail(), and add CopyPoints() met…
charleskorn May 2, 2024
bd6c902
Add support for range vector selectors as the top-level expression.
charleskorn May 2, 2024
81db06f
Remove outdated comment
charleskorn May 2, 2024
e5404f8
Clarify more comments.
charleskorn May 2, 2024
477700b
Simplify test setup
charleskorn May 6, 2024
a437465
Ensure matrix series are always sorted by labels
charleskorn May 6, 2024
43e6c5c
Rename `Next` to `NextSeries`
charleskorn May 6, 2024
ee64985
Document whether RangeStart and RangeEnd are inclusive or exclusive.
charleskorn May 9, 2024
529231a
Document when Selector.Timestamp is set.
charleskorn May 9, 2024
6505978
Rename `NextStep` to `NextStepSamples`
charleskorn May 9, 2024
a405e08
Rename `RingBuffer.HeadAndTail` to `RingBuffer.UnsafePoints`
charleskorn May 9, 2024
76df7b1
Remove unnecessary checks in `Close` methods
charleskorn May 9, 2024
34fa59d
Fix issue where range vector selectors incorrectly return points from…
charleskorn May 9, 2024
a9c6d40
Add tests to verify behaviour when a series contains a stale marker.
charleskorn May 9, 2024
3d59500
Address PR feedback: name return parameters
charleskorn May 9, 2024
71298d7
Address PR feedback: query metric that exists but has no samples in r…
charleskorn May 9, 2024
867303d
Merge branch 'main' into charleskorn/streaming-promql-range-vector-se…
charleskorn May 9, 2024
27c3446
Fix breaking change after merge
charleskorn May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series_per_request"}` #7989 #8010
Expand Down
16 changes: 8 additions & 8 deletions pkg/streamingpromql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ flowchart TB
```

Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./operator/operator.go).
The two key methods of this interface are `SeriesMetadata()` and `Next()`:
The two key methods of this interface are `SeriesMetadata()` and `NextSeries()`:

`SeriesMetadata()` returns the list of all series' labels that will be returned by the operator[^2].
In our example, the instant vector selector operator would return all the matching `some_metric` series, and the `sum` aggregation operator would return one series for each unique value of `environment`.

`Next()` is then called by the consuming operator to read each series' data, one series at a time.
In our example, the `sum` aggregation operator would call `Next()` on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on.
`NextSeries()` is then called by the consuming operator to read each series' data, one series at a time.
In our example, the `sum` aggregation operator would call `NextSeries()` on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on.

Elaborating on the example from before, the overall query would proceed like this, assuming the request is received over HTTP:

Expand All @@ -75,16 +75,16 @@ Elaborating on the example from before, the overall query would proceed like thi
1. `sum` aggregation operator computes output series (one per unique value of `environment`) based on input series from instant vector selector
1. `max` aggregation operator computes output series based on input series from `sum` aggregation operator
- in this case, there's just one output series, given no grouping is being performed
1. root of the query calls `Next()` on `max` aggregation operator until all series have been returned
1. `max` aggregation operator calls `Next()` on `sum` aggregation operator
1. `sum` aggregation operator calls `Next()` on instant vector selector operator
1. root of the query calls `NextSeries()` on `max` aggregation operator until all series have been returned
1. `max` aggregation operator calls `NextSeries()` on `sum` aggregation operator
1. `sum` aggregation operator calls `NextSeries()` on instant vector selector operator
- instant vector selector returns samples for next series
1. `sum` aggregation operator updates its running totals for the relevant output series
1. if all input series have now been seen for the output series just updated, `sum` aggregation operator returns that output series and removes it from its internal state
1. otherwise, it calls `Next()` again and repeats
1. otherwise, it calls `NextSeries()` again and repeats
1. `max` aggregation operator updates its running maximum based on the series returned
1. if all input series have been seen, `max` aggregation operator returns
1. otherwise, it calls `Next()` again and repeats
1. otherwise, it calls `NextSeries()` again and repeats
1. query HTTP API handler converts returned result to wire format (either JSON or Protobuf) and sends to caller
1. query HTTP API handler calls `Query.Close()` to release remaining resources

Expand Down
18 changes: 14 additions & 4 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
var MetricSizes = []int{1, 100, 2000}

type BenchCase struct {
Expr string
Steps int
Expr string
Steps int
InstantQueryOnly bool
}

func (c BenchCase) Name() string {
Expand Down Expand Up @@ -75,6 +76,11 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "a_X",
},
// Range vector selector.
{
Expr: "a_X[1m]",
InstantQueryOnly: true,
},
// Simple rate.
{
Expr: "rate(a_X[1m])",
Expand Down Expand Up @@ -198,7 +204,7 @@ func TestCases(metricSizes []int) []BenchCase {
tmp = append(tmp, c)
} else {
for _, count := range metricSizes {
tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps})
tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps, InstantQueryOnly: c.InstantQueryOnly})
}
}
}
Expand All @@ -207,7 +213,11 @@ func TestCases(metricSizes []int) []BenchCase {
// No step will be replaced by cases with the standard step.
tmp = []BenchCase{}
for _, c := range cases {
if c.Steps != 0 {
if c.Steps != 0 || c.InstantQueryOnly {
if c.InstantQueryOnly && c.Steps != 0 {
panic(fmt.Sprintf("invalid test case '%v': configured as instant query with non-zero number of steps %v", c.Expr, c.Steps))
}

if c.Steps >= NumIntervals {
// Note that this doesn't check we have enough data to cover any range selectors.
panic(fmt.Sprintf("invalid test case '%v' with %v steps: test setup only creates %v steps", c.Expr, c.Steps, NumIntervals))
Expand Down
115 changes: 99 additions & 16 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
)
Expand All @@ -23,16 +25,16 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"a + b": "PromQL expression type *parser.BinaryExpr",
"1 + 2": "PromQL expression type *parser.BinaryExpr",
"metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr",
"1": "PromQL expression type *parser.NumberLiteral",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"sum without(l) (metric{})": "grouping with 'without'",
"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
"a + b": "PromQL expression type *parser.BinaryExpr",
"1 + 2": "scalar value as top-level expression",
"metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr",
"1": "scalar value as top-level expression",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"sum without(l) (metric{})": "grouping with 'without'",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
}

for expression, expectedError := range unsupportedExpressions {
Expand All @@ -53,12 +55,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {

// These expressions are also unsupported, but are only valid as instant queries.
unsupportedInstantQueryExpressions := map[string]string{
"'a'": "PromQL expression type *parser.StringLiteral",
"metric{}[5m]": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] offset 2h": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ 123": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ start()": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ end()": "PromQL expression type *parser.MatrixSelector",
"'a'": "string value as top-level expression",
"metric{}[5m] offset 2h": "range vector selector with 'offset'",
"metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr",
}

Expand Down Expand Up @@ -159,3 +157,88 @@ func TestOurTestCases(t *testing.T) {
})
}
}

// Testing instant queries that return a range vector is not supported by Prometheus' PromQL testing framework,
// and adding support for this would be quite involved.
//
// So instead, we test these few cases here instead.
func TestRangeVectorSelectors(t *testing.T) {
opts := NewTestEngineOpts()
streamingEngine, err := NewEngine(opts)
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts)

baseT := timestamp.Time(0)
storage := promql.LoadedStorage(t, `
load 1m
some_metric{env="1"} 0+1x4
some_metric{env="2"} 0+2x4
`)
t.Cleanup(func() { require.NoError(t, storage.Close()) })

testCases := map[string]struct {
expr string
expected *promql.Result
ts time.Time
}{
"matches series with points in range": {
expr: "some_metric[1m]",
ts: baseT.Add(2 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{
{
Metric: labels.FromStrings("__name__", "some_metric", "env", "1"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 2},
},
},
{
Metric: labels.FromStrings("__name__", "some_metric", "env", "2"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 2},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 4},
},
},
},
},
},
"matches no series": {
expr: "some_nonexistent_metric[1m]",
ts: baseT,
expected: &promql.Result{
Value: promql.Matrix{},
},
},
"no samples in range": {
expr: "some_nonexistent_metric[1m]",
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
ts: baseT.Add(20 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{},
},
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
runTest := func(t *testing.T, eng promql.QueryEngine, expr string, ts time.Time, expected *promql.Result) {
q, err := eng.NewInstantQuery(context.Background(), storage, nil, expr, ts)
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())
require.Equal(t, expected, res)
}

t.Run("streaming engine", func(t *testing.T) {
runTest(t, streamingEngine, testCase.expr, testCase.ts, testCase.expected)
})

// Run the tests against Prometheus' engine to ensure our test cases are valid.
t.Run("Prometheus' engine", func(t *testing.T) {
runTest(t, prometheusEngine, testCase.expr, testCase.ts, testCase.expected)
})
})
}
}
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operator/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (a *Aggregation) labelsForGroup(m labels.Labels, lb *labels.Builder) labels
return lb.Labels()
}

func (a *Aggregation) Next(ctx context.Context) (InstantVectorSeriesData, error) {
func (a *Aggregation) NextSeries(ctx context.Context) (InstantVectorSeriesData, error) {
if len(a.remainingGroups) == 0 {
// No more groups left.
return InstantVectorSeriesData{}, EOS
Expand All @@ -127,7 +127,7 @@ func (a *Aggregation) Next(ctx context.Context) (InstantVectorSeriesData, error)

// Iterate through inner series until the desired group is complete
for thisGroup.remainingSeriesCount > 0 {
s, err := a.Inner.Next(ctx)
s, err := a.Inner.NextSeries(ctx)

if err != nil {
if errors.Is(err, EOS) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operator/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ type InstantVectorSelector struct {
var _ InstantVectorOperator = &InstantVectorSelector{}

func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) {
// Compute value we need on every call to Next() once, here.
// Compute value we need on every call to NextSeries() once, here.
v.numSteps = stepCount(v.Selector.Start, v.Selector.End, v.Selector.Interval)

return v.Selector.SeriesMetadata(ctx)
}

func (v *InstantVectorSelector) Next(_ context.Context) (InstantVectorSeriesData, error) {
func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeriesData, error) {
if v.memoizedIterator == nil {
v.memoizedIterator = storage.NewMemoizedEmptyIterator(v.Selector.LookbackDelta.Milliseconds())
}
Expand Down
82 changes: 73 additions & 9 deletions pkg/streamingpromql/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,62 @@ package operator
import (
"context"
"errors"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
)

// InstantVectorOperator represents all operators that produce instant vectors.
type InstantVectorOperator interface {
// Operator represents all operators.
type Operator interface {
// SeriesMetadata returns a list of all series that will be returned by this operator.
// The returned []SeriesMetadata can be modified by the caller or returned to a pool.
// SeriesMetadata may return series in any order, but the same order must be used by both SeriesMetadata and Next.
// SeriesMetadata may return series in any order, but the same order must be used by both SeriesMetadata and NextSeries.
// SeriesMetadata should be called no more than once.
SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error)

// Next returns the next series from this operator, or EOS otherwise.
// SeriesMetadata must be called exactly once before calling Next.
// Close frees all resources associated with this operator.
// Calling SeriesMetadata or NextSeries after calling Close may result in unpredictable behaviour, corruption or crashes.
Close()
}

// InstantVectorOperator represents all operators that produce instant vectors.
type InstantVectorOperator interface {
Operator

// NextSeries returns the next series from this operator, or EOS if no more series are available.
// SeriesMetadata must be called exactly once before calling NextSeries.
// The returned InstantVectorSeriesData can be modified by the caller or returned to a pool.
// The returned InstantVectorSeriesData can contain no points.
Next(ctx context.Context) (InstantVectorSeriesData, error)
NextSeries(ctx context.Context) (InstantVectorSeriesData, error)
}

// Close frees all resources associated with this operator.
// Calling SeriesMetadata or Next after calling Close may result in unpredictable behaviour, corruption or crashes.
Close()
// RangeVectorOperator represents all operators that produce range vectors.
type RangeVectorOperator interface {
Operator

// StepCount returns the number of time steps produced for each series by this operator.
// StepCount must only be called after calling SeriesMetadata.
StepCount() int

// Range returns the time range selected by this operator at each time step.
//
// For example, if this operator represents the selector "some_metric[5m]", Range returns 5 minutes.
Range() time.Duration

// NextSeries advances to the next series produced by this operator, or EOS if no more series are available.
// SeriesMetadata must be called exactly once before calling NextSeries.
NextSeries(ctx context.Context) error

// NextStep populates the provided RingBuffer with the samples for the next time step for the
// current series and returns the timestamps of the next time step, or returns EOS if no more time
// steps are available.
// The provided RingBuffer is expected to only contain points for the current series, and the same
// RingBuffer should be passed to subsequent NextStep calls for the same series.
// The provided RingBuffer may be populated with points beyond the end of the expected time range, and
// callers should compare returned points' timestamps to the returned RangeVectorStepData.RangeEnd.
// Next must be called at least once before calling NextStep.
NextStep(floats *RingBuffer) (RangeVectorStepData, error)
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
}

var EOS = errors.New("operator stream exhausted") //nolint:revive
Expand All @@ -39,3 +73,33 @@ type InstantVectorSeriesData struct {
Floats []promql.FPoint
Histograms []promql.HPoint
}

// RangeVectorStepData contains the timestamps associated with a single time step produced by a
// RangeVectorOperator.
//
// All values are in milliseconds since the Unix epoch.
//
// For example, if the operator represents the selector "some_metric[5m]", and this time step is for
// 2024-05-02T00:00:00Z, then:
// - StepT is 1714608000000 (2024-05-02T00:00:00Z)
// - RangeStart is 1714607700000 (2024-05-01T23:55:00Z)
// - RangeEnd is 1714608000000 (2024-05-02T00:00:00Z)
//
// If the operator represents the selector "some_metric[5m] @ 1712016000", and this time step is for
// 2024-05-02T00:00:00Z, then:
// - StepT is 1714608000000 (2024-05-02T00:00:00Z)
// - RangeStart is 1712015700000 (2024-04-01T23:55:00Z)
// - RangeEnd is 1712016000000 (2024-04-02T00:00:00Z)
type RangeVectorStepData struct {
// StepT is the timestamp of this time step.
StepT int64

// RangeStart is the beginning of the time range selected by this time step.
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
RangeStart int64

// RangeEnd is the end of the time range selected by this time step.
// RangeEnd is the same as StepT except when the @ modifier or offsets are used, in which case
// RangeEnd reflects the time of the underlying points, and StepT is the timestamp of the point
// produced by the query.
RangeEnd int64
}
Loading
Loading