Mimir Query Engine: Add instant-vector functions #8256

Merged: 40 commits, Jun 12, 2024
a297de4
Mimir query engine: Add support for histogram_count function
jhesketh May 31, 2024
6fd0e99
Support new limit pooling
jhesketh Jun 3, 2024
fb6b581
Test mixed data use cases
jhesketh Jun 3, 2024
0752b29
Refactor function calling
jhesketh Jun 3, 2024
3aa7014
Add support for histogram_sum
jhesketh Jun 3, 2024
5b02813
Add in simple function
jhesketh Jun 3, 2024
0fbd7b4
Update CHANGELOG
jhesketh Jun 3, 2024
0565a58
Fix lint
jhesketh Jun 3, 2024
c2e4ae5
Merge branch 'main' into jhesketh/promql
jhesketh Jun 4, 2024
6924f7c
Refactor functions into their own package
jhesketh Jun 4, 2024
c531127
Fix lint
jhesketh Jun 4, 2024
d41ac2d
Remove dropMetricName helper
jhesketh Jun 4, 2024
715dab0
Put series back into pool after count+sum
jhesketh Jun 4, 2024
0caf6e2
Rename function over types
jhesketh Jun 4, 2024
abc09ba
Add support for clamp function
jhesketh Jun 4, 2024
cf98d31
Move clamp into functions package
jhesketh Jun 4, 2024
13afa3f
We support scalars now
jhesketh Jun 4, 2024
2cef2c6
fix lint
jhesketh Jun 4, 2024
b9e9710
Re-arrange functions slightly
jhesketh Jun 5, 2024
88e0705
Rename methods+vars for consistency
jhesketh Jun 5, 2024
076a367
Fix native_histogram tests
jhesketh Jun 5, 2024
2ad5cbe
Ignore histogram with float functions
jhesketh Jun 5, 2024
827b2f7
Add tests for functions
jhesketh Jun 5, 2024
8024f1d
Remove scalar operator for now (along with clamp function)
jhesketh Jun 6, 2024
3c7c56b
Address review feedback
jhesketh Jun 6, 2024
d67f81f
Merge branch 'main' into jhesketh/promql
jhesketh Jun 6, 2024
30a3197
Rename instantVectorFunctionOperatorFactories to instantVectorFunctio…
jhesketh Jun 6, 2024
7da3fa5
Allow registering instantVectorFunctions
jhesketh Jun 6, 2024
881b816
Address review feedback
jhesketh Jun 12, 2024
bfc40d2
Add TestRegisterInstantVectorFunctionOperator
jhesketh Jun 12, 2024
cf89455
Merge branch 'main' into jhesketh/promql
jhesketh Jun 12, 2024
8f40b7d
Fix unit tests
jhesketh Jun 12, 2024
75506d6
Make clearer what are factories
jhesketh Jun 12, 2024
70791a7
Fix lint
jhesketh Jun 12, 2024
361070c
Test extra case
jhesketh Jun 12, 2024
2199afe
Enable trig functions
jhesketh Jun 12, 2024
f484280
Remove redundant tests
jhesketh Jun 12, 2024
5ff8972
Fix doc strings
jhesketh Jun 12, 2024
7b7f169
Make sure tests tidy themselves up
jhesketh Jun 12, 2024
b4f3234
Add check for NaN in acos
jhesketh Jun 12, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -19,7 +19,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340 #8256
Contributor: [nit] It would be good to keep the PR numbers sorted.

Contributor Author: That depends on whether you want them sorted by merge order or by the order they were opened. I'd argue for merge order, since this branch merges main back into itself and will be squashed and applied on top of what was #8340.

* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
105 changes: 105 additions & 0 deletions pkg/streamingpromql/functions.go
@@ -0,0 +1,105 @@
// SPDX-License-Identifier: AGPL-3.0-only

package streamingpromql

import (
	"fmt"

	"github.com/grafana/mimir/pkg/streamingpromql/functions"
	"github.com/grafana/mimir/pkg/streamingpromql/operators"
	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type InstantVectorFunctionOperatorFactory func(args []types.Operator, pool *pooling.LimitingPool) (types.InstantVectorOperator, error)

// SingleInputVectorFunctionOperator creates an InstantVectorFunctionOperatorFactory for functions
// that have exactly 1 argument (v instant-vector).
//
// Parameters:
//   - name: The name of the function.
//   - metadataFunc: The function for handling metadata
//   - seriesDataFunc: The function to handle series data
func SingleInputVectorFunctionOperator(name string, metadataFunc functions.SeriesMetadataFunction, seriesDataFunc functions.InstantVectorFunction) InstantVectorFunctionOperatorFactory {
	return func(args []types.Operator, pool *pooling.LimitingPool) (types.InstantVectorOperator, error) {
		if len(args) != 1 {
			// Should be caught by the PromQL parser, but we check here for safety.
			return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
		}

		inner, ok := args[0].(types.InstantVectorOperator)
		if !ok {
			// Should be caught by the PromQL parser, but we check here for safety.
			return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0])
		}

		return &operators.FunctionOverInstantVector{
			Inner: inner,
			Pool:  pool,

			MetadataFunc:   metadataFunc,
			SeriesDataFunc: seriesDataFunc,
		}, nil
	}
}

// TransformationFunctionOperator creates an InstantVectorFunctionOperatorFactory for functions
// that have exactly 1 argument (v instant-vector), and drop the series __name__ label.
//
// Parameters:
//   - name: The name of the function.
//   - seriesDataFunc: The function to handle series data
func TransformationFunctionOperator(name string, seriesDataFunc functions.InstantVectorFunction) InstantVectorFunctionOperatorFactory {
	return SingleInputVectorFunctionOperator(name, functions.DropSeriesName, seriesDataFunc)
}

// LabelManipulationFunctionOperator creates an InstantVectorFunctionOperatorFactory for functions
// that have exactly 1 argument (v instant-vector), and need to manipulate the labels of
// each series without manipulating the returned samples.
// The values of v are passed through.
//
// Parameters:
//   - name: The name of the function.
//   - metadataFunc: The function for handling metadata
//
// Returns:
//
//	An InstantVectorFunctionOperatorFactory.
func LabelManipulationFunctionOperator(name string, metadataFunc functions.SeriesMetadataFunction) InstantVectorFunctionOperatorFactory {
	return SingleInputVectorFunctionOperator(name, metadataFunc, functions.Passthrough)
}

func rateFunctionOperator(args []types.Operator, pool *pooling.LimitingPool) (types.InstantVectorOperator, error) {
	if len(args) != 1 {
		// Should be caught by the PromQL parser, but we check here for safety.
		return nil, fmt.Errorf("expected exactly 1 argument for rate, got %v", len(args))
	}

	inner, ok := args[0].(types.RangeVectorOperator)
	if !ok {
		// Should be caught by the PromQL parser, but we check here for safety.
		return nil, fmt.Errorf("expected a range vector argument for rate, got %T", args[0])
	}

	return &operators.FunctionOverRangeVector{
		Inner: inner,
		Pool:  pool,
	}, nil
}

// These functions return an instant-vector.
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
	"acos":            TransformationFunctionOperator("acos", functions.Acos),
	"histogram_count": TransformationFunctionOperator("histogram_count", functions.HistogramCount),
	"histogram_sum":   TransformationFunctionOperator("histogram_sum", functions.HistogramSum),
	"rate":            rateFunctionOperator,
}

func RegisterInstantVectorFunctionOperator(functionName string, functionOperator InstantVectorFunctionOperatorFactory) error {
	if _, exists := instantVectorFunctionOperatorFactories[functionName]; exists {
		return fmt.Errorf("function '%s' has already been registered", functionName)
	}

	instantVectorFunctionOperatorFactories[functionName] = functionOperator
	return nil
}
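The factory-and-registry pattern in functions.go can be tried out in isolation. The following is a minimal sketch, not the PR's code: `Operator`, `InstantVectorOperator`, `Factory`, `namedOperator`, `singleInput`, `registry` and `register` are hypothetical stand-ins that drop the pool and the metadata/series functions, and keep only the argument validation and the duplicate-name check.

```go
package main

import "fmt"

// Operator is a hypothetical stand-in for types.Operator.
type Operator interface{}

// InstantVectorOperator is a hypothetical stand-in for types.InstantVectorOperator.
type InstantVectorOperator interface {
	Name() string
}

// Factory mirrors the shape of InstantVectorFunctionOperatorFactory, without the pool.
type Factory func(args []Operator) (InstantVectorOperator, error)

// namedOperator is a toy operator that only remembers its name and its input.
type namedOperator struct {
	name  string
	inner InstantVectorOperator
}

func (o *namedOperator) Name() string { return o.name }

// singleInput returns a Factory that accepts exactly one instant-vector argument.
func singleInput(name string) Factory {
	return func(args []Operator) (InstantVectorOperator, error) {
		if len(args) != 1 {
			return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
		}
		inner, ok := args[0].(InstantVectorOperator)
		if !ok {
			return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0])
		}
		return &namedOperator{name: name, inner: inner}, nil
	}
}

// registry maps function names to factories, seeded with one built-in.
var registry = map[string]Factory{"acos": singleInput("acos")}

// register adds a factory and refuses to overwrite an existing name.
func register(name string, f Factory) error {
	if _, exists := registry[name]; exists {
		return fmt.Errorf("function '%s' has already been registered", name)
	}
	registry[name] = f
	return nil
}

func main() {
	fmt.Println(register("acos", singleInput("acos")))       // duplicate: returns an error
	fmt.Println(register("my_func", singleInput("my_func"))) // new name: returns nil

	op, err := registry["my_func"]([]Operator{&namedOperator{name: "selector"}})
	fmt.Println(op.Name(), err)

	_, err = registry["my_func"]([]Operator{"not a vector"})
	fmt.Println(err)
}
```

Because the registry in this sketch is a plain package-level map with no locking, registration is only safe before queries start running concurrently; whether the PR's map is used under the same assumption is not stated in the diff.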
45 changes: 45 additions & 0 deletions pkg/streamingpromql/functions/common.go
@@ -0,0 +1,45 @@
// SPDX-License-Identifier: AGPL-3.0-only

package functions

import (
	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, pool *pooling.LimitingPool) ([]types.SeriesMetadata, error)

func DropSeriesName(seriesMetadata []types.SeriesMetadata, _ *pooling.LimitingPool) ([]types.SeriesMetadata, error) {
	for i := range seriesMetadata {
		seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName()
	}

	return seriesMetadata, nil
}

type InstantVectorFunction func(seriesData types.InstantVectorSeriesData, pool *pooling.LimitingPool) (types.InstantVectorSeriesData, error)

// floatTransformationFunc is not needed elsewhere, so it is not exported yet
func floatTransformationFunc(transform func(f float64) float64) InstantVectorFunction {
	return func(seriesData types.InstantVectorSeriesData, pool *pooling.LimitingPool) (types.InstantVectorSeriesData, error) {
		for i := range seriesData.Floats {
			seriesData.Floats[i].F = transform(seriesData.Floats[i].F)
		}
		return seriesData, nil
	}
}

func FloatTransformationDropHistogramsFunc(transform func(f float64) float64) InstantVectorFunction {
	ft := floatTransformationFunc(transform)
	return func(seriesData types.InstantVectorSeriesData, pool *pooling.LimitingPool) (types.InstantVectorSeriesData, error) {
		// Functions that do not explicitly mention native histograms in their documentation will ignore histogram samples.
		// https://prometheus.io/docs/prometheus/latest/querying/functions
		pool.PutHPointSlice(seriesData.Histograms)
		seriesData.Histograms = nil
		return ft(seriesData, pool)
	}
}

func Passthrough(seriesData types.InstantVectorSeriesData, _ *pooling.LimitingPool) (types.InstantVectorSeriesData, error) {
	return seriesData, nil
}
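The two wrappers in common.go lift a plain `func(float64) float64` into a per-series function. A minimal sketch of that idea follows, assuming simplified types: `FPoint`, `SeriesData`, `SeriesFunc`, `liftFloat` and `liftFloatDropHistograms` are hypothetical names, histograms are represented by placeholder strings, and the pool is left out entirely.

```go
package main

import (
	"fmt"
	"math"
)

// FPoint is a hypothetical simplification of promql.FPoint.
type FPoint struct {
	T int64
	F float64
}

// SeriesData is a hypothetical simplification of types.InstantVectorSeriesData.
// Histograms are placeholder strings here, not real histogram points.
type SeriesData struct {
	Floats     []FPoint
	Histograms []string
}

// SeriesFunc transforms the data of one series.
type SeriesFunc func(SeriesData) SeriesData

// liftFloat applies transform to every float sample, in place.
func liftFloat(transform func(float64) float64) SeriesFunc {
	return func(d SeriesData) SeriesData {
		for i := range d.Floats {
			d.Floats[i].F = transform(d.Floats[i].F)
		}
		return d
	}
}

// liftFloatDropHistograms discards histogram samples, then applies transform.
func liftFloatDropHistograms(transform func(float64) float64) SeriesFunc {
	inner := liftFloat(transform)
	return func(d SeriesData) SeriesData {
		d.Histograms = nil
		return inner(d)
	}
}

func main() {
	acos := liftFloatDropHistograms(math.Acos)
	out := acos(SeriesData{
		Floats:     []FPoint{{T: 0, F: 0.5}, {T: 1000, F: 2}},
		Histograms: []string{"placeholder"},
	})
	// math.Acos returns NaN for inputs outside [-1, 1].
	fmt.Println(out.Floats[0].F, math.IsNaN(out.Floats[1].F), out.Histograms == nil)
}
```

Note that the float samples are overwritten in the caller's slice rather than copied, which is also how the PR's floatTransformationFunc behaves; callers that need the original values have to copy them first.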
87 changes: 87 additions & 0 deletions pkg/streamingpromql/functions/common_test.go
@@ -0,0 +1,87 @@
// SPDX-License-Identifier: AGPL-3.0-only

package functions

import (
	"testing"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/stretchr/testify/require"

	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

func TestDropSeriesName(t *testing.T) {
	seriesMetadata := []types.SeriesMetadata{
		{Labels: labels.FromStrings("__name__", "metric_name", "label1", "value1")},
		{Labels: labels.FromStrings("__name__", "another_metric", "label2", "value2")},
	}

	expected := []types.SeriesMetadata{
		{Labels: labels.FromStrings("label1", "value1")},
		{Labels: labels.FromStrings("label2", "value2")},
	}

	modifiedMetadata, err := DropSeriesName(seriesMetadata, pooling.NewLimitingPool(0, nil))
	require.NoError(t, err)
	require.Equal(t, expected, modifiedMetadata)
}

func TestFloatTransformationFunc(t *testing.T) {
	transform := func(f float64) float64 { return f * 2 }
	transformFunc := floatTransformationFunc(transform)

	seriesData := types.InstantVectorSeriesData{
		Floats: []promql.FPoint{
			{F: 1.0},
			{F: 2.5},
		},
		Histograms: []promql.HPoint{
			{H: &histogram.FloatHistogram{Count: 1, Sum: 2}},
		},
	}

	expected := types.InstantVectorSeriesData{
		Floats: []promql.FPoint{
			{F: 2.0},
			{F: 5.0},
		},
		Histograms: []promql.HPoint{
			{H: &histogram.FloatHistogram{Count: 1, Sum: 2}},
		},
	}

	modifiedSeriesData, err := transformFunc(seriesData, pooling.NewLimitingPool(0, nil))
	require.NoError(t, err)
	require.Equal(t, expected, modifiedSeriesData)
}

func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
	transform := func(f float64) float64 { return f * 2 }
	transformFunc := FloatTransformationDropHistogramsFunc(transform)

	seriesData := types.InstantVectorSeriesData{
		Floats: []promql.FPoint{
			{F: 1.0},
			{F: 2.5},
		},
		Histograms: []promql.HPoint{
			{H: &histogram.FloatHistogram{Count: 1, Sum: 2}},
		},
	}

	expected := types.InstantVectorSeriesData{
		Floats: []promql.FPoint{
			{F: 2.0},
			{F: 5.0},
		},
		Histograms: nil, // Histograms should be dropped
	}

	modifiedSeriesData, err := transformFunc(seriesData, pooling.NewLimitingPool(0, nil))
	require.NoError(t, err)
	require.Equal(t, expected, modifiedSeriesData)
}
9 changes: 9 additions & 0 deletions pkg/streamingpromql/functions/math.go
@@ -0,0 +1,9 @@
// SPDX-License-Identifier: AGPL-3.0-only

package functions

import (
	"math"
)

var Acos = FloatTransformationDropHistogramsFunc(math.Acos)
48 changes: 48 additions & 0 deletions pkg/streamingpromql/functions/native_histograms.go
@@ -0,0 +1,48 @@
// SPDX-License-Identifier: AGPL-3.0-only

package functions

import (
	"github.com/prometheus/prometheus/promql"

	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

func HistogramCount(seriesData types.InstantVectorSeriesData, pool *pooling.LimitingPool) (types.InstantVectorSeriesData, error) {
	floats, err := pool.GetFPointSlice(len(seriesData.Histograms))
	if err != nil {
		return types.InstantVectorSeriesData{}, err
	}

	data := types.InstantVectorSeriesData{
		Floats: floats,
	}
	for _, histogram := range seriesData.Histograms {
		data.Floats = append(data.Floats, promql.FPoint{
			T: histogram.T,
			F: histogram.H.Count,
		})
	}
	pool.PutInstantVectorSeriesData(seriesData)
	return data, nil
}

func HistogramSum(seriesData types.InstantVectorSeriesData, pool *pooling.LimitingPool) (types.InstantVectorSeriesData, error) {
	floats, err := pool.GetFPointSlice(len(seriesData.Histograms))
	if err != nil {
		return types.InstantVectorSeriesData{}, err
	}

	data := types.InstantVectorSeriesData{
		Floats: floats,
	}
	for _, histogram := range seriesData.Histograms {
		data.Floats = append(data.Floats, promql.FPoint{
			T: histogram.T,
			F: histogram.H.Sum,
		})
	}
	pool.PutInstantVectorSeriesData(seriesData)
	return data, nil
}
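HistogramCount and HistogramSum differ only in which histogram field they read. One possible way to share the loop, which is not what this PR does, is to pass a field accessor. The sketch below assumes simplified types throughout: `FPoint`, `FloatHistogram`, `HPoint`, `slicePool` and `histogramField` are hypothetical, and `slicePool` is a toy free list rather than a model of pooling.LimitingPool (it enforces no limits and returns no errors).

```go
package main

import "fmt"

// FPoint, FloatHistogram and HPoint are hypothetical simplifications of the
// Prometheus types used in the diff.
type FPoint struct {
	T int64
	F float64
}

type FloatHistogram struct {
	Count, Sum float64
}

type HPoint struct {
	T int64
	H *FloatHistogram
}

// slicePool is a toy free list of FPoint slices.
type slicePool struct {
	free [][]FPoint
}

// get returns an empty slice with at least the requested capacity,
// reusing a previously returned slice when one is large enough.
func (p *slicePool) get(capacity int) []FPoint {
	for i, s := range p.free {
		if cap(s) >= capacity {
			p.free = append(p.free[:i], p.free[i+1:]...)
			return s[:0]
		}
	}
	return make([]FPoint, 0, capacity)
}

// put hands a slice back to the pool for later reuse.
func (p *slicePool) put(s []FPoint) {
	if s != nil {
		p.free = append(p.free, s)
	}
}

// histogramField turns each histogram sample into a float sample holding
// the value that field extracts, keeping the sample timestamp.
func histogramField(hs []HPoint, pool *slicePool, field func(*FloatHistogram) float64) []FPoint {
	out := pool.get(len(hs))
	for _, h := range hs {
		out = append(out, FPoint{T: h.T, F: field(h.H)})
	}
	return out
}

func main() {
	pool := &slicePool{}
	hs := []HPoint{
		{T: 0, H: &FloatHistogram{Count: 3, Sum: 7.5}},
		{T: 1000, H: &FloatHistogram{Count: 5, Sum: 12}},
	}

	counts := histogramField(hs, pool, func(h *FloatHistogram) float64 { return h.Count })
	sums := histogramField(hs, pool, func(h *FloatHistogram) float64 { return h.Sum })
	fmt.Println(counts, sums)

	pool.put(counts)
	pool.put(sums)
}
```

As in the diff, the output slice is requested with length zero and a capacity equal to the number of histogram samples, so the `append` calls never reallocate.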
32 changes: 32 additions & 0 deletions pkg/streamingpromql/functions_test.go
@@ -0,0 +1,32 @@
// SPDX-License-Identifier: AGPL-3.0-only

package streamingpromql

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/grafana/mimir/pkg/streamingpromql/functions"
)

func TestRegisterInstantVectorFunctionOperator(t *testing.T) {
	// Register an already existing function
	err := RegisterInstantVectorFunctionOperator("acos", LabelManipulationFunctionOperator("acos", functions.DropSeriesName))
	require.Error(t, err)
	require.Equal(t, "function 'acos' has already been registered", err.Error())

	// Register a new function
	newFunc := LabelManipulationFunctionOperator("new_function", functions.DropSeriesName)
	err = RegisterInstantVectorFunctionOperator("new_function", newFunc)
	require.NoError(t, err)
	require.Contains(t, instantVectorFunctionOperatorFactories, "new_function")

	// Register existing function we registered previously
	err = RegisterInstantVectorFunctionOperator("new_function", newFunc)
	require.Error(t, err)
	require.Equal(t, "function 'new_function' has already been registered", err.Error())

	// Cleanup changes to instantVectorFunctionOperatorFactories
	delete(instantVectorFunctionOperatorFactories, "new_function")
}
51 changes: 51 additions & 0 deletions pkg/streamingpromql/operators/function_over_instant_vector.go
@@ -0,0 +1,51 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operators

import (
	"context"

	"github.com/grafana/mimir/pkg/streamingpromql/functions"
	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// FunctionOverInstantVector performs a function over each series in an instant vector.
type FunctionOverInstantVector struct {
	// At the moment no instant-vector promql function takes more than one instant-vector
	// as an argument. We can assume this will always be the Inner operator and therefore
	// what we use for the SeriesMetadata.
	Inner types.InstantVectorOperator
	Pool  *pooling.LimitingPool

	MetadataFunc   functions.SeriesMetadataFunction
	SeriesDataFunc functions.InstantVectorFunction
}

var _ types.InstantVectorOperator = &FunctionOverInstantVector{}

func (m *FunctionOverInstantVector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
	metadata, err := m.Inner.SeriesMetadata(ctx)
	if err != nil {
		return nil, err
	}

	return m.MetadataFunc(metadata, m.Pool)
}

func (m *FunctionOverInstantVector) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
	series, err := m.Inner.NextSeries(ctx)
	if err != nil {
		return types.InstantVectorSeriesData{}, err
	}

	return m.SeriesDataFunc(series, m.Pool)
}

func (m *FunctionOverInstantVector) Close() {
	m.Inner.Close()
}
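FunctionOverInstantVector is a thin wrapper: it forwards each call to its inner operator and applies one function to the metadata and another to each series as it streams past. A self-contained sketch of that shape follows, assuming much simpler types than the PR's. `seriesOperator`, `sliceOperator`, `functionOver`, `upperNames`, `negate` and the `errEndOfStream` sentinel are all hypothetical; series metadata is reduced to a name string and series data to a `[]float64`, with no context or pool.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errEndOfStream is a hypothetical sentinel signalling that no series remain.
var errEndOfStream = errors.New("end of stream")

// seriesOperator is a simplified, hypothetical stand-in for types.InstantVectorOperator.
type seriesOperator interface {
	SeriesMetadata() ([]string, error)
	NextSeries() ([]float64, error)
	Close()
}

// sliceOperator serves pre-built series from memory, one per NextSeries call.
type sliceOperator struct {
	names  []string
	series [][]float64
	next   int
	closed bool
}

func (s *sliceOperator) SeriesMetadata() ([]string, error) { return s.names, nil }

func (s *sliceOperator) NextSeries() ([]float64, error) {
	if s.next >= len(s.series) {
		return nil, errEndOfStream
	}
	out := s.series[s.next]
	s.next++
	return out, nil
}

func (s *sliceOperator) Close() { s.closed = true }

// functionOver wraps an inner operator and transforms what it returns.
type functionOver struct {
	inner        seriesOperator
	metadataFunc func([]string) ([]string, error)
	seriesFunc   func([]float64) ([]float64, error)
}

func (f *functionOver) SeriesMetadata() ([]string, error) {
	names, err := f.inner.SeriesMetadata()
	if err != nil {
		return nil, err
	}
	return f.metadataFunc(names)
}

func (f *functionOver) NextSeries() ([]float64, error) {
	series, err := f.inner.NextSeries()
	if err != nil {
		// Errors, including end of stream, are passed through untouched.
		return nil, err
	}
	return f.seriesFunc(series)
}

func (f *functionOver) Close() { f.inner.Close() }

// upperNames upper-cases every series name in place.
func upperNames(names []string) ([]string, error) {
	for i := range names {
		names[i] = strings.ToUpper(names[i])
	}
	return names, nil
}

// negate flips the sign of every sample in place.
func negate(series []float64) ([]float64, error) {
	for i := range series {
		series[i] = -series[i]
	}
	return series, nil
}

func main() {
	var op seriesOperator = &functionOver{
		inner:        &sliceOperator{names: []string{"a", "b"}, series: [][]float64{{1, 2}, {3}}},
		metadataFunc: upperNames,
		seriesFunc:   negate,
	}
	defer op.Close()

	names, _ := op.SeriesMetadata()
	fmt.Println(names)
	for {
		series, err := op.NextSeries()
		if err != nil {
			if !errors.Is(err, errEndOfStream) {
				fmt.Println("error:", err)
			}
			break
		}
		fmt.Println(series)
	}
}
```

Only one series is held by the wrapper at a time, which is the property that lets operators of this shape be chained without materialising the whole result.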