From c059433aae7e013c80eea2e8535e1f3894f2f8a0 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 3 Jun 2024 16:02:20 +1000 Subject: [PATCH 1/3] Move interface definitions to `types` package --- pkg/streamingpromql/README.md | 2 +- pkg/streamingpromql/operator/aggregation.go | 8 +++--- .../operator/binary_operation.go | 14 +++++----- .../operator/instant_vector_selector.go | 2 +- pkg/streamingpromql/operator/operator_test.go | 2 +- .../operator/range_vector_function.go | 10 +++---- .../operator/range_vector_selector.go | 8 +++--- pkg/streamingpromql/operator/selector.go | 2 +- pkg/streamingpromql/query.go | 26 +++++++++---------- .../{operator => types}/operator.go | 10 +++---- .../{operator => types}/ring_buffer.go | 2 +- .../{operator => types}/ring_buffer_test.go | 2 +- 12 files changed, 43 insertions(+), 45 deletions(-) rename pkg/streamingpromql/{operator => types}/operator.go (90%) rename pkg/streamingpromql/{operator => types}/ring_buffer.go (99%) rename pkg/streamingpromql/{operator => types}/ring_buffer_test.go (99%) diff --git a/pkg/streamingpromql/README.md b/pkg/streamingpromql/README.md index aba8b6abfe2..055350bded8 100644 --- a/pkg/streamingpromql/README.md +++ b/pkg/streamingpromql/README.md @@ -52,7 +52,7 @@ flowchart TB max --> output ``` -Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./operator/operator.go). +Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./types/operator.go). The two key methods of this interface are `SeriesMetadata()` and `NextSeries()`: `SeriesMetadata()` returns the list of all series' labels that will be returned by the operator[^2]. diff --git a/pkg/streamingpromql/operator/aggregation.go b/pkg/streamingpromql/operator/aggregation.go index 606b57f5807..a620016d943 100644 --- a/pkg/streamingpromql/operator/aggregation.go +++ b/pkg/streamingpromql/operator/aggregation.go @@ -22,7 +22,7 @@ import ( ) type Aggregation struct { - Inner InstantVectorOperator + Inner types.InstantVectorOperator Start time.Time End time.Time Interval time.Duration @@ -51,7 +51,7 @@ type group struct { present []bool } -var _ InstantVectorOperator = &Aggregation{} +var _ types.InstantVectorOperator = &Aggregation{} var groupPool = zeropool.New(func() *group { return &group{} @@ -125,7 +125,7 @@ func (a *Aggregation) labelsForGroup(m labels.Labels, lb *labels.Builder) labels func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { if len(a.remainingGroups) == 0 { // No more groups left. - return types.InstantVectorSeriesData{}, EOS + return types.InstantVectorSeriesData{}, types.EOS } start := timestamp.FromTime(a.Start) @@ -142,7 +142,7 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries s, err := a.Inner.NextSeries(ctx) if err != nil { - if errors.Is(err, EOS) { + if errors.Is(err, types.EOS) { return types.InstantVectorSeriesData{}, fmt.Errorf("exhausted series before all groups were completed: %w", err) } diff --git a/pkg/streamingpromql/operator/binary_operation.go b/pkg/streamingpromql/operator/binary_operation.go index 4dfbed12960..d4fb2ba1b2f 100644 --- a/pkg/streamingpromql/operator/binary_operation.go +++ b/pkg/streamingpromql/operator/binary_operation.go @@ -25,8 +25,8 @@ import ( // BinaryOperation represents a binary operation between instant vectors such as " + " or " - ". type BinaryOperation struct { - Left InstantVectorOperator - Right InstantVectorOperator + Left types.InstantVectorOperator + Right types.InstantVectorOperator Op parser.ItemType Pool *pooling.LimitingPool @@ -45,7 +45,7 @@ type BinaryOperation struct { opFunc binaryOperationFunc } -var _ InstantVectorOperator = &BinaryOperation{} +var _ types.InstantVectorOperator = &BinaryOperation{} type binaryOperationOutputSeries struct { leftSeriesIndices []int @@ -66,7 +66,7 @@ func (s binaryOperationOutputSeries) latestRightSeries() int { return s.rightSeriesIndices[len(s.rightSeriesIndices)-1] } -func NewBinaryOperation(left InstantVectorOperator, right InstantVectorOperator, vectorMatching parser.VectorMatching, op parser.ItemType, pool *pooling.LimitingPool) (*BinaryOperation, error) { +func NewBinaryOperation(left types.InstantVectorOperator, right types.InstantVectorOperator, vectorMatching parser.VectorMatching, op parser.ItemType, pool *pooling.LimitingPool) (*BinaryOperation, error) { opFunc := arithmeticOperationFuncs[op] if opFunc == nil { return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op)) @@ -331,7 +331,7 @@ func (b *BinaryOperation) labelsFunc() func(labels.Labels) labels.Labels { func (b *BinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { if len(b.remainingSeries) == 0 { - return types.InstantVectorSeriesData{}, EOS + return types.InstantVectorSeriesData{}, types.EOS } thisSeries := b.remainingSeries[0] @@ -545,7 +545,7 @@ func (b *BinaryOperation) Close() { // binary operation are in order B, A, C, binaryOperationSeriesBuffer will buffer the data for series A while series B is // produced, then return series A when needed. type binaryOperationSeriesBuffer struct { - source InstantVectorOperator + source types.InstantVectorOperator nextIndexToRead int // If seriesUsed[i] == true, then the series at index i is needed for this operation and should be buffered if not used immediately. @@ -562,7 +562,7 @@ type binaryOperationSeriesBuffer struct { output []types.InstantVectorSeriesData } -func newBinaryOperationSeriesBuffer(source InstantVectorOperator, seriesUsed []bool, pool *pooling.LimitingPool) *binaryOperationSeriesBuffer { +func newBinaryOperationSeriesBuffer(source types.InstantVectorOperator, seriesUsed []bool, pool *pooling.LimitingPool) *binaryOperationSeriesBuffer { return &binaryOperationSeriesBuffer{ source: source, seriesUsed: seriesUsed, diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operator/instant_vector_selector.go index 26bd1673314..c64ab773fd1 100644 --- a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operator/instant_vector_selector.go @@ -29,7 +29,7 @@ type InstantVectorSelector struct { memoizedIterator *storage.MemoizedSeriesIterator } -var _ InstantVectorOperator = &InstantVectorSelector{} +var _ types.InstantVectorOperator = &InstantVectorSelector{} func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { // Compute value we need on every call to NextSeries() once, here. diff --git a/pkg/streamingpromql/operator/operator_test.go b/pkg/streamingpromql/operator/operator_test.go index e658a43f171..aea88a42653 100644 --- a/pkg/streamingpromql/operator/operator_test.go +++ b/pkg/streamingpromql/operator/operator_test.go @@ -22,7 +22,7 @@ func (t *testOperator) SeriesMetadata(_ context.Context) ([]types.SeriesMetadata func (t *testOperator) NextSeries(_ context.Context) (types.InstantVectorSeriesData, error) { if len(t.data) == 0 { - return types.InstantVectorSeriesData{}, EOS + return types.InstantVectorSeriesData{}, types.EOS } d := t.data[0] diff --git a/pkg/streamingpromql/operator/range_vector_function.go b/pkg/streamingpromql/operator/range_vector_function.go index 8efd8de391d..154242919d6 100644 --- a/pkg/streamingpromql/operator/range_vector_function.go +++ b/pkg/streamingpromql/operator/range_vector_function.go @@ -18,15 +18,15 @@ import ( // RangeVectorFunction performs a rate calculation over a range vector. type RangeVectorFunction struct { - Inner RangeVectorOperator + Inner types.RangeVectorOperator Pool *pooling.LimitingPool numSteps int rangeSeconds float64 - buffer *RingBuffer + buffer *types.RingBuffer } -var _ InstantVectorOperator = &RangeVectorFunction{} +var _ types.InstantVectorOperator = &RangeVectorFunction{} func (m *RangeVectorFunction) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { metadata, err := m.Inner.SeriesMetadata(ctx) @@ -57,7 +57,7 @@ func (m *RangeVectorFunction) NextSeries(ctx context.Context) (types.InstantVect } if m.buffer == nil { - m.buffer = NewRingBuffer(m.Pool) + m.buffer = types.NewRingBuffer(m.Pool) } m.buffer.Reset() @@ -75,7 +75,7 @@ func (m *RangeVectorFunction) NextSeries(ctx context.Context) (types.InstantVect step, err := m.Inner.NextStepSamples(m.buffer) // nolint:errorlint // errors.Is introduces a performance overhead, and NextStepSamples is guaranteed to return exactly EOS, never a wrapped error. - if err == EOS { + if err == types.EOS { return data, nil } else if err != nil { return types.InstantVectorSeriesData{}, err diff --git a/pkg/streamingpromql/operator/range_vector_selector.go b/pkg/streamingpromql/operator/range_vector_selector.go index d398ddaba7e..c7b8158a142 100644 --- a/pkg/streamingpromql/operator/range_vector_selector.go +++ b/pkg/streamingpromql/operator/range_vector_selector.go @@ -27,7 +27,7 @@ type RangeVectorSelector struct { nextT int64 } -var _ RangeVectorOperator = &RangeVectorSelector{} +var _ types.RangeVectorOperator = &RangeVectorSelector{} func (m *RangeVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { // Compute value we need on every call to NextSeries() once, here. @@ -56,9 +56,9 @@ func (m *RangeVectorSelector) NextSeries(ctx context.Context) error { return nil } -func (m *RangeVectorSelector) NextStepSamples(floats *RingBuffer) (types.RangeVectorStepData, error) { +func (m *RangeVectorSelector) NextStepSamples(floats *types.RingBuffer) (types.RangeVectorStepData, error) { if m.nextT > m.Selector.End { - return types.RangeVectorStepData{}, EOS + return types.RangeVectorStepData{}, types.EOS } stepT := m.nextT @@ -84,7 +84,7 @@ func (m *RangeVectorSelector) NextStepSamples(floats *RingBuffer) (types.RangeVe }, nil } -func (m *RangeVectorSelector) fillBuffer(floats *RingBuffer, rangeStart, rangeEnd int64) error { +func (m *RangeVectorSelector) fillBuffer(floats *types.RingBuffer, rangeStart, rangeEnd int64) error { // Keep filling the buffer until we reach the end of the range or the end of the iterator. for { valueType := m.chunkIterator.Next() diff --git a/pkg/streamingpromql/operator/selector.go b/pkg/streamingpromql/operator/selector.go index 56a9c2e4fac..a1f5c0d1bba 100644 --- a/pkg/streamingpromql/operator/selector.go +++ b/pkg/streamingpromql/operator/selector.go @@ -90,7 +90,7 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, func (s *Selector) Next(ctx context.Context, existing chunkenc.Iterator) (chunkenc.Iterator, error) { if s.series.Len() == 0 { - return nil, EOS + return nil, types.EOS } s.seriesIdx++ diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index e8e8c145cc9..3bb4574ecac 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -34,7 +34,7 @@ type Query struct { queryable storage.Queryable opts promql.QueryOpts statement *parser.EvalStmt - root operator.Operator + root types.Operator engine *Engine qs string cancel context.CancelCauseFunc @@ -99,7 +99,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer return q, nil } -func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.InstantVectorOperator, error) { +func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantVectorOperator, error) { if expr.Type() != parser.ValueTypeVector { return nil, fmt.Errorf("cannot create instant vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) } @@ -211,7 +211,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.Insta } } -func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (operator.RangeVectorOperator, error) { +func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (types.RangeVectorOperator, error) { if expr.Type() != parser.ValueTypeMatrix { return nil, fmt.Errorf("cannot create range vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) } @@ -280,7 +280,7 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { switch q.statement.Expr.Type() { case parser.ValueTypeMatrix: - v, err := q.populateMatrixFromRangeVectorOperator(ctx, q.root.(operator.RangeVectorOperator), series) + v, err := q.populateMatrixFromRangeVectorOperator(ctx, q.root.(types.RangeVectorOperator), series) if err != nil { return &promql.Result{Err: err} } @@ -288,14 +288,14 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { q.result = &promql.Result{Value: v} case parser.ValueTypeVector: if q.IsInstant() { - v, err := q.populateVectorFromInstantVectorOperator(ctx, q.root.(operator.InstantVectorOperator), series) + v, err := q.populateVectorFromInstantVectorOperator(ctx, q.root.(types.InstantVectorOperator), series) if err != nil { return &promql.Result{Err: err} } q.result = &promql.Result{Value: v} } else { - v, err := q.populateMatrixFromInstantVectorOperator(ctx, q.root.(operator.InstantVectorOperator), series) + v, err := q.populateMatrixFromInstantVectorOperator(ctx, q.root.(types.InstantVectorOperator), series) if err != nil { return &promql.Result{Err: err} } @@ -310,7 +310,7 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { return q.result } -func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []types.SeriesMetadata) (promql.Vector, error) { +func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o types.InstantVectorOperator, series []types.SeriesMetadata) (promql.Vector, error) { ts := timeMilliseconds(q.statement.Start) v, err := q.pool.GetVector(len(series)) if err != nil { @@ -320,7 +320,7 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o for i, s := range series { d, err := o.NextSeries(ctx) if err != nil { - if errors.Is(err, operator.EOS) { + if errors.Is(err, types.EOS) { return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) } @@ -356,13 +356,13 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o return v, nil } -func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) { +func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o types.InstantVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) { m := pooling.GetMatrix(len(series)) for i, s := range series { d, err := o.NextSeries(ctx) if err != nil { - if errors.Is(err, operator.EOS) { + if errors.Is(err, types.EOS) { return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) } @@ -388,15 +388,15 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o o return m, nil } -func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o operator.RangeVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) { +func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o types.RangeVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) { m := pooling.GetMatrix(len(series)) - b := operator.NewRingBuffer(q.pool) + b := types.NewRingBuffer(q.pool) defer b.Close() for i, s := range series { err := o.NextSeries(ctx) if err != nil { - if errors.Is(err, operator.EOS) { + if errors.Is(err, types.EOS) { return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) } diff --git a/pkg/streamingpromql/operator/operator.go b/pkg/streamingpromql/types/operator.go similarity index 90% rename from pkg/streamingpromql/operator/operator.go rename to pkg/streamingpromql/types/operator.go index 65826e2d9db..66719b6fb6d 100644 --- a/pkg/streamingpromql/operator/operator.go +++ b/pkg/streamingpromql/types/operator.go @@ -1,13 +1,11 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package types import ( "context" "errors" "time" - - "github.com/grafana/mimir/pkg/streamingpromql/types" ) // Operator represents all operators. @@ -16,7 +14,7 @@ type Operator interface { // The returned []SeriesMetadata can be modified by the caller or returned to a pool. // SeriesMetadata may return series in any order, but the same order must be used by both SeriesMetadata and NextSeries. // SeriesMetadata should be called no more than once. - SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) + SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) // Close frees all resources associated with this operator. // Calling SeriesMetadata or NextSeries after calling Close may result in unpredictable behaviour, corruption or crashes. @@ -32,7 +30,7 @@ type InstantVectorOperator interface { // SeriesMetadata must be called exactly once before calling NextSeries. // The returned InstantVectorSeriesData can be modified by the caller or returned to a pool. // The returned InstantVectorSeriesData can contain no points. - NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) + NextSeries(ctx context.Context) (InstantVectorSeriesData, error) } // RangeVectorOperator represents all operators that produce range vectors. @@ -60,7 +58,7 @@ type RangeVectorOperator interface { // The provided RingBuffer may be populated with points beyond the end of the expected time range, and // callers should compare returned points' timestamps to the returned RangeVectorStepData.RangeEnd. // Next must be called at least once before calling NextStepSamples. - NextStepSamples(floats *RingBuffer) (types.RangeVectorStepData, error) + NextStepSamples(floats *RingBuffer) (RangeVectorStepData, error) } var EOS = errors.New("operator stream exhausted") //nolint:revive diff --git a/pkg/streamingpromql/operator/ring_buffer.go b/pkg/streamingpromql/types/ring_buffer.go similarity index 99% rename from pkg/streamingpromql/operator/ring_buffer.go rename to pkg/streamingpromql/types/ring_buffer.go index 003185512d9..bd137e7fc41 100644 --- a/pkg/streamingpromql/operator/ring_buffer.go +++ b/pkg/streamingpromql/types/ring_buffer.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package types import ( "github.com/prometheus/prometheus/promql" diff --git a/pkg/streamingpromql/operator/ring_buffer_test.go b/pkg/streamingpromql/types/ring_buffer_test.go similarity index 99% rename from pkg/streamingpromql/operator/ring_buffer_test.go rename to pkg/streamingpromql/types/ring_buffer_test.go index 5fc6faae961..9a634c65851 100644 --- a/pkg/streamingpromql/operator/ring_buffer_test.go +++ b/pkg/streamingpromql/types/ring_buffer_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package types import ( "math" From a0c15978f3cf86104048816492b9a45ea38ce374 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 3 Jun 2024 16:05:58 +1000 Subject: [PATCH 2/3] Move operators to `operators` package --- .../{operator => operators}/aggregation.go | 2 +- .../{operator => operators}/aggregation_test.go | 2 +- .../{operator => operators}/binary_operation.go | 2 +- .../binary_operation_test.go | 2 +- .../instant_vector_selector.go | 2 +- .../{operator => operators}/operator_test.go | 2 +- .../range_vector_function.go | 2 +- .../range_vector_selector.go | 2 +- .../{operator => operators}/selector.go | 2 +- .../{operator => operators}/selector_test.go | 2 +- .../{operator => operators}/time.go | 2 +- pkg/streamingpromql/query.go | 16 ++++++++-------- 12 files changed, 19 insertions(+), 19 deletions(-) rename pkg/streamingpromql/{operator => operators}/aggregation.go (99%) rename pkg/streamingpromql/{operator => operators}/aggregation_test.go (99%) rename pkg/streamingpromql/{operator => operators}/binary_operation.go (99%) rename pkg/streamingpromql/{operator => operators}/binary_operation_test.go (99%) rename pkg/streamingpromql/{operator => operators}/instant_vector_selector.go (99%) rename pkg/streamingpromql/{operator => operators}/operator_test.go (97%) rename pkg/streamingpromql/{operator => operators}/range_vector_function.go (99%) rename pkg/streamingpromql/{operator => operators}/range_vector_selector.go (99%) rename pkg/streamingpromql/{operator => operators}/selector.go (99%) rename pkg/streamingpromql/{operator => operators}/selector_test.go (99%) rename pkg/streamingpromql/{operator => operators}/time.go (88%) diff --git a/pkg/streamingpromql/operator/aggregation.go b/pkg/streamingpromql/operators/aggregation.go similarity index 99% rename from pkg/streamingpromql/operator/aggregation.go rename to pkg/streamingpromql/operators/aggregation.go index a620016d943..8c60de326a9 100644 --- a/pkg/streamingpromql/operator/aggregation.go +++ b/pkg/streamingpromql/operators/aggregation.go @@ -3,7 +3,7 @@ // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: The Prometheus Authors -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/aggregation_test.go b/pkg/streamingpromql/operators/aggregation_test.go similarity index 99% rename from pkg/streamingpromql/operator/aggregation_test.go rename to pkg/streamingpromql/operators/aggregation_test.go index 0132252aa7d..8070df6212e 100644 --- a/pkg/streamingpromql/operator/aggregation_test.go +++ b/pkg/streamingpromql/operators/aggregation_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/binary_operation.go b/pkg/streamingpromql/operators/binary_operation.go similarity index 99% rename from pkg/streamingpromql/operator/binary_operation.go rename to pkg/streamingpromql/operators/binary_operation.go index d4fb2ba1b2f..1d93ab4c02a 100644 --- a/pkg/streamingpromql/operator/binary_operation.go +++ b/pkg/streamingpromql/operators/binary_operation.go @@ -3,7 +3,7 @@ // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: The Prometheus Authors -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/binary_operation_test.go b/pkg/streamingpromql/operators/binary_operation_test.go similarity index 99% rename from pkg/streamingpromql/operator/binary_operation_test.go rename to pkg/streamingpromql/operators/binary_operation_test.go index 61a71c84ce9..413c4162597 100644 --- a/pkg/streamingpromql/operator/binary_operation_test.go +++ b/pkg/streamingpromql/operators/binary_operation_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operators/instant_vector_selector.go similarity index 99% rename from pkg/streamingpromql/operator/instant_vector_selector.go rename to pkg/streamingpromql/operators/instant_vector_selector.go index c64ab773fd1..de22b7f4bb3 100644 --- a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/instant_vector_selector.go @@ -3,7 +3,7 @@ // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: The Prometheus Authors -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/operator_test.go b/pkg/streamingpromql/operators/operator_test.go similarity index 97% rename from pkg/streamingpromql/operator/operator_test.go rename to pkg/streamingpromql/operators/operator_test.go index aea88a42653..ab0530eb9ff 100644 --- a/pkg/streamingpromql/operator/operator_test.go +++ b/pkg/streamingpromql/operators/operator_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/range_vector_function.go b/pkg/streamingpromql/operators/range_vector_function.go similarity index 99% rename from pkg/streamingpromql/operator/range_vector_function.go rename to pkg/streamingpromql/operators/range_vector_function.go index 154242919d6..948f0696051 100644 --- a/pkg/streamingpromql/operator/range_vector_function.go +++ b/pkg/streamingpromql/operators/range_vector_function.go @@ -4,7 +4,7 @@ // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: The Prometheus Authors -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/range_vector_selector.go b/pkg/streamingpromql/operators/range_vector_selector.go similarity index 99% rename from pkg/streamingpromql/operator/range_vector_selector.go rename to pkg/streamingpromql/operators/range_vector_selector.go index c7b8158a142..3b834524fd9 100644 --- a/pkg/streamingpromql/operator/range_vector_selector.go +++ b/pkg/streamingpromql/operators/range_vector_selector.go @@ -3,7 +3,7 @@ // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: The Prometheus Authors -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/selector.go b/pkg/streamingpromql/operators/selector.go similarity index 99% rename from pkg/streamingpromql/operator/selector.go rename to pkg/streamingpromql/operators/selector.go index a1f5c0d1bba..42cbcea2579 100644 --- a/pkg/streamingpromql/operator/selector.go +++ b/pkg/streamingpromql/operators/selector.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package operators import ( "context" diff --git a/pkg/streamingpromql/operator/selector_test.go b/pkg/streamingpromql/operators/selector_test.go similarity index 99% rename from pkg/streamingpromql/operator/selector_test.go rename to pkg/streamingpromql/operators/selector_test.go index 91a94ed9087..7a66893b38b 100644 --- a/pkg/streamingpromql/operator/selector_test.go +++ b/pkg/streamingpromql/operators/selector_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package operators import ( "fmt" diff --git a/pkg/streamingpromql/operator/time.go b/pkg/streamingpromql/operators/time.go similarity index 88% rename from pkg/streamingpromql/operator/time.go rename to pkg/streamingpromql/operators/time.go index bc52c1cd27e..259c2b4d97f 100644 --- a/pkg/streamingpromql/operator/time.go +++ b/pkg/streamingpromql/operators/time.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operator +package operators func stepCount(start, end, interval int64) int { return int((end-start)/interval) + 1 diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 3bb4574ecac..343d55e5621 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -21,7 +21,7 @@ import ( "golang.org/x/exp/slices" "github.com/grafana/mimir/pkg/streamingpromql/compat" - "github.com/grafana/mimir/pkg/streamingpromql/operator" + "github.com/grafana/mimir/pkg/streamingpromql/operators" "github.com/grafana/mimir/pkg/streamingpromql/pooling" "github.com/grafana/mimir/pkg/streamingpromql/types" ) @@ -121,9 +121,9 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, compat.NewNotSupportedError("instant vector selector with 'offset'") } - return &operator.InstantVectorSelector{ + return &operators.InstantVectorSelector{ Pool: q.pool, - Selector: &operator.Selector{ + Selector: &operators.Selector{ Queryable: q.queryable, Start: timestamp.FromTime(q.statement.Start), End: timestamp.FromTime(q.statement.End), @@ -154,7 +154,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, err } - return &operator.Aggregation{ + return &operators.Aggregation{ Inner: inner, Start: q.statement.Start, End: q.statement.End, @@ -177,7 +177,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, err } - return &operator.RangeVectorFunction{ + return &operators.RangeVectorFunction{ Inner: inner, Pool: q.pool, }, nil @@ -200,7 +200,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, err } - return operator.NewBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, q.pool) + return operators.NewBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, q.pool) case *parser.StepInvariantExpr: // One day, we'll do something smarter here. return q.convertToInstantVectorOperator(e.Expr) @@ -230,8 +230,8 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (types.RangeVecto interval = time.Millisecond } - return &operator.RangeVectorSelector{ - Selector: &operator.Selector{ + return &operators.RangeVectorSelector{ + Selector: &operators.Selector{ Queryable: q.queryable, Start: timestamp.FromTime(q.statement.Start), End: timestamp.FromTime(q.statement.End), From 89e873383a75ff61abe1f91255f18b4585b8c07e Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 3 Jun 2024 16:09:27 +1000 Subject: [PATCH 3/3] Add changelog entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a4a3b139dd..1298f1862a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123