diff --git a/CHANGELOG.md b/CHANGELOG.md index 8595b63f949..e3c34651df9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8058 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529 diff --git a/pkg/streamingpromql/README.md b/pkg/streamingpromql/README.md index cc10eeebb2b..aba8b6abfe2 100644 --- a/pkg/streamingpromql/README.md +++ b/pkg/streamingpromql/README.md @@ -53,13 +53,13 @@ flowchart TB ``` Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./operator/operator.go). 
-The two key methods of this interface are `SeriesMetadata()` and `Next()`: +The two key methods of this interface are `SeriesMetadata()` and `NextSeries()`: `SeriesMetadata()` returns the list of all series' labels that will be returned by the operator[^2]. In our example, the instant vector selector operator would return all the matching `some_metric` series, and the `sum` aggregation operator would return one series for each unique value of `environment`. -`Next()` is then called by the consuming operator to read each series' data, one series at a time. -In our example, the `sum` aggregation operator would call `Next()` on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on. +`NextSeries()` is then called by the consuming operator to read each series' data, one series at a time. +In our example, the `sum` aggregation operator would call `NextSeries()` on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on. Elaborating on the example from before, the overall query would proceed like this, assuming the request is received over HTTP: @@ -75,16 +75,16 @@ Elaborating on the example from before, the overall query would proceed like thi 1. `sum` aggregation operator computes output series (one per unique value of `environment`) based on input series from instant vector selector 1. `max` aggregation operator computes output series based on input series from `sum` aggregation operator - in this case, there's just one output series, given no grouping is being performed - 1. root of the query calls `Next()` on `max` aggregation operator until all series have been returned - 1. `max` aggregation operator calls `Next()` on `sum` aggregation operator - 1. `sum` aggregation operator calls `Next()` on instant vector selector operator + 1. 
root of the query calls `NextSeries()` on `max` aggregation operator until all series have been returned + 1. `max` aggregation operator calls `NextSeries()` on `sum` aggregation operator + 1. `sum` aggregation operator calls `NextSeries()` on instant vector selector operator - instant vector selector returns samples for next series 1. `sum` aggregation operator updates its running totals for the relevant output series 1. if all input series have now been seen for the output series just updated, `sum` aggregation operator returns that output series and removes it from its internal state - 1. otherwise, it calls `Next()` again and repeats + 1. otherwise, it calls `NextSeries()` again and repeats 1. `max` aggregation operator updates its running maximum based on the series returned 1. if all input series have been seen, `max` aggregation operator returns - 1. otherwise, it calls `Next()` again and repeats + 1. otherwise, it calls `NextSeries()` again and repeats 1. query HTTP API handler converts returned result to wire format (either JSON or Protobuf) and sends to caller 1. query HTTP API handler calls `Query.Close()` to release remaining resources diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 3dd72db58ee..4277730a0f0 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -25,8 +25,9 @@ import ( var MetricSizes = []int{1, 100, 2000} type BenchCase struct { - Expr string - Steps int + Expr string + Steps int + InstantQueryOnly bool } func (c BenchCase) Name() string { @@ -75,6 +76,11 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: "a_X", }, + // Range vector selector. + { + Expr: "a_X[1m]", + InstantQueryOnly: true, + }, // Simple rate. 
{ Expr: "rate(a_X[1m])", @@ -198,7 +204,7 @@ func TestCases(metricSizes []int) []BenchCase { tmp = append(tmp, c) } else { for _, count := range metricSizes { - tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps}) + tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps, InstantQueryOnly: c.InstantQueryOnly}) } } } @@ -207,7 +213,11 @@ func TestCases(metricSizes []int) []BenchCase { // No step will be replaced by cases with the standard step. tmp = []BenchCase{} for _, c := range cases { - if c.Steps != 0 { + if c.Steps != 0 || c.InstantQueryOnly { + if c.InstantQueryOnly && c.Steps != 0 { + panic(fmt.Sprintf("invalid test case '%v': configured as instant query with non-zero number of steps %v", c.Expr, c.Steps)) + } + if c.Steps >= NumIntervals { // Note that this doesn't check we have enough data to cover any range selectors. panic(fmt.Sprintf("invalid test case '%v' with %v steps: test setup only creates %v steps", c.Expr, c.Steps, NumIntervals)) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 05bc841b8cd..2d7cbdfbe31 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" ) @@ -23,16 +25,16 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // The goal of this is not to list every conceivable expression that is unsupported, but to cover all the // different cases and make sure we produce a reasonable error message when these cases are encountered. 
unsupportedExpressions := map[string]string{ - "a + b": "PromQL expression type *parser.BinaryExpr", - "1 + 2": "PromQL expression type *parser.BinaryExpr", - "metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr", - "1": "PromQL expression type *parser.NumberLiteral", - "metric{} offset 2h": "instant vector selector with 'offset'", - "avg(metric{})": "'avg' aggregation", - "sum without(l) (metric{})": "grouping with 'without'", - "rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'", - "avg_over_time(metric{}[5m])": "'avg_over_time' function", - "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", + "a + b": "PromQL expression type *parser.BinaryExpr", + "1 + 2": "scalar value as top-level expression", + "metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr", + "1": "scalar value as top-level expression", + "metric{} offset 2h": "instant vector selector with 'offset'", + "avg(metric{})": "'avg' aggregation", + "sum without(l) (metric{})": "grouping with 'without'", + "rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr", + "avg_over_time(metric{}[5m])": "'avg_over_time' function", + "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", } for expression, expectedError := range unsupportedExpressions { @@ -53,12 +55,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // These expressions are also unsupported, but are only valid as instant queries. 
unsupportedInstantQueryExpressions := map[string]string{ - "'a'": "PromQL expression type *parser.StringLiteral", - "metric{}[5m]": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] offset 2h": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] @ 123": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] @ start()": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] @ end()": "PromQL expression type *parser.MatrixSelector", + "'a'": "string value as top-level expression", + "metric{}[5m] offset 2h": "range vector selector with 'offset'", "metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr", } @@ -159,3 +157,120 @@ func TestOurTestCases(t *testing.T) { }) } } + +// Testing instant queries that return a range vector is not supported by Prometheus' PromQL testing framework, +// and adding support for this would be quite involved. +// +// So we test these few cases here instead. +func TestRangeVectorSelectors(t *testing.T) { + opts := NewTestEngineOpts() + streamingEngine, err := NewEngine(opts) + require.NoError(t, err) + + prometheusEngine := promql.NewEngine(opts) + + baseT := timestamp.Time(0) + storage := promql.LoadedStorage(t, ` + load 1m + some_metric{env="1"} 0+1x4 + some_metric{env="2"} 0+2x4 + some_metric_with_gaps 0 1 _ 3 + some_metric_with_stale_marker 0 1 stale 3 + `) + t.Cleanup(func() { require.NoError(t, storage.Close()) }) + + testCases := map[string]struct { + expr string + expected *promql.Result + ts time.Time + }{ + "matches series with points in range": { + expr: "some_metric[1m]", + ts: baseT.Add(2 * time.Minute), + expected: &promql.Result{ + Value: promql.Matrix{ + { + Metric: labels.FromStrings("__name__", "some_metric", "env", "1"), + Floats: []promql.FPoint{ + {T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1}, + {T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 2}, + }, + }, + { + Metric: labels.FromStrings("__name__", "some_metric", "env", "2"), + Floats: 
[]promql.FPoint{ + {T: timestamp.FromTime(baseT.Add(time.Minute)), F: 2}, + {T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 4}, + }, + }, + }, + }, + }, + "matches no series": { + expr: "some_nonexistent_metric[1m]", + ts: baseT, + expected: &promql.Result{ + Value: promql.Matrix{}, + }, + }, + "no samples in range": { + expr: "some_metric[1m]", + ts: baseT.Add(20 * time.Minute), + expected: &promql.Result{ + Value: promql.Matrix{}, + }, + }, + "does not return points outside range if last selected point does not align to end of range": { + expr: "some_metric_with_gaps[1m]", + ts: baseT.Add(2 * time.Minute), + expected: &promql.Result{ + Value: promql.Matrix{ + { + Metric: labels.FromStrings("__name__", "some_metric_with_gaps"), + Floats: []promql.FPoint{ + {T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1}, + }, + }, + }, + }, + }, + "metric with stale marker": { + expr: "some_metric_with_stale_marker[3m]", + ts: baseT.Add(3 * time.Minute), + expected: &promql.Result{ + Value: promql.Matrix{ + { + Metric: labels.FromStrings("__name__", "some_metric_with_stale_marker"), + Floats: []promql.FPoint{ + {T: timestamp.FromTime(baseT), F: 0}, + {T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1}, + {T: timestamp.FromTime(baseT.Add(3 * time.Minute)), F: 3}, + }, + }, + }, + }, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + runTest := func(t *testing.T, eng promql.QueryEngine, expr string, ts time.Time, expected *promql.Result) { + q, err := eng.NewInstantQuery(context.Background(), storage, nil, expr, ts) + require.NoError(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + require.Equal(t, expected, res) + } + + t.Run("streaming engine", func(t *testing.T) { + runTest(t, streamingEngine, testCase.expr, testCase.ts, testCase.expected) + }) + + // Run the tests against Prometheus' engine to ensure our test cases are valid. 
+ t.Run("Prometheus' engine", func(t *testing.T) { + runTest(t, prometheusEngine, testCase.expr, testCase.ts, testCase.expected) + }) + }) + } +} diff --git a/pkg/streamingpromql/operator/aggregation.go b/pkg/streamingpromql/operator/aggregation.go index 79ea7336254..597e40c9f19 100644 --- a/pkg/streamingpromql/operator/aggregation.go +++ b/pkg/streamingpromql/operator/aggregation.go @@ -118,7 +118,7 @@ func (a *Aggregation) labelsForGroup(m labels.Labels, lb *labels.Builder) labels return lb.Labels() } -func (a *Aggregation) Next(ctx context.Context) (InstantVectorSeriesData, error) { +func (a *Aggregation) NextSeries(ctx context.Context) (InstantVectorSeriesData, error) { if len(a.remainingGroups) == 0 { // No more groups left. return InstantVectorSeriesData{}, EOS @@ -135,7 +135,7 @@ func (a *Aggregation) Next(ctx context.Context) (InstantVectorSeriesData, error) // Iterate through inner series until the desired group is complete for thisGroup.remainingSeriesCount > 0 { - s, err := a.Inner.Next(ctx) + s, err := a.Inner.NextSeries(ctx) if err != nil { if errors.Is(err, EOS) { diff --git a/pkg/streamingpromql/operator/aggregation_test.go b/pkg/streamingpromql/operator/aggregation_test.go index 069d0a8ba5f..331fb7e672c 100644 --- a/pkg/streamingpromql/operator/aggregation_test.go +++ b/pkg/streamingpromql/operator/aggregation_test.go @@ -106,8 +106,8 @@ func (t *testOperator) SeriesMetadata(_ context.Context) ([]SeriesMetadata, erro return labelsToSeriesMetadata(t.series), nil } -func (t *testOperator) Next(_ context.Context) (InstantVectorSeriesData, error) { - panic("Next() not supported") +func (t *testOperator) NextSeries(_ context.Context) (InstantVectorSeriesData, error) { + panic("NextSeries() not supported") } func (t *testOperator) Close() { diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operator/instant_vector_selector.go index e72b955b724..5fd06435bc3 100644 --- 
a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operator/instant_vector_selector.go @@ -29,13 +29,13 @@ type InstantVectorSelector struct { var _ InstantVectorOperator = &InstantVectorSelector{} func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) { - // Compute value we need on every call to Next() once, here. + // Compute value we need on every call to NextSeries() once, here. v.numSteps = stepCount(v.Selector.Start, v.Selector.End, v.Selector.Interval) return v.Selector.SeriesMetadata(ctx) } -func (v *InstantVectorSelector) Next(_ context.Context) (InstantVectorSeriesData, error) { +func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeriesData, error) { if v.memoizedIterator == nil { v.memoizedIterator = storage.NewMemoizedEmptyIterator(v.Selector.LookbackDelta.Milliseconds()) } @@ -100,7 +100,5 @@ func (v *InstantVectorSelector) Next(_ context.Context) (InstantVectorSeriesData } func (v *InstantVectorSelector) Close() { - if v.Selector != nil { - v.Selector.Close() - } + v.Selector.Close() } diff --git a/pkg/streamingpromql/operator/operator.go b/pkg/streamingpromql/operator/operator.go index a778cf20341..6e81a81bf58 100644 --- a/pkg/streamingpromql/operator/operator.go +++ b/pkg/streamingpromql/operator/operator.go @@ -5,28 +5,62 @@ package operator import ( "context" "errors" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" ) -// InstantVectorOperator represents all operators that produce instant vectors. -type InstantVectorOperator interface { +// Operator represents all operators. +type Operator interface { // SeriesMetadata returns a list of all series that will be returned by this operator. // The returned []SeriesMetadata can be modified by the caller or returned to a pool. - // SeriesMetadata may return series in any order, but the same order must be used by both SeriesMetadata and Next. 
+ // SeriesMetadata may return series in any order, but the same order must be used by both SeriesMetadata and NextSeries. // SeriesMetadata should be called no more than once. SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) - // Next returns the next series from this operator, or EOS otherwise. - // SeriesMetadata must be called exactly once before calling Next. + // Close frees all resources associated with this operator. + // Calling SeriesMetadata or NextSeries after calling Close may result in unpredictable behaviour, corruption or crashes. + Close() +} + +// InstantVectorOperator represents all operators that produce instant vectors. +type InstantVectorOperator interface { + Operator + + // NextSeries returns the next series from this operator, or EOS if no more series are available. + // SeriesMetadata must be called exactly once before calling NextSeries. // The returned InstantVectorSeriesData can be modified by the caller or returned to a pool. // The returned InstantVectorSeriesData can contain no points. - Next(ctx context.Context) (InstantVectorSeriesData, error) + NextSeries(ctx context.Context) (InstantVectorSeriesData, error) +} - // Close frees all resources associated with this operator. - // Calling SeriesMetadata or Next after calling Close may result in unpredictable behaviour, corruption or crashes. - Close() +// RangeVectorOperator represents all operators that produce range vectors. +type RangeVectorOperator interface { + Operator + + // StepCount returns the number of time steps produced for each series by this operator. + // StepCount must only be called after calling SeriesMetadata. + StepCount() int + + // Range returns the time range selected by this operator at each time step. + // + // For example, if this operator represents the selector "some_metric[5m]", Range returns 5 minutes. + Range() time.Duration + + // NextSeries advances to the next series produced by this operator, or EOS if no more series are available. 
+ // SeriesMetadata must be called exactly once before calling NextSeries. + NextSeries(ctx context.Context) error + + // NextStepSamples populates the provided RingBuffer with the samples for the next time step for the + // current series and returns the timestamps of the next time step, or returns EOS if no more time + // steps are available. + // The provided RingBuffer is expected to only contain points for the current series, and the same + // RingBuffer should be passed to subsequent NextStepSamples calls for the same series. + // The provided RingBuffer may be populated with points beyond the end of the expected time range, and + // callers should compare returned points' timestamps to the returned RangeVectorStepData.RangeEnd. + // NextSeries must be called at least once before calling NextStepSamples. + NextStepSamples(floats *RingBuffer) (RangeVectorStepData, error) } var EOS = errors.New("operator stream exhausted") //nolint:revive @@ -39,3 +73,35 @@ type InstantVectorSeriesData struct { Floats []promql.FPoint Histograms []promql.HPoint } + +// RangeVectorStepData contains the timestamps associated with a single time step produced by a +// RangeVectorOperator. +// +// All values are in milliseconds since the Unix epoch. +// +// For example, if the operator represents the selector "some_metric[5m]", and this time step is for +// 2024-05-02T00:00:00Z, then: +// - StepT is 1714608000000 (2024-05-02T00:00:00Z) +// - RangeStart is 1714607700000 (2024-05-01T23:55:00Z) +// - RangeEnd is 1714608000000 (2024-05-02T00:00:00Z) +// +// If the operator represents the selector "some_metric[5m] @ 1712016000", and this time step is for +// 2024-05-02T00:00:00Z, then: +// - StepT is 1714608000000 (2024-05-02T00:00:00Z) +// - RangeStart is 1712015700000 (2024-04-01T23:55:00Z) +// - RangeEnd is 1712016000000 (2024-04-02T00:00:00Z) +type RangeVectorStepData struct { + // StepT is the timestamp of this time step.
+ StepT int64 + + // RangeStart is the beginning of the time range selected by this time step. + // RangeStart is inclusive (ie. points with timestamp >= RangeStart are included in the range). + RangeStart int64 + + // RangeEnd is the end of the time range selected by this time step. + // RangeEnd is the same as StepT except when the @ modifier or offsets are used, in which case + // RangeEnd reflects the time of the underlying points, and StepT is the timestamp of the point + // produced by the query. + // RangeEnd is inclusive (ie. points with timestamp <= RangeEnd are included in the range). + RangeEnd int64 +} diff --git a/pkg/streamingpromql/operator/range_vector_function.go b/pkg/streamingpromql/operator/range_vector_function.go new file mode 100644 index 00000000000..af7648414a9 --- /dev/null +++ b/pkg/streamingpromql/operator/range_vector_function.go @@ -0,0 +1,154 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package operator + +import ( + "context" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" +) + +// RangeVectorFunction performs a rate calculation over a range vector. 
+type RangeVectorFunction struct { + Inner RangeVectorOperator + + numSteps int + rangeSeconds float64 + buffer *RingBuffer +} + +var _ InstantVectorOperator = &RangeVectorFunction{} + +func (m *RangeVectorFunction) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) { + metadata, err := m.Inner.SeriesMetadata(ctx) + if err != nil { + return nil, err + } + + lb := labels.NewBuilder(labels.EmptyLabels()) + for i := range metadata { + metadata[i].Labels = dropMetricName(metadata[i].Labels, lb) + } + + m.numSteps = m.Inner.StepCount() + m.rangeSeconds = m.Inner.Range().Seconds() + + return metadata, nil +} + +func dropMetricName(l labels.Labels, lb *labels.Builder) labels.Labels { + lb.Reset(l) + lb.Del(labels.MetricName) + return lb.Labels() +} + +func (m *RangeVectorFunction) NextSeries(ctx context.Context) (InstantVectorSeriesData, error) { + if err := m.Inner.NextSeries(ctx); err != nil { + return InstantVectorSeriesData{}, err + } + + if m.buffer == nil { + m.buffer = &RingBuffer{} + } + + m.buffer.Reset() + + data := InstantVectorSeriesData{ + Floats: GetFPointSlice(m.numSteps), // TODO: only allocate this if we have any floats (once we support native histograms) + } + + for { + step, err := m.Inner.NextStepSamples(m.buffer) + + // nolint:errorlint // errors.Is introduces a performance overhead, and NextStepSamples is guaranteed to return exactly EOS, never a wrapped error. + if err == EOS { + return data, nil + } else if err != nil { + return InstantVectorSeriesData{}, err + } + + head, tail := m.buffer.UnsafePoints(step.RangeEnd) + count := len(head) + len(tail) + + if count < 2 { + // Not enough points, skip. + continue + } + + firstPoint := m.buffer.First() + lastPoint, _ := m.buffer.LastAtOrBefore(step.RangeEnd) // We already know there is a point at or before this time, no need to check. 
+ delta := lastPoint.F - firstPoint.F + previousValue := firstPoint.F + + accumulate := func(points []promql.FPoint) { + for _, p := range points { + if p.T > step.RangeEnd { // The buffer is already guaranteed to only contain points >= rangeStart. + return + } + + if p.F < previousValue { + // Counter reset. + delta += previousValue + } + + previousValue = p.F + } + } + + accumulate(head) + accumulate(tail) + + val := m.calculateRate(step.RangeStart, step.RangeEnd, firstPoint, lastPoint, delta, count) + + data.Floats = append(data.Floats, promql.FPoint{T: step.StepT, F: val}) + } +} + +// This is based on extrapolatedRate from promql/functions.go. +// https://github.com/prometheus/prometheus/pull/13725 has a good explanation of the intended behaviour here. +func (m *RangeVectorFunction) calculateRate(rangeStart, rangeEnd int64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 { + durationToStart := float64(firstPoint.T-rangeStart) / 1000 + durationToEnd := float64(rangeEnd-lastPoint.T) / 1000 + + sampledInterval := float64(lastPoint.T-firstPoint.T) / 1000 + averageDurationBetweenSamples := sampledInterval / float64(count-1) + + extrapolationThreshold := averageDurationBetweenSamples * 1.1 + extrapolateToInterval := sampledInterval + + if durationToStart >= extrapolationThreshold { + durationToStart = averageDurationBetweenSamples / 2 + } + + if delta > 0 && firstPoint.F >= 0 { + durationToZero := sampledInterval * (firstPoint.F / delta) + if durationToZero < durationToStart { + durationToStart = durationToZero + } + } + + extrapolateToInterval += durationToStart + + if durationToEnd >= extrapolationThreshold { + durationToEnd = averageDurationBetweenSamples / 2 + } + + extrapolateToInterval += durationToEnd + + factor := extrapolateToInterval / sampledInterval + factor /= m.rangeSeconds + return delta * factor +} + +func (m *RangeVectorFunction) Close() { + m.Inner.Close() + + if m.buffer != nil { + m.buffer.Close() + } +} diff --git 
a/pkg/streamingpromql/operator/range_vector_selector.go b/pkg/streamingpromql/operator/range_vector_selector.go new file mode 100644 index 00000000000..9d80eba9097 --- /dev/null +++ b/pkg/streamingpromql/operator/range_vector_selector.go @@ -0,0 +1,117 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package operator + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +type RangeVectorSelector struct { + Selector *Selector + + rangeMilliseconds int64 + numSteps int + + chunkIterator chunkenc.Iterator + nextT int64 +} + +var _ RangeVectorOperator = &RangeVectorSelector{} + +func (m *RangeVectorSelector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) { + // Compute value we need on every call to NextSeries() once, here. 
+ m.rangeMilliseconds = m.Selector.Range.Milliseconds() + m.numSteps = stepCount(m.Selector.Start, m.Selector.End, m.Selector.Interval) + + return m.Selector.SeriesMetadata(ctx) +} + +func (m *RangeVectorSelector) StepCount() int { + return m.numSteps +} + +func (m *RangeVectorSelector) Range() time.Duration { + return m.Selector.Range +} + +func (m *RangeVectorSelector) NextSeries(_ context.Context) error { + var err error + m.chunkIterator, err = m.Selector.Next(m.chunkIterator) + if err != nil { + return err + } + + m.nextT = m.Selector.Start + return nil +} + +func (m *RangeVectorSelector) NextStepSamples(floats *RingBuffer) (RangeVectorStepData, error) { + if m.nextT > m.Selector.End { + return RangeVectorStepData{}, EOS + } + + stepT := m.nextT + rangeEnd := stepT + + if m.Selector.Timestamp != nil { + rangeEnd = *m.Selector.Timestamp + } + + rangeStart := rangeEnd - m.rangeMilliseconds + floats.DiscardPointsBefore(rangeStart) + + if err := m.fillBuffer(floats, rangeStart, rangeEnd); err != nil { + return RangeVectorStepData{}, err + } + + m.nextT += m.Selector.Interval + + return RangeVectorStepData{ + StepT: stepT, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + }, nil +} + +func (m *RangeVectorSelector) fillBuffer(floats *RingBuffer, rangeStart, rangeEnd int64) error { + // Keep filling the buffer until we reach the end of the range or the end of the iterator. + for { + valueType := m.chunkIterator.Next() + + switch valueType { + case chunkenc.ValNone: + // No more data. We are done. 
+ return m.chunkIterator.Err() + case chunkenc.ValFloat: + t, f := m.chunkIterator.At() + if value.IsStaleNaN(f) || t < rangeStart { + continue + } + + // We might append a sample beyond the range end, but this is OK: + // - callers of NextStepSamples are expected to pass the same RingBuffer to subsequent calls, so the point is not lost + // - callers of NextStepSamples are expected to handle the case where the buffer contains points beyond the end of the range + floats.Append(promql.FPoint{T: t, F: f}) + + if t >= rangeEnd { + return nil + } + default: + // TODO: handle native histograms + return fmt.Errorf("unknown value type %s", valueType.String()) + } + } +} + +func (m *RangeVectorSelector) Close() { + m.Selector.Close() +} diff --git a/pkg/streamingpromql/operator/range_vector_selector_with_transformation.go b/pkg/streamingpromql/operator/range_vector_selector_with_transformation.go deleted file mode 100644 index 98ca848f093..00000000000 --- a/pkg/streamingpromql/operator/range_vector_selector_with_transformation.go +++ /dev/null @@ -1,200 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only -// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go -// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: The Prometheus Authors - -package operator - -import ( - "context" - "fmt" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/value" - "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -// RangeVectorSelectorWithTransformation performs a rate calculation over a range vector selector. -// -// This will one day be split into two operators: the rate calculation operator and a range vector selector operator. 
-type RangeVectorSelectorWithTransformation struct { - Selector *Selector - - rangeMilliseconds int64 - numSteps int - - chunkIterator chunkenc.Iterator - buffer *RingBuffer -} - -var _ InstantVectorOperator = &RangeVectorSelectorWithTransformation{} - -func (m *RangeVectorSelectorWithTransformation) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) { - // Compute values we need on every call to Next() once, here. - m.rangeMilliseconds = m.Selector.Range.Milliseconds() - m.numSteps = stepCount(m.Selector.Start, m.Selector.End, m.Selector.Interval) - - metadata, err := m.Selector.SeriesMetadata(ctx) - if err != nil { - return nil, err - } - - lb := labels.NewBuilder(labels.EmptyLabels()) - for i := range metadata { - metadata[i].Labels = dropMetricName(metadata[i].Labels, lb) - } - - return metadata, nil -} - -func dropMetricName(l labels.Labels, lb *labels.Builder) labels.Labels { - lb.Reset(l) - lb.Del(labels.MetricName) - return lb.Labels() -} - -func (m *RangeVectorSelectorWithTransformation) Next(_ context.Context) (InstantVectorSeriesData, error) { - if m.buffer == nil { - m.buffer = &RingBuffer{} - } - - var err error - m.chunkIterator, err = m.Selector.Next(m.chunkIterator) - if err != nil { - return InstantVectorSeriesData{}, err - } - - m.buffer.Reset() - - data := InstantVectorSeriesData{ - Floats: GetFPointSlice(m.numSteps), // TODO: only allocate this if we have any floats (once we support native histograms) - } - - // TODO: handle native histograms - for stepT := m.Selector.Start; stepT <= m.Selector.End; stepT += m.Selector.Interval { - rangeEnd := stepT - - if m.Selector.Timestamp != nil { - rangeEnd = *m.Selector.Timestamp - } - - rangeStart := rangeEnd - m.rangeMilliseconds - m.buffer.DiscardPointsBefore(rangeStart) - - if err := m.fillBuffer(rangeStart, rangeEnd); err != nil { - return InstantVectorSeriesData{}, err - } - - head, tail := m.buffer.Points() - count := len(head) + len(tail) - - if count < 2 { - // Not enough points, skip. 
- continue - } - - firstPoint := m.buffer.First() - lastPoint := m.buffer.Last() - delta := lastPoint.F - firstPoint.F - previousValue := firstPoint.F - - accumulate := func(points []promql.FPoint) { - for _, p := range points { - if p.T > rangeEnd { // The buffer is already guaranteed to only contain points >= rangeStart. - return - } - - if p.F < previousValue { - // Counter reset. - delta += previousValue - } - - previousValue = p.F - } - } - - accumulate(head) - accumulate(tail) - - val := m.calculateRate(rangeStart, rangeEnd, firstPoint, lastPoint, delta, count) - - data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val}) - } - - return data, nil -} - -func (m *RangeVectorSelectorWithTransformation) fillBuffer(rangeStart, rangeEnd int64) error { - // Keep filling the buffer until we reach the end of the range or the end of the iterator. - for { - valueType := m.chunkIterator.Next() - - switch valueType { - case chunkenc.ValNone: - // No more data. We are done. - return m.chunkIterator.Err() - case chunkenc.ValFloat: - t, f := m.chunkIterator.At() - if value.IsStaleNaN(f) || t < rangeStart { - continue - } - - m.buffer.Append(promql.FPoint{T: t, F: f}) - - if t >= rangeEnd { - return nil - } - default: - // TODO: handle native histograms - return fmt.Errorf("unknown value type %s", valueType.String()) - } - } -} - -// This is based on extrapolatedRate from promql/functions.go. -// https://github.com/prometheus/prometheus/pull/13725 has a good explanation of the intended behaviour here. 
-func (m *RangeVectorSelectorWithTransformation) calculateRate(rangeStart, rangeEnd int64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 { - durationToStart := float64(firstPoint.T-rangeStart) / 1000 - durationToEnd := float64(rangeEnd-lastPoint.T) / 1000 - - sampledInterval := float64(lastPoint.T-firstPoint.T) / 1000 - averageDurationBetweenSamples := sampledInterval / float64(count-1) - - extrapolationThreshold := averageDurationBetweenSamples * 1.1 - extrapolateToInterval := sampledInterval - - if durationToStart >= extrapolationThreshold { - durationToStart = averageDurationBetweenSamples / 2 - } - - if delta > 0 && firstPoint.F >= 0 { - durationToZero := sampledInterval * (firstPoint.F / delta) - if durationToZero < durationToStart { - durationToStart = durationToZero - } - } - - extrapolateToInterval += durationToStart - - if durationToEnd >= extrapolationThreshold { - durationToEnd = averageDurationBetweenSamples / 2 - } - - extrapolateToInterval += durationToEnd - - factor := extrapolateToInterval / sampledInterval - factor /= m.Selector.Range.Seconds() - return delta * factor -} - -func (m *RangeVectorSelectorWithTransformation) Close() { - if m.Selector != nil { - m.Selector.Close() - } - - if m.buffer != nil { - m.buffer.Close() - } -} diff --git a/pkg/streamingpromql/operator/ring_buffer.go b/pkg/streamingpromql/operator/ring_buffer.go index b5dc76d9579..8e6869ba756 100644 --- a/pkg/streamingpromql/operator/ring_buffer.go +++ b/pkg/streamingpromql/operator/ring_buffer.go @@ -26,23 +26,50 @@ func (b *RingBuffer) DiscardPointsBefore(t int64) { } } -// Points returns slices of the points in this buffer. +// UnsafePoints returns slices of the points in this buffer, including only points with timestamp less than or equal to maxT. // Either or both slice could be empty. -// Callers must not modify the values in the returned slices. +// Callers must not modify the values in the returned slices or return them to a pool. 
+// Calling UnsafePoints is more efficient than calling CopyPoints, as CopyPoints will create a new slice and copy all +// points into the slice, whereas UnsafePoints returns a view into the internal state of this buffer. +// The returned slices are no longer valid if this buffer is modified (eg. a point is added, or the buffer is reset). // // FIXME: the fact we have to expose this is a bit gross, but the overhead of calling a function with ForEach is terrible. // Perhaps we can use range-over function iterators (https://go.dev/wiki/RangefuncExperiment) once this is not experimental? -func (b *RingBuffer) Points() ([]promql.FPoint, []promql.FPoint) { - endOfTailSegment := b.firstIndex + b.size +func (b *RingBuffer) UnsafePoints(maxT int64) (head []promql.FPoint, tail []promql.FPoint) { + size := b.size - if endOfTailSegment > len(b.points) { + for size > 0 && b.points[(b.firstIndex+size-1)%len(b.points)].T > maxT { + size-- + } + + endOfHeadSegment := b.firstIndex + size + + if endOfHeadSegment > len(b.points) { // Need to wrap around. - endOfHeadSegment := endOfTailSegment % len(b.points) - endOfTailSegment = len(b.points) - return b.points[b.firstIndex:endOfTailSegment], b.points[0:endOfHeadSegment] + endOfTailSegment := endOfHeadSegment % len(b.points) + endOfHeadSegment = len(b.points) + return b.points[b.firstIndex:endOfHeadSegment], b.points[0:endOfTailSegment] } - return b.points[b.firstIndex:endOfTailSegment], nil + return b.points[b.firstIndex:endOfHeadSegment], nil +} + +// CopyPoints returns a single slice of the points in this buffer, including only points with timestamp less than or equal to maxT. +// Callers may modify the values in the returned slice, and should return the slice to the pool by calling +// PutFPointSlice when it is no longer needed. 
+// Calling UnsafePoints is more efficient than calling CopyPoints, as CopyPoints will create a new slice and copy all +// points into the slice, whereas UnsafePoints returns a view into the internal state of this buffer. +func (b *RingBuffer) CopyPoints(maxT int64) []promql.FPoint { + if b.size == 0 { + return nil + } + + head, tail := b.UnsafePoints(maxT) + combined := GetFPointSlice(len(head) + len(tail)) + combined = append(combined, head...) + combined = append(combined, tail...) + + return combined } // ForEach calls f for each point in this buffer. @@ -121,12 +148,20 @@ func (b *RingBuffer) First() promql.FPoint { return b.points[b.firstIndex] } -// Last returns the last point in this buffer. -// It panics if the buffer is empty. -func (b *RingBuffer) Last() promql.FPoint { - if b.size == 0 { - panic("Can't get last element of empty buffer") +// LastAtOrBefore returns the last point in this buffer with timestamp less than or equal to maxT. +// It returns false if there is no point satisfying this requirement. 
+func (b *RingBuffer) LastAtOrBefore(maxT int64) (promql.FPoint, bool) { + size := b.size + + for size > 0 { + p := b.points[(b.firstIndex+size-1)%len(b.points)] + + if p.T <= maxT { + return p, true + } + + size-- } - return b.points[(b.firstIndex+b.size-1)%len(b.points)] + return promql.FPoint{}, false } diff --git a/pkg/streamingpromql/operator/ring_buffer_test.go b/pkg/streamingpromql/operator/ring_buffer_test.go index 361db494299..40327965d04 100644 --- a/pkg/streamingpromql/operator/ring_buffer_test.go +++ b/pkg/streamingpromql/operator/ring_buffer_test.go @@ -3,6 +3,7 @@ package operator import ( + "math" "testing" "github.com/prometheus/prometheus/promql" @@ -130,27 +131,49 @@ func shouldHaveNoPoints(t *testing.T, buf *RingBuffer) { } func shouldHavePoints(t *testing.T, buf *RingBuffer, expected ...promql.FPoint) { - var actual []promql.FPoint + var pointsFromForEach []promql.FPoint buf.ForEach(func(p promql.FPoint) { - actual = append(actual, p) + pointsFromForEach = append(pointsFromForEach, p) }) - require.Equal(t, expected, actual) + require.Equal(t, expected, pointsFromForEach) - head, tail := buf.Points() - actual = append(head, tail...) + if len(expected) == 0 { + shouldHavePointsAtOrBeforeTime(t, buf, math.MaxInt64, expected...) + _, present := buf.LastAtOrBefore(math.MaxInt64) + require.False(t, present) + } else { + require.Equal(t, expected[0], buf.First()) + // We test LastAtOrBefore() below. - if len(actual) == 0 { - actual = nil // expected will be nil when it's empty, but appending two empty slices returns a non-nil slice. + lastPointT := expected[len(expected)-1].T + + shouldHavePointsAtOrBeforeTime(t, buf, lastPointT, expected...) + shouldHavePointsAtOrBeforeTime(t, buf, lastPointT+1, expected...) + shouldHavePointsAtOrBeforeTime(t, buf, lastPointT-1, expected[:len(expected)-1]...) 
} +} - require.Equal(t, expected, actual) +func shouldHavePointsAtOrBeforeTime(t *testing.T, buf *RingBuffer, ts int64, expected ...promql.FPoint) { + head, tail := buf.UnsafePoints(ts) + combinedPoints := append(head, tail...) - if len(actual) == 0 { - return + if len(expected) == 0 { + require.Len(t, combinedPoints, 0) + } else { + require.Equal(t, expected, combinedPoints) } - require.Equal(t, expected[0], buf.First()) - require.Equal(t, expected[len(expected)-1], buf.Last()) + copiedPoints := buf.CopyPoints(ts) + require.Equal(t, expected, copiedPoints) + + end, present := buf.LastAtOrBefore(ts) + + if len(expected) == 0 { + require.False(t, present) + } else { + require.True(t, present) + require.Equal(t, expected[len(expected)-1], end) + } } diff --git a/pkg/streamingpromql/operator/selector.go b/pkg/streamingpromql/operator/selector.go index 1d900e7c013..8225daa0992 100644 --- a/pkg/streamingpromql/operator/selector.go +++ b/pkg/streamingpromql/operator/selector.go @@ -15,10 +15,10 @@ import ( type Selector struct { Queryable storage.Queryable - Start int64 // Milliseconds since Unix epoch - End int64 // Milliseconds since Unix epoch - Timestamp *int64 - Interval int64 // In milliseconds + Start int64 // Milliseconds since Unix epoch + End int64 // Milliseconds since Unix epoch + Timestamp *int64 // Milliseconds since Unix epoch, only set if selector uses @ modifier (eg. metric{...} @ 123) + Interval int64 // In milliseconds Matchers []*labels.Matcher // Set for instant vector selectors, otherwise 0. 
diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index ed65dd736a9..cae4ace5c61 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -26,7 +26,7 @@ type Query struct { queryable storage.Queryable opts promql.QueryOpts statement *parser.EvalStmt - root operator.InstantVectorOperator + root operator.Operator engine *Engine qs string @@ -65,15 +65,29 @@ func newQuery(queryable storage.Queryable, opts promql.QueryOpts, qs string, sta } } - q.root, err = q.convertToOperator(expr) - if err != nil { - return nil, err + switch expr.Type() { + case parser.ValueTypeMatrix: + q.root, err = q.convertToRangeVectorOperator(expr) + if err != nil { + return nil, err + } + case parser.ValueTypeVector: + q.root, err = q.convertToInstantVectorOperator(expr) + if err != nil { + return nil, err + } + default: + return nil, NewNotSupportedError(fmt.Sprintf("%s value as top-level expression", parser.DocumentedType(expr.Type()))) } return q, nil } -func (q *Query) convertToOperator(expr parser.Expr) (operator.InstantVectorOperator, error) { +func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.InstantVectorOperator, error) { + if expr.Type() != parser.ValueTypeVector { + return nil, fmt.Errorf("cannot create instant vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) + } + interval := q.statement.Interval if q.IsInstant() { @@ -118,7 +132,7 @@ func (q *Query) convertToOperator(expr parser.Expr) (operator.InstantVectorOpera slices.Sort(e.Grouping) - inner, err := q.convertToOperator(e.Expr) + inner, err := q.convertToInstantVectorOperator(e.Expr) if err != nil { return nil, err } @@ -140,34 +154,59 @@ func (q *Query) convertToOperator(expr parser.Expr) (operator.InstantVectorOpera return nil, fmt.Errorf("expected exactly one argument for rate, got %v", len(e.Args)) } - matrixSelector, ok := e.Args[0].(*parser.MatrixSelector) - if !ok { - // Should be caught by the PromQL parser, but 
we check here for safety. - return nil, NewNotSupportedError(fmt.Sprintf("unsupported rate argument type %T", e.Args[0])) + inner, err := q.convertToRangeVectorOperator(e.Args[0]) + if err != nil { + return nil, err } - vectorSelector := matrixSelector.VectorSelector.(*parser.VectorSelector) + return &operator.RangeVectorFunction{ + Inner: inner, + }, nil + case *parser.StepInvariantExpr: + // One day, we'll do something smarter here. + return q.convertToInstantVectorOperator(e.Expr) + case *parser.ParenExpr: + return q.convertToInstantVectorOperator(e.Expr) + default: + return nil, NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) + } +} + +func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (operator.RangeVectorOperator, error) { + if expr.Type() != parser.ValueTypeMatrix { + return nil, fmt.Errorf("cannot create range vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) + } + + switch e := expr.(type) { + case *parser.MatrixSelector: + vectorSelector := e.VectorSelector.(*parser.VectorSelector) if vectorSelector.OriginalOffset != 0 || vectorSelector.Offset != 0 { return nil, NewNotSupportedError("range vector selector with 'offset'") } - return &operator.RangeVectorSelectorWithTransformation{ + interval := q.statement.Interval + + if q.IsInstant() { + interval = time.Millisecond + } + + return &operator.RangeVectorSelector{ Selector: &operator.Selector{ Queryable: q.queryable, Start: timestamp.FromTime(q.statement.Start), End: timestamp.FromTime(q.statement.End), Timestamp: vectorSelector.Timestamp, Interval: interval.Milliseconds(), - Range: matrixSelector.Range, + Range: e.Range, Matchers: vectorSelector.LabelMatchers, }, }, nil case *parser.StepInvariantExpr: // One day, we'll do something smarter here. 
- return q.convertToOperator(e.Expr) + return q.convertToRangeVectorOperator(e.Expr) case *parser.ParenExpr: - return q.convertToOperator(e.Expr) + return q.convertToRangeVectorOperator(e.Expr) default: return nil, NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) } @@ -186,31 +225,44 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { } defer operator.PutSeriesMetadataSlice(series) - if q.IsInstant() { - v, err := q.populateVector(ctx, series) + switch q.statement.Expr.Type() { + case parser.ValueTypeMatrix: + v, err := q.populateMatrixFromRangeVectorOperator(ctx, q.root.(operator.RangeVectorOperator), series) if err != nil { return &promql.Result{Err: err} } q.result = &promql.Result{Value: v} - } else { - m, err := q.populateMatrix(ctx, series) - if err != nil { - return &promql.Result{Value: m} - } + case parser.ValueTypeVector: + if q.IsInstant() { + v, err := q.populateVectorFromInstantVectorOperator(ctx, q.root.(operator.InstantVectorOperator), series) + if err != nil { + return &promql.Result{Err: err} + } + + q.result = &promql.Result{Value: v} + } else { + v, err := q.populateMatrixFromInstantVectorOperator(ctx, q.root.(operator.InstantVectorOperator), series) + if err != nil { + return &promql.Result{Err: err} + } - q.result = &promql.Result{Value: m} + q.result = &promql.Result{Value: v} + } + default: + // This should be caught in newQuery above. 
+ return &promql.Result{Err: NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))} } return q.result } -func (q *Query) populateVector(ctx context.Context, series []operator.SeriesMetadata) (promql.Vector, error) { +func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Vector, error) { ts := timeMilliseconds(q.statement.Start) v := operator.GetVector(len(series)) for i, s := range series { - d, err := q.root.Next(ctx) + d, err := o.NextSeries(ctx) if err != nil { if errors.Is(err, operator.EOS) { return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) @@ -244,11 +296,11 @@ func (q *Query) populateVector(ctx context.Context, series []operator.SeriesMeta return v, nil } -func (q *Query) populateMatrix(ctx context.Context, series []operator.SeriesMetadata) (promql.Matrix, error) { +func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Matrix, error) { m := operator.GetMatrix(len(series)) for i, s := range series { - d, err := q.root.Next(ctx) + d, err := o.NextSeries(ctx) if err != nil { if errors.Is(err, operator.EOS) { return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) @@ -278,6 +330,40 @@ func (q *Query) populateMatrix(ctx context.Context, series []operator.SeriesMeta return m, nil } +func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o operator.RangeVectorOperator, series []operator.SeriesMetadata) (promql.Matrix, error) { + m := operator.GetMatrix(len(series)) + b := &operator.RingBuffer{} + defer b.Close() + + for i, s := range series { + err := o.NextSeries(ctx) + if err != nil { + if errors.Is(err, operator.EOS) { + return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) + } + + return nil, err + } + 
+ b.Reset() + step, err := o.NextStepSamples(b) + if err != nil { + return nil, err + } + + m = append(m, promql.Series{ + Metric: s.Labels, + Floats: b.CopyPoints(step.RangeEnd), + }) + } + + slices.SortFunc(m, func(a, b promql.Series) int { + return labels.Compare(a.Metric, b.Metric) + }) + + return m, nil +} + func (q *Query) Close() { if q.result == nil { return diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 5bf1ae48e99..433e08d902b 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -8,6 +8,8 @@ load 1m some_metric{env="prod", cluster="us"} 0+120x4 some_metric{env="test", cluster="eu"} 0+180x4 some_metric{env="test", cluster="us"} 0+240x4 + some_metric_with_gaps 0 60 120 180 240 _ 2000 2120 2240 + some_metric_with_stale_marker 0 60 120 stale 240 300 # Range query with rate. eval range from 0 to 4m step 1m rate(some_metric[1m]) @@ -19,3 +21,21 @@ eval range from 0 to 4m step 1m rate(some_metric[1m]) # If no series are matched, we shouldn't return any results. eval range from 0 to 4m step 1m rate(some_nonexistent_metric[1m]) # Should return no results. + +# Ensure we don't include points outside the range of each individual step. +# +# When evaluating a range selector, if there is no point with timestamp equal to the end of the range, +# the next point (from outside the range) will be included in the ring buffer passed to rate(). +# So we need to ensure we ignore this. +# +# The first query below (with 1m) tests that we correctly skip evaluating rate() when there aren't enough points in the range. +# The second query below (with 2m) tests that we correctly pick the last point from the buffer if the last point in the buffer is outside the range. 
+eval range from 0 to 8m step 1m rate(some_metric_with_gaps[1m]) + {} _ 1 1 1 1 _ _ 2 2 + +eval range from 0 to 8m step 1m rate(some_metric_with_gaps[2m]) + {} _ 0.5 1 1 1 1 14.666666666666666 2 2 + +# Test that we handle staleness markers correctly. +eval range from 0 to 5m step 1m rate(some_metric_with_stale_marker[2m]) + {} _ 0.5 1 1 1 1