From 3f8d9fb9062ad4601108a99f9317b226e3b141d1 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 1 Mar 2023 15:31:37 +1100 Subject: [PATCH] Add support for decoding responses sent in protobuf format. --- pkg/ruler/remotequerier.go | 4 +- pkg/ruler/remotequerier_decoder.go | 107 ++++++++++- pkg/ruler/remotequerier_test.go | 296 ++++++++++++++++++++++++++++- 3 files changed, 401 insertions(+), 6 deletions(-) diff --git a/pkg/ruler/remotequerier.go b/pkg/ruler/remotequerier.go index e491146e20a..533b274575b 100644 --- a/pkg/ruler/remotequerier.go +++ b/pkg/ruler/remotequerier.go @@ -127,6 +127,7 @@ func NewRemoteQuerier( middlewares ...Middleware, ) *RemoteQuerier { json := jsonDecoder{} + protobuf := protobufDecoder{} return &RemoteQuerier{ client: client, @@ -136,7 +137,8 @@ func NewRemoteQuerier( logger: logger, preferredQueryResultResponseFormat: preferredQueryResultResponseFormat, decoders: map[string]decoder{ - json.ContentType(): json, + json.ContentType(): json, + protobuf.ContentType(): protobuf, }, } } diff --git a/pkg/ruler/remotequerier_decoder.go b/pkg/ruler/remotequerier_decoder.go index 8e5a24bd2c0..4a4c20339c3 100644 --- a/pkg/ruler/remotequerier_decoder.go +++ b/pkg/ruler/remotequerier_decoder.go @@ -10,6 +10,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + + "github.com/grafana/mimir/pkg/mimirpb" ) type decoder interface { @@ -34,7 +36,7 @@ func (d jsonDecoder) Decode(body []byte) (promql.Vector, error) { return promql.Vector{}, err } if apiResp.Status == statusError { - return promql.Vector{}, fmt.Errorf("query response error: %s", apiResp.Error) + return promql.Vector{}, fmt.Errorf("query execution failed with error: %s", apiResp.Error) } v := struct { Type model.ValueType `json:"resultType"` @@ -100,3 +102,106 @@ func (jsonDecoder) scalarToPromQLVector(sc *model.Scalar) promql.Vector { Metric: labels.Labels{}, }} } + +type protobufDecoder struct{} + +func (protobufDecoder) ContentType() string { + return mimirpb.QueryResponseMimeType +} + +func (d protobufDecoder) Decode(body []byte) (promql.Vector, error) { + resp := mimirpb.QueryResponse{} + if err := resp.Unmarshal(body); err != nil { + return promql.Vector{}, err + } + + if resp.Status == mimirpb.QueryResponse_ERROR { + return promql.Vector{}, fmt.Errorf("query execution failed with error: %s", resp.Error) + } + + switch data := resp.Data.(type) { + case *mimirpb.QueryResponse_Scalar: + return d.decodeScalar(data.Scalar), nil + case *mimirpb.QueryResponse_Vector: + return d.decodeVector(data.Vector) + default: + return promql.Vector{}, fmt.Errorf("rule result is not a vector or scalar: \"%s\"", d.dataTypeToHumanFriendlyName(resp)) + } +} + +func (d protobufDecoder) decodeScalar(s *mimirpb.ScalarData) promql.Vector { + return promql.Vector{promql.Sample{ + Point: promql.Point{ + V: s.Value, + T: s.TimestampMs, + }, + Metric: labels.Labels{}, + }} +} + +func (d protobufDecoder) decodeVector(v *mimirpb.VectorData) (promql.Vector, error) { + floatSampleCount := len(v.Samples) + samples := make(promql.Vector, floatSampleCount+len(v.Histograms)) + + for i, s := range v.Samples { + m, err := d.metricToLabels(s.Metric) + if err != nil { + return nil, err + } + + samples[i] = promql.Sample{ + Metric: m, + Point: promql.Point{ + V: s.Value, + T: s.TimestampMs, + }, + } + } + + for i, s := range v.Histograms { + m, err := d.metricToLabels(s.Metric) + if err != nil { + return nil, err + } + + samples[floatSampleCount+i] = promql.Sample{ + Metric: m, + Point: promql.Point{ + H: s.Histogram.ToPrometheusModel(), + T: s.TimestampMs, + }, + } + } + + return samples, nil +} + +func (protobufDecoder) metricToLabels(metric []string) (labels.Labels, error) { + if len(metric)%2 != 0 { + return nil, fmt.Errorf("metric is malformed, it contains an odd number of symbols: %d", len(metric)) + } + + labelCount := len(metric) / 2 + m := make([]labels.Label, labelCount) + + for i := 0; i < labelCount; i++ { + m[i] = labels.Label{Name: metric[2*i], Value: metric[2*i+1]} + } + + return m, nil +} + +func (protobufDecoder) dataTypeToHumanFriendlyName(resp mimirpb.QueryResponse) string { + switch resp.Data.(type) { + case *mimirpb.QueryResponse_Scalar: + return "scalar" + case *mimirpb.QueryResponse_String_: + return "string" + case *mimirpb.QueryResponse_Vector: + return "vector" + case *mimirpb.QueryResponse_Matrix: + return "matrix" + default: + return fmt.Sprintf("%T", resp.Data) + } +} diff --git a/pkg/ruler/remotequerier_test.go b/pkg/ruler/remotequerier_test.go index 834f1cffee6..be5f290b65e 100644 --- a/pkg/ruler/remotequerier_test.go +++ b/pkg/ruler/remotequerier_test.go @@ -4,6 +4,7 @@ package ruler import ( "context" + "errors" "fmt" "net/http" "net/url" @@ -14,6 +15,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/status" "github.com/golang/snappy" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/promql" @@ -21,6 +23,8 @@ import ( "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" + + "github.com/grafana/mimir/pkg/mimirpb" ) type mockHTTPGRPCClient func(ctx context.Context, req *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) @@ -115,8 +119,9 @@ func TestRemoteQuerier_QueryReq(t *testing.T) { func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) { scenarios := map[string]struct { - body string - expected promql.Vector + body string + expected promql.Vector + expectedError error }{ "vector response with no series": { body: `{ @@ -185,6 +190,21 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) { }, }, }, + "matrix response": { + body: `{ + "status": "success", + "data": {"resultType":"matrix","result":[]} + }`, + expectedError: errors.New("rule result is not a vector or scalar: \"matrix\""), + }, + "execution error": { + body: `{ + "status": "error", + "errorType": "errorExec", + "error": "something went wrong" + }`, + expectedError: errors.New("query execution failed with error: something went wrong"), + }, } for name, scenario := range scenarios { @@ -202,8 +222,276 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) { tm := time.Unix(1649092025, 515834) actual, err := q.Query(context.Background(), "qs", tm) - require.NoError(t, err) - require.Equal(t, scenario.expected, actual) + require.Equal(t, scenario.expectedError, err) + + if scenario.expectedError == nil { + require.Equal(t, scenario.expected, actual) + } + }) + } +} + +func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) { + protobufHistogram := mimirpb.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Schema: 3, + ZeroThreshold: 1.23, + ZeroCount: 456, + Count: 9001, + Sum: 789.1, + PositiveSpans: []mimirpb.BucketSpan{ + {Offset: 4, Length: 1}, + {Offset: 3, Length: 2}, + }, + NegativeSpans: []mimirpb.BucketSpan{ + {Offset: 7, Length: 3}, + {Offset: 9, Length: 1}, + }, + PositiveBuckets: []float64{100, 200, 300}, + NegativeBuckets: []float64{400, 500, 600, 700}, + } + + promqlHistogram := histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Schema: 3, + ZeroThreshold: 1.23, + ZeroCount: 456, + Count: 9001, + Sum: 789.1, + PositiveSpans: []histogram.Span{ + {Offset: 4, Length: 1}, + {Offset: 3, Length: 2}, + }, + NegativeSpans: []histogram.Span{ + {Offset: 7, Length: 3}, + {Offset: 9, Length: 1}, + }, + PositiveBuckets: []float64{100, 200, 300}, + NegativeBuckets: []float64{400, 500, 600, 700}, + } + + scenarios := map[string]struct { + body mimirpb.QueryResponse + expected promql.Vector + expectedError error + }{ + "vector response with no series": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Vector{ + Vector: &mimirpb.VectorData{}, + }, + }, + expected: promql.Vector{}, + }, + "vector response with one series": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Vector{ + Vector: &mimirpb.VectorData{ + Samples: []mimirpb.VectorSample{ + { + Metric: []string{"foo", "bar"}, + TimestampMs: 1649092025515, + Value: 1.23, + }, + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.FromStrings("foo", "bar"), + Point: promql.Point{T: 1649092025515, V: 1.23}, + }, + }, + }, + "vector response with many series": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Vector{ + Vector: &mimirpb.VectorData{ + Samples: []mimirpb.VectorSample{ + { + Metric: []string{"foo", "bar"}, + TimestampMs: 1649092025515, + Value: 1.23, + }, + { + Metric: []string{"bar", "baz"}, + TimestampMs: 1649092025515, + Value: 4.56, + }, + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.FromStrings("foo", "bar"), + Point: promql.Point{T: 1649092025515, V: 1.23}, + }, + { + Metric: labels.FromStrings("bar", "baz"), + Point: promql.Point{T: 1649092025515, V: 4.56}, + }, + }, + }, + "vector response with many labels": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Vector{ + Vector: &mimirpb.VectorData{ + Samples: []mimirpb.VectorSample{ + { + Metric: []string{"bar", "baz", "foo", "blah"}, + TimestampMs: 1649092025515, + Value: 1.23, + }, + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.FromStrings("bar", "baz", "foo", "blah"), + Point: promql.Point{T: 1649092025515, V: 1.23}, + }, + }, + }, + "vector response with histogram value": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Vector{ + Vector: &mimirpb.VectorData{ + Histograms: []mimirpb.VectorHistogram{ + { + Metric: []string{"foo", "baz"}, + TimestampMs: 1649092025515, + Histogram: protobufHistogram, + }, + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.FromStrings("foo", "baz"), + Point: promql.Point{T: 1649092025515, H: &promqlHistogram}, + }, + }, + }, + "vector response with float and histogram values": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Vector{ + Vector: &mimirpb.VectorData{ + Samples: []mimirpb.VectorSample{ + { + Metric: []string{"foo", "baz"}, + TimestampMs: 1649092025515, + Value: 1.23, + }, + }, + Histograms: []mimirpb.VectorHistogram{ + { + Metric: []string{"foo", "bar"}, + TimestampMs: 1649092025515, + Histogram: protobufHistogram, + }, + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.FromStrings("foo", "baz"), + Point: promql.Point{T: 1649092025515, V: 1.23}, + }, + { + Metric: labels.FromStrings("foo", "bar"), + Point: promql.Point{T: 1649092025515, H: &promqlHistogram}, + }, + }, + }, + "vector response with malformed metric": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Vector{ + Vector: &mimirpb.VectorData{ + Samples: []mimirpb.VectorSample{ + { + Metric: []string{"foo", "bar", "baz"}, + TimestampMs: 1649092025515, + Value: 1.23, + }, + }, + }, + }, + }, + expectedError: errors.New("metric is malformed, it contains an odd number of symbols: 3"), + }, + "scalar response": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Scalar{ + Scalar: &mimirpb.ScalarData{ + TimestampMs: 1649092025515, + Value: 1.23, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.EmptyLabels(), + Point: promql.Point{T: 1649092025515, V: 1.23}, + }, + }, + }, + "matrix response": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_SUCCESS, + Data: &mimirpb.QueryResponse_Matrix{ + Matrix: &mimirpb.MatrixData{}, + }, + }, + expectedError: errors.New("rule result is not a vector or scalar: \"matrix\""), + }, + "execution error": { + body: mimirpb.QueryResponse{ + Status: mimirpb.QueryResponse_ERROR, + ErrorType: mimirpb.QueryResponse_EXECUTION, + Error: "something went wrong", + }, + expectedError: errors.New("query execution failed with error: something went wrong"), + }, + } + + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + mockClientFn := func(ctx context.Context, req *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + b, err := scenario.body.Marshal() + if err != nil { + return nil, err + } + + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{mimirpb.QueryResponseMimeType}}, + }, + Body: b, + }, nil + } + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger()) + + tm := time.Unix(1649092025, 515834) + actual, err := q.Query(context.Background(), "qs", tm) + require.Equal(t, scenario.expectedError, err) + + if scenario.expectedError == nil { + require.Equal(t, scenario.expected, actual) + } }) } }