Skip to content

Commit

Permalink
Add support for decoding responses sent in protobuf format.
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Mar 1, 2023
1 parent 62797e1 commit 499aa59
Show file tree
Hide file tree
Showing 3 changed files with 401 additions and 6 deletions.
4 changes: 3 additions & 1 deletion pkg/ruler/remotequerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func NewRemoteQuerier(
middlewares ...Middleware,
) *RemoteQuerier {
json := jsonDecoder{}
protobuf := protobufDecoder{}

return &RemoteQuerier{
client: client,
Expand All @@ -136,7 +137,8 @@ func NewRemoteQuerier(
logger: logger,
preferredQueryResultResponseFormat: preferredQueryResultResponseFormat,
decoders: map[string]decoder{
json.ContentType(): json,
json.ContentType(): json,
protobuf.ContentType(): protobuf,
},
}
}
Expand Down
107 changes: 106 additions & 1 deletion pkg/ruler/remotequerier_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/mimirpb"
)

type decoder interface {
Expand All @@ -34,7 +36,7 @@ func (d jsonDecoder) Decode(body []byte) (promql.Vector, error) {
return promql.Vector{}, err
}
if apiResp.Status == statusError {
return promql.Vector{}, fmt.Errorf("query response error: %s", apiResp.Error)
return promql.Vector{}, fmt.Errorf("query execution failed with error: %s", apiResp.Error)
}
v := struct {
Type model.ValueType `json:"resultType"`
Expand Down Expand Up @@ -100,3 +102,106 @@ func (jsonDecoder) scalarToPromQLVector(sc *model.Scalar) promql.Vector {
Metric: labels.Labels{},
}}
}

type protobufDecoder struct{}

func (protobufDecoder) ContentType() string {
return mimirpb.QueryResponseMimeType
}

func (d protobufDecoder) Decode(body []byte) (promql.Vector, error) {
resp := mimirpb.QueryResponse{}
if err := resp.Unmarshal(body); err != nil {
return promql.Vector{}, err
}

if resp.Status == mimirpb.QueryResponse_ERROR {
return promql.Vector{}, fmt.Errorf("query execution failed with error: %s", resp.Error)
}

switch data := resp.Data.(type) {
case *mimirpb.QueryResponse_Scalar:
return d.decodeScalar(data.Scalar), nil
case *mimirpb.QueryResponse_Vector:
return d.decodeVector(data.Vector)
default:
return promql.Vector{}, fmt.Errorf("rule result is not a vector or scalar: \"%s\"", d.dataTypeToHumanFriendlyName(resp))
}
}

func (d protobufDecoder) decodeScalar(s *mimirpb.ScalarData) promql.Vector {
return promql.Vector{promql.Sample{
Point: promql.Point{
V: s.Value,
T: s.TimestampMs,
},
Metric: labels.Labels{},
}}
}

func (d protobufDecoder) decodeVector(v *mimirpb.VectorData) (promql.Vector, error) {
floatSampleCount := len(v.Samples)
samples := make(promql.Vector, floatSampleCount+len(v.Histograms))

for i, s := range v.Samples {
m, err := d.metricToLabels(s.Metric)
if err != nil {
return nil, err
}

samples[i] = promql.Sample{
Metric: m,
Point: promql.Point{
V: s.Value,
T: s.TimestampMs,
},
}
}

for i, s := range v.Histograms {
m, err := d.metricToLabels(s.Metric)
if err != nil {
return nil, err
}

samples[floatSampleCount+i] = promql.Sample{
Metric: m,
Point: promql.Point{
H: s.Histogram.ToPrometheusModel(),
T: s.TimestampMs,
},
}
}

return samples, nil
}

func (protobufDecoder) metricToLabels(metric []string) (labels.Labels, error) {
if len(metric)%2 != 0 {
return nil, fmt.Errorf("metric is malformed, it contains an odd number of symbols: %d", len(metric))
}

labelCount := len(metric) / 2
m := make([]labels.Label, labelCount)

for i := 0; i < labelCount; i++ {
m[i] = labels.Label{Name: metric[2*i], Value: metric[2*i+1]}
}

return m, nil
}

func (protobufDecoder) dataTypeToHumanFriendlyName(resp mimirpb.QueryResponse) string {
switch resp.Data.(type) {
case *mimirpb.QueryResponse_Scalar:
return "scalar"
case *mimirpb.QueryResponse_String_:
return "string"
case *mimirpb.QueryResponse_Vector:
return "vector"
case *mimirpb.QueryResponse_Matrix:
return "matrix"
default:
return fmt.Sprintf("%T", resp.Data)
}
}
Loading

0 comments on commit 499aa59

Please sign in to comment.