Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mimir query engine: finish reorganisation started in #8230 #8247

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ flowchart TB
max --> output
```

Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./operator/operator.go).
Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./types/operator.go).
The two key methods of this interface are `SeriesMetadata()` and `NextSeries()`:

`SeriesMetadata()` returns the list of all series' labels that will be returned by the operator[^2].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -22,7 +22,7 @@ import (
)

type Aggregation struct {
Inner InstantVectorOperator
Inner types.InstantVectorOperator
Start time.Time
End time.Time
Interval time.Duration
Expand Down Expand Up @@ -51,7 +51,7 @@ type group struct {
present []bool
}

var _ InstantVectorOperator = &Aggregation{}
var _ types.InstantVectorOperator = &Aggregation{}

var groupPool = zeropool.New(func() *group {
return &group{}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (a *Aggregation) labelsForGroup(m labels.Labels, lb *labels.Builder) labels
func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
if len(a.remainingGroups) == 0 {
// No more groups left.
return types.InstantVectorSeriesData{}, EOS
return types.InstantVectorSeriesData{}, types.EOS
}

start := timestamp.FromTime(a.Start)
Expand All @@ -142,7 +142,7 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries
s, err := a.Inner.NextSeries(ctx)

if err != nil {
if errors.Is(err, EOS) {
if errors.Is(err, types.EOS) {
return types.InstantVectorSeriesData{}, fmt.Errorf("exhausted series before all groups were completed: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -25,8 +25,8 @@ import (

// BinaryOperation represents a binary operation between instant vectors such as "<expr> + <expr>" or "<expr> - <expr>".
type BinaryOperation struct {
Left InstantVectorOperator
Right InstantVectorOperator
Left types.InstantVectorOperator
Right types.InstantVectorOperator
Op parser.ItemType
Pool *pooling.LimitingPool

Expand All @@ -45,7 +45,7 @@ type BinaryOperation struct {
opFunc binaryOperationFunc
}

var _ InstantVectorOperator = &BinaryOperation{}
var _ types.InstantVectorOperator = &BinaryOperation{}

type binaryOperationOutputSeries struct {
leftSeriesIndices []int
Expand All @@ -66,7 +66,7 @@ func (s binaryOperationOutputSeries) latestRightSeries() int {
return s.rightSeriesIndices[len(s.rightSeriesIndices)-1]
}

func NewBinaryOperation(left InstantVectorOperator, right InstantVectorOperator, vectorMatching parser.VectorMatching, op parser.ItemType, pool *pooling.LimitingPool) (*BinaryOperation, error) {
func NewBinaryOperation(left types.InstantVectorOperator, right types.InstantVectorOperator, vectorMatching parser.VectorMatching, op parser.ItemType, pool *pooling.LimitingPool) (*BinaryOperation, error) {
opFunc := arithmeticOperationFuncs[op]
if opFunc == nil {
return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op))
Expand Down Expand Up @@ -331,7 +331,7 @@ func (b *BinaryOperation) labelsFunc() func(labels.Labels) labels.Labels {

func (b *BinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
if len(b.remainingSeries) == 0 {
return types.InstantVectorSeriesData{}, EOS
return types.InstantVectorSeriesData{}, types.EOS
}

thisSeries := b.remainingSeries[0]
Expand Down Expand Up @@ -545,7 +545,7 @@ func (b *BinaryOperation) Close() {
// binary operation are in order B, A, C, binaryOperationSeriesBuffer will buffer the data for series A while series B is
// produced, then return series A when needed.
type binaryOperationSeriesBuffer struct {
source InstantVectorOperator
source types.InstantVectorOperator
nextIndexToRead int

// If seriesUsed[i] == true, then the series at index i is needed for this operation and should be buffered if not used immediately.
Expand All @@ -562,7 +562,7 @@ type binaryOperationSeriesBuffer struct {
output []types.InstantVectorSeriesData
}

func newBinaryOperationSeriesBuffer(source InstantVectorOperator, seriesUsed []bool, pool *pooling.LimitingPool) *binaryOperationSeriesBuffer {
func newBinaryOperationSeriesBuffer(source types.InstantVectorOperator, seriesUsed []bool, pool *pooling.LimitingPool) *binaryOperationSeriesBuffer {
return &binaryOperationSeriesBuffer{
source: source,
seriesUsed: seriesUsed,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -29,7 +29,7 @@ type InstantVectorSelector struct {
memoizedIterator *storage.MemoizedSeriesIterator
}

var _ InstantVectorOperator = &InstantVectorSelector{}
var _ types.InstantVectorOperator = &InstantVectorSelector{}

func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
// Compute value we need on every call to NextSeries() once, here.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand All @@ -22,7 +22,7 @@ func (t *testOperator) SeriesMetadata(_ context.Context) ([]types.SeriesMetadata

func (t *testOperator) NextSeries(_ context.Context) (types.InstantVectorSeriesData, error) {
if len(t.data) == 0 {
return types.InstantVectorSeriesData{}, EOS
return types.InstantVectorSeriesData{}, types.EOS
}

d := t.data[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -18,15 +18,15 @@ import (

// RangeVectorFunction performs a rate calculation over a range vector.
type RangeVectorFunction struct {
Inner RangeVectorOperator
Inner types.RangeVectorOperator
Pool *pooling.LimitingPool

numSteps int
rangeSeconds float64
buffer *RingBuffer
buffer *types.RingBuffer
}

var _ InstantVectorOperator = &RangeVectorFunction{}
var _ types.InstantVectorOperator = &RangeVectorFunction{}

func (m *RangeVectorFunction) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
metadata, err := m.Inner.SeriesMetadata(ctx)
Expand Down Expand Up @@ -57,7 +57,7 @@ func (m *RangeVectorFunction) NextSeries(ctx context.Context) (types.InstantVect
}

if m.buffer == nil {
m.buffer = NewRingBuffer(m.Pool)
m.buffer = types.NewRingBuffer(m.Pool)
}

m.buffer.Reset()
Expand All @@ -75,7 +75,7 @@ func (m *RangeVectorFunction) NextSeries(ctx context.Context) (types.InstantVect
step, err := m.Inner.NextStepSamples(m.buffer)

// nolint:errorlint // errors.Is introduces a performance overhead, and NextStepSamples is guaranteed to return exactly EOS, never a wrapped error.
if err == EOS {
if err == types.EOS {
return data, nil
} else if err != nil {
return types.InstantVectorSeriesData{}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -27,7 +27,7 @@ type RangeVectorSelector struct {
nextT int64
}

var _ RangeVectorOperator = &RangeVectorSelector{}
var _ types.RangeVectorOperator = &RangeVectorSelector{}

func (m *RangeVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
// Compute value we need on every call to NextSeries() once, here.
Expand Down Expand Up @@ -56,9 +56,9 @@ func (m *RangeVectorSelector) NextSeries(ctx context.Context) error {
return nil
}

func (m *RangeVectorSelector) NextStepSamples(floats *RingBuffer) (types.RangeVectorStepData, error) {
func (m *RangeVectorSelector) NextStepSamples(floats *types.RingBuffer) (types.RangeVectorStepData, error) {
if m.nextT > m.Selector.End {
return types.RangeVectorStepData{}, EOS
return types.RangeVectorStepData{}, types.EOS
}

stepT := m.nextT
Expand All @@ -84,7 +84,7 @@ func (m *RangeVectorSelector) NextStepSamples(floats *RingBuffer) (types.RangeVe
}, nil
}

func (m *RangeVectorSelector) fillBuffer(floats *RingBuffer, rangeStart, rangeEnd int64) error {
func (m *RangeVectorSelector) fillBuffer(floats *types.RingBuffer, rangeStart, rangeEnd int64) error {
// Keep filling the buffer until we reach the end of the range or the end of the iterator.
for {
valueType := m.chunkIterator.Next()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand Down Expand Up @@ -90,7 +90,7 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata,

func (s *Selector) Next(ctx context.Context, existing chunkenc.Iterator) (chunkenc.Iterator, error) {
if s.series.Len() == 0 {
return nil, EOS
return nil, types.EOS
}

s.seriesIdx++
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

func stepCount(start, end, interval int64) int {
return int((end-start)/interval) + 1
Expand Down
Loading
Loading