New internal query result payload format: add support for remote rule evaluation path #4331

Merged 20 commits on Mar 8, 2023

Commits
4ce23ac Push decodeQueryResponse into query. (charleskorn, Mar 1, 2023)
fa14adb Extract createRequest method. (charleskorn, Mar 1, 2023)
0547089 Extract JSON decoding to its own type. (charleskorn, Mar 1, 2023)
5637a4e Select decoder based on response content type. (charleskorn, Mar 1, 2023)
aa38bcf Add configuration flag to enable requesting the new internal query re… (charleskorn, Mar 1, 2023)
3818947 Send appropriate Accept header based on configured format. (charleskorn, Mar 1, 2023)
973b77a Add support for decoding responses sent in protobuf format. (charleskorn, Mar 1, 2023)
08437c6 Update changelog. (charleskorn, Mar 2, 2023)
02d622b Fix typo in comment. (charleskorn, Mar 2, 2023)
085a6d4 Add support for encoding responses in the protobuf query result paylo… (charleskorn, Mar 2, 2023)
9de8d01 Refactor integration test to run a new ruler instance for each test c… (charleskorn, Mar 2, 2023)
89df038 Add integration test for use of protobuf format between ruler and que… (charleskorn, Mar 2, 2023)
3fbe179 Add benchmark. (charleskorn, Mar 2, 2023)
c336e1b Implement content negotiation correctly. (charleskorn, Mar 2, 2023)
465c545 Rename types. (charleskorn, Mar 2, 2023)
3302c9f Address PR feedback: fix typos (charleskorn, Mar 2, 2023)
573598b Address PR feedback: no need to take receiver (charleskorn, Mar 3, 2023)
f5bc9f8 Address PR feedback: take MIME types from decoders. (charleskorn, Mar 3, 2023)
e31cb99 Address PR feedback: rename `defaultFormatter` (charleskorn, Mar 3, 2023)
285b4b2 Fix issue introduced during rebase on `main`. (charleskorn, Mar 8, 2023)
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -49,7 +49,8 @@ Querying with using `{__mimir_storage__="ephemeral"}` selector no longer works.
 * [ENHANCEMENT] Store-gateway: Reduce memory allocation rate when loading TSDB chunks from Memcached. #4074
 * [ENHANCEMENT] Query-frontend: track `cortex_frontend_query_response_codec_duration_seconds` and `cortex_frontend_query_response_codec_payload_bytes` metrics to measure the time taken and bytes read / written while encoding and decoding query result payloads. #4110
 * [ENHANCEMENT] Alertmanager: expose additional upstream metrics `cortex_alertmanager_dispatcher_aggregation_groups`, `cortex_alertmanager_dispatcher_alert_processing_duration_seconds`. #4151
-* [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4153 #4304 #4318 #4375
+* [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf internal query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4153 #4304 #4318 #4375
+* [ENHANCEMENT] Query-frontend and ruler: add experimental, more performant protobuf internal query result response format enabled with `-ruler.query-frontend.query-result-response-format=protobuf`. #4331
 * [ENHANCEMENT] Store-gateway: use more efficient chunks fetching and caching. This should reduce CPU, memory utilization, and receive bandwidth of a store-gateway. Enable with `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled=true`. #4163 #4174 #4227 #4255
 * [ENHANCEMENT] Query-frontend: Wait for in-flight queries to finish before shutting down. #4073 #4170
 * [ENHANCEMENT] Store-gateway: added `encode` and `other` stage to `cortex_bucket_store_series_request_stage_duration_seconds` metric. #4179
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -9515,6 +9515,17 @@
           ],
           "fieldValue": null,
           "fieldDefaultValue": null
+        },
+        {
+          "kind": "field",
+          "name": "query_result_response_format",
+          "required": false,
+          "desc": "Format to use when retrieving query results from query-frontends. Supported values: json, protobuf",
+          "fieldValue": null,
+          "fieldDefaultValue": "json",
+          "fieldFlag": "ruler.query-frontend.query-result-response-format",
+          "fieldType": "string",
+          "fieldCategory": "experimental"
         }
       ],
       "fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2013,6 +2013,8 @@ Usage of ./cmd/mimir/mimir:
     	Override the default minimum TLS version. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13
   -ruler.query-frontend.grpc-client-config.tls-server-name string
     	Override the expected name on the server certificate.
+  -ruler.query-frontend.query-result-response-format string
+    	[experimental] Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "json")
   -ruler.query-stats-enabled
     	Report the wall time for ruler queries to complete as a per-tenant metric and as an info level log message.
   -ruler.recording-rules-evaluation-enabled
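The help text above describes a string flag limited to two values, with `json` as the default. As a rough illustration of how such a flag could be registered and validated with Go's standard `flag` package, here is a minimal sketch. Only the flag name, description, and supported values come from the diff; the `Config` type and the `RegisterFlags`, `Validate`, and `parse` names are hypothetical and may not match the Mimir source.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// Supported values, as listed in the flag's help text.
const (
	formatJSON     = "json"
	formatProtobuf = "protobuf"
)

// Config is a hypothetical holder for the setting; the real Mimir type may differ.
type Config struct {
	QueryResultResponseFormat string
}

// RegisterFlags registers the flag on fs with "json" as the default value.
func (c *Config) RegisterFlags(fs *flag.FlagSet) {
	fs.StringVar(&c.QueryResultResponseFormat,
		"ruler.query-frontend.query-result-response-format",
		formatJSON,
		"Format to use when retrieving query results from query-frontends. Supported values: json, protobuf")
}

// Validate rejects anything other than the two supported values.
func (c *Config) Validate() error {
	switch c.QueryResultResponseFormat {
	case formatJSON, formatProtobuf:
		return nil
	default:
		return errors.New("unknown query result response format: " + c.QueryResultResponseFormat)
	}
}

// parse is a helper for this example: it parses args into a fresh Config
// and validates the result.
func parse(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse(args); err != nil {
		return cfg, err
	}
	return cfg, cfg.Validate()
}

func main() {
	cfg, err := parse([]string{"-ruler.query-frontend.query-result-response-format=protobuf"})
	fmt.Println(cfg.QueryResultResponseFormat, err)
}
```

Validating at startup rather than at request time means a mistyped format fails fast instead of surfacing later as a decoding error.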
@@ -1571,6 +1571,11 @@ query_frontend:
   # ruler.query-frontend.grpc-client-config
   [grpc_client_config: <grpc_client>]
 
+  # (experimental) Format to use when retrieving query results from
+  # query-frontends. Supported values: json, protobuf
+  # CLI flag: -ruler.query-frontend.query-result-response-format
+  [query_result_response_format: <string> | default = "json"]
+
 tenant_federation:
   # Enable running rule groups against multiple tenants. The tenant IDs involved
   # need to be in the rule group's 'source_tenants' field. If this flag is set

Review comment on the new `CLI flag:` line (Collaborator): Can you also mention it in the list of experimental features under docs/sources/mimir/operators-guide/configure/about-versioning.md? It's non-blocking; you can do it in a follow-up PR too.

Reply (Contributor Author): Good catch, I've created #4429 to do this.
2 changes: 1 addition & 1 deletion go.mod
@@ -60,6 +60,7 @@ require (
 	github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9
 	github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd
 	github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
+	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.69.0
 	github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204
 	github.com/xlab/treeprint v1.1.0
@@ -177,7 +178,6 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
-	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
 	github.com/ncw/swift v1.0.53 // indirect
 	github.com/oklog/run v1.1.0 // indirect
43 changes: 33 additions & 10 deletions integration/ruler_test.go
@@ -829,11 +829,12 @@ func TestRulerFederatedRules(t *testing.T) {
 
 func TestRulerRemoteEvaluation(t *testing.T) {
 	tcs := map[string]struct {
-		tenantsWithMetrics []string
-		groupSourceTenants []string
-		ruleGroupOwner     string
-		ruleExpression     string
-		assertEvalResult   func(model.Vector)
+		tenantsWithMetrics       []string
+		groupSourceTenants       []string
+		ruleGroupOwner           string
+		ruleExpression           string
+		queryResultPayloadFormat string
+		assertEvalResult         func(model.Vector)
 	}{
 		"non federated rule group": {
 			tenantsWithMetrics: []string{"tenant-1"},
@@ -854,6 +855,16 @@ func TestRulerRemoteEvaluation(t *testing.T) {
 				require.Equal(t, evalResult[0].Value, model.SampleValue(2))
 			},
 		},
+		"protobuf query result payload format": {
+			tenantsWithMetrics:       []string{"tenant-4"},
+			ruleGroupOwner:           "tenant-4",
+			ruleExpression:           "count(sum_over_time(metric[1h]))",
+			queryResultPayloadFormat: "protobuf",
+			assertEvalResult: func(evalResult model.Vector) {
+				require.Len(t, evalResult, 1)
+				require.Equal(t, evalResult[0].Value, model.SampleValue(1))
+			},
+		},
 	}
 
 	s, err := e2e.NewScenario(networkName)
@@ -887,20 +898,32 @@ func TestRulerRemoteEvaluation(t *testing.T) {
 	// Start up services
 	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
 	ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
-	ruler := e2emimir.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags)
 	querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
 
-	require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, querier))
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
 	require.NoError(t, s.WaitReady(queryFrontend))
 
-	// Wait until both the distributor and ruler are ready
+	// Wait until the distributor is ready
 	// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring
 	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))
-	// Ruler will see 512 tokens from ingester, and 128 tokens from itself.
-	require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512+128), "cortex_ring_tokens_total"))
 
 	for tName, tc := range tcs {
 		t.Run(tName, func(t *testing.T) {
+			if tc.queryResultPayloadFormat == "" {
+				delete(flags, "-ruler.query-frontend.query-result-response-format")
+			} else {
+				flags["-ruler.query-frontend.query-result-response-format"] = tc.queryResultPayloadFormat
+			}
+
+			ruler := e2emimir.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags)
+			require.NoError(t, s.StartAndWaitReady(ruler))
+			t.Cleanup(func() {
+				_ = s.Stop(ruler)
+			})
+
+			// Ruler will see 512 tokens from ingester, and 128 tokens from itself.
+			require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512+128), "cortex_ring_tokens_total"))
+
 			// Generate some series under different tenants
 			sampleTime := time.Now()
 			for _, tenantID := range tc.tenantsWithMetrics {
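The integration test above reuses one `flags` map across all subtests and starts a fresh ruler for each case, so each subtest has to either set or clear the format flag before the ruler starts. The following is a minimal sketch of that set-or-delete step in isolation. The `applyFormat` helper and the `-target` entry are hypothetical and only serve the illustration; the flag name is the one from the diff.

```go
package main

import "fmt"

const formatFlag = "-ruler.query-frontend.query-result-response-format"

// applyFormat mirrors the per-test-case logic in the diff: an empty format
// removes the flag so the ruler falls back to its default, and any other
// value sets the flag explicitly.
func applyFormat(flags map[string]string, format string) {
	if format == "" {
		delete(flags, formatFlag)
		return
	}
	flags[formatFlag] = format
}

func main() {
	// "-target" is a placeholder entry standing in for the shared test flags.
	flags := map[string]string{"-target": "ruler"}

	for _, format := range []string{"protobuf", ""} {
		applyFormat(flags, format)
		value, set := flags[formatFlag]
		fmt.Printf("format=%q set=%t value=%q\n", format, set, value)
	}
}
```

The delete branch matters because the test cases live in a Go map, whose iteration order is unspecified: without it, a `protobuf` value set by one case could leak into a later case that expects the default.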
66 changes: 49 additions & 17 deletions pkg/frontend/querymiddleware/codec.go
@@ -21,11 +21,13 @@ import (
 	"github.com/go-kit/log"
 	"github.com/gogo/protobuf/proto"
 	"github.com/gogo/status"
+	"github.com/munnerz/goautoneg"
 	"github.com/opentracing/opentracing-go"
 	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
+	v1 "github.com/prometheus/prometheus/web/api/v1"
 	"github.com/weaveworks/common/httpgrpc"
 	"golang.org/x/exp/slices"
 
@@ -72,7 +74,7 @@ type Codec interface {
 	// EncodeRequest encodes a Request into an http request.
 	EncodeRequest(context.Context, Request) (*http.Request, error)
 	// EncodeResponse encodes a Response into an http response.
-	EncodeResponse(context.Context, Response) (*http.Response, error)
+	EncodeResponse(context.Context, *http.Request, Response) (*http.Response, error)
 }
 
 // Merger is used by middlewares making multiple requests to merge back all responses into a single one.
@@ -155,11 +157,14 @@ type formatter interface {
 	EncodeResponse(resp *PrometheusResponse) ([]byte, error)
 	DecodeResponse([]byte) (*PrometheusResponse, error)
 	Name() string
+	ContentType() v1.MIMEType
 }
 
-var knownFormats = map[string]formatter{
-	jsonMimeType:                  jsonFormat{},
-	mimirpb.QueryResponseMimeType: protobufFormat{},
+var jsonFormatterInstance = jsonFormatter{}
+
+var knownFormats = []formatter{
+	jsonFormatterInstance,
+	protobufFormatter{},
 }
 
 func NewPrometheusCodec(registerer prometheus.Registerer, queryResultResponseFormat string) Codec {
@@ -366,20 +371,19 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _
 	log.LogFields(otlog.Int("bytes", len(buf)))
 
 	contentType := r.Header.Get("Content-Type")
-	f, ok := knownFormats[contentType]
-
-	if !ok {
+	formatter := findFormatter(contentType)
+	if formatter == nil {
 		return nil, apierror.Newf(apierror.TypeInternal, "unknown response content type '%v'", contentType)
 	}
 
 	start := time.Now()
-	resp, err := f.DecodeResponse(buf)
+	resp, err := formatter.DecodeResponse(buf)
 	if err != nil {
 		return nil, apierror.Newf(apierror.TypeInternal, "error decoding response: %v", err)
 	}
 
-	c.metrics.duration.WithLabelValues(operationDecode, f.Name()).Observe(time.Since(start).Seconds())
-	c.metrics.size.WithLabelValues(operationDecode, f.Name()).Observe(float64(len(buf)))
+	c.metrics.duration.WithLabelValues(operationDecode, formatter.Name()).Observe(time.Since(start).Seconds())
+	c.metrics.size.WithLabelValues(operationDecode, formatter.Name()).Observe(float64(len(buf)))
 
 	if resp.Status == statusError {
 		return nil, apierror.New(apierror.Type(resp.ErrorType), resp.Error)
@@ -391,7 +395,17 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _
 	return resp, nil
 }
 
-func (c prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) {
+func findFormatter(contentType string) formatter {
+	for _, f := range knownFormats {
+		if f.ContentType().String() == contentType {
+			return f
+		}
+	}
+
+	return nil
+}
+
+func (c prometheusCodec) EncodeResponse(ctx context.Context, req *http.Request, res Response) (*http.Response, error) {
 	sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
 	defer sp.Finish()
 
@@ -403,22 +417,24 @@ func (c prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*htt
 		sp.LogFields(otlog.Int("series", len(a.Data.Result)))
 	}
 
-	// TODO: select format based on Accept header
-	f := knownFormats[jsonMimeType]
+	selectedContentType, formatter := c.negotiateContentType(req.Header.Get("Accept"))
+	if formatter == nil {
+		return nil, apierror.New(apierror.TypeNotAcceptable, "none of the content types in the Accept header are supported")
+	}
 
 	start := time.Now()
-	b, err := f.EncodeResponse(a)
+	b, err := formatter.EncodeResponse(a)
 	if err != nil {
 		return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err)
 	}
 
-	c.metrics.duration.WithLabelValues(operationEncode, f.Name()).Observe(time.Since(start).Seconds())
-	c.metrics.size.WithLabelValues(operationEncode, f.Name()).Observe(float64(len(b)))
+	c.metrics.duration.WithLabelValues(operationEncode, formatter.Name()).Observe(time.Since(start).Seconds())
+	c.metrics.size.WithLabelValues(operationEncode, formatter.Name()).Observe(float64(len(b)))
 	sp.LogFields(otlog.Int("bytes", len(b)))
 
 	resp := http.Response{
 		Header: http.Header{
-			"Content-Type": []string{"application/json"},
+			"Content-Type": []string{selectedContentType},
 		},
 		Body:       io.NopCloser(bytes.NewBuffer(b)),
 		StatusCode: http.StatusOK,
@@ -427,6 +443,22 @@ func (c prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*htt
 	return &resp, nil
 }
 
+func (prometheusCodec) negotiateContentType(acceptHeader string) (string, formatter) {
+	if acceptHeader == "" {
+		return jsonMimeType, jsonFormatterInstance
+	}
+
+	for _, clause := range goautoneg.ParseAccept(acceptHeader) {
+		for _, formatter := range knownFormats {
+			if formatter.ContentType().Satisfies(clause) {
+				return formatter.ContentType().String(), formatter
+			}
+		}
+	}
+
+	return "", nil
+}
+
 func matrixMerge(resps []*PrometheusResponse) []SampleStream {
 	output := map[string]*SampleStream{}
 	for _, resp := range resps {
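The `negotiateContentType` method in the diff walks the parsed `Accept` clauses and returns the first known format that satisfies one of them, defaulting to JSON when no header is sent. To make that flow easier to follow without external dependencies, here is a simplified, standard-library-only sketch. It is not the `goautoneg` or `v1.MIMEType` implementation: `mimeType`, `clause`, `parseAccept`, and `negotiate` are hypothetical stand-ins, the protobuf media type shown is a placeholder, and skipping `q=0` clauses is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// clause is one entry of an Accept header together with its quality value.
type clause struct {
	Type, SubType string
	Q             float64
}

// mimeType is a simplified stand-in for the MIME type used in the diff.
type mimeType struct{ Type, SubType string }

func (m mimeType) String() string { return m.Type + "/" + m.SubType }

// satisfies reports whether m matches a clause such as */*, application/*,
// or an exact type/subtype pair.
func (m mimeType) satisfies(c clause) bool {
	if c.Type == "*" && c.SubType == "*" {
		return true
	}
	return c.Type == m.Type && (c.SubType == "*" || c.SubType == m.SubType)
}

// parseAccept is a deliberately small Accept parser: it reads type/subtype
// and the q parameter, drops q=0 clauses, and orders the rest by descending q.
func parseAccept(header string) []clause {
	var clauses []clause
	for _, part := range strings.Split(header, ",") {
		fields := strings.Split(part, ";")
		typ, sub, ok := strings.Cut(strings.TrimSpace(fields[0]), "/")
		if !ok {
			continue
		}
		c := clause{Type: typ, SubType: sub, Q: 1.0}
		for _, param := range fields[1:] {
			name, value, _ := strings.Cut(strings.TrimSpace(param), "=")
			if name == "q" {
				if q, err := strconv.ParseFloat(value, 64); err == nil {
					c.Q = q
				}
			}
		}
		if c.Q > 0 {
			clauses = append(clauses, c)
		}
	}
	// A stable sort keeps header order for clauses with equal q.
	sort.SliceStable(clauses, func(i, j int) bool { return clauses[i].Q > clauses[j].Q })
	return clauses
}

// known lists supported formats in preference order; JSON comes first.
// The second entry is a placeholder, not the real Mimir media type.
var known = []mimeType{
	{"application", "json"},
	{"application", "vnd.example.queryresponse+protobuf"},
}

// negotiate returns the selected content type, or "" if nothing is acceptable.
func negotiate(accept string) string {
	if accept == "" {
		return known[0].String() // no Accept header: default to JSON
	}
	for _, c := range parseAccept(accept) {
		for _, m := range known {
			if m.satisfies(c) {
				return m.String()
			}
		}
	}
	return ""
}

func main() {
	for _, h := range []string{"", "application/json", "text/html", "*/*"} {
		fmt.Printf("Accept=%q -> %q\n", h, negotiate(h))
	}
}
```

In this sketch an empty result corresponds to the "not acceptable" error path in `EncodeResponse`, and a wildcard clause resolves to JSON because JSON is listed first.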
14 changes: 10 additions & 4 deletions pkg/frontend/querymiddleware/codec_json.go
@@ -5,15 +5,17 @@
 
 package querymiddleware
 
+import v1 "github.com/prometheus/prometheus/web/api/v1"
+
 const jsonMimeType = "application/json"
 
-type jsonFormat struct{}
+type jsonFormatter struct{}
 
-func (j jsonFormat) EncodeResponse(resp *PrometheusResponse) ([]byte, error) {
+func (j jsonFormatter) EncodeResponse(resp *PrometheusResponse) ([]byte, error) {
 	return json.Marshal(resp)
 }
 
-func (j jsonFormat) DecodeResponse(buf []byte) (*PrometheusResponse, error) {
+func (j jsonFormatter) DecodeResponse(buf []byte) (*PrometheusResponse, error) {
 	var resp PrometheusResponse
 
 	if err := json.Unmarshal(buf, &resp); err != nil {
@@ -23,6 +25,10 @@ func (j jsonFormat) DecodeResponse(buf []byte) (*PrometheusResponse, error) {
 	return &resp, nil
 }
 
-func (j jsonFormat) Name() string {
+func (j jsonFormatter) Name() string {
 	return formatJSON
 }
+
+func (j jsonFormatter) ContentType() v1.MIMEType {
+	return v1.MIMEType{Type: "application", SubType: "json"}
+}
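With `ContentType()` on each formatter, the decoder can be looked up from the response's `Content-Type` header instead of a map keyed by MIME string. The sketch below shows that lookup plus a JSON round trip using only the standard library. The reduced `response` type, the string-returning `ContentType()`, and the `"json"` name are assumptions made for this illustration; the real code uses `PrometheusResponse`, `v1.MIMEType`, and the `formatJSON` constant.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// response is a hypothetical, much-reduced stand-in for PrometheusResponse.
type response struct {
	Status string `json:"status"`
	Error  string `json:"error,omitempty"`
}

// formatter mirrors the shape of the interface in the diff, with
// ContentType simplified to a plain string for this sketch.
type formatter interface {
	EncodeResponse(*response) ([]byte, error)
	DecodeResponse([]byte) (*response, error)
	Name() string
	ContentType() string
}

type jsonFormatter struct{}

func (jsonFormatter) EncodeResponse(r *response) ([]byte, error) {
	return json.Marshal(r)
}

func (jsonFormatter) DecodeResponse(buf []byte) (*response, error) {
	var r response
	if err := json.Unmarshal(buf, &r); err != nil {
		return nil, err
	}
	return &r, nil
}

func (jsonFormatter) Name() string        { return "json" } // assumed value
func (jsonFormatter) ContentType() string { return "application/json" }

// knownFormats is a slice rather than a map, so its order can double as
// a preference order.
var knownFormats = []formatter{jsonFormatter{}}

// findFormatter returns the formatter whose content type equals
// contentType exactly, or nil if there is none.
func findFormatter(contentType string) formatter {
	for _, f := range knownFormats {
		if f.ContentType() == contentType {
			return f
		}
	}
	return nil
}

func main() {
	f := findFormatter("application/json")
	buf, err := f.EncodeResponse(&response{Status: "success"})
	if err != nil {
		panic(err)
	}
	decoded, err := f.DecodeResponse(buf)
	fmt.Println(string(buf), decoded.Status, err)
}
```

As in the diff, the comparison is an exact string match, so in this sketch a header value carrying parameters, such as `application/json; charset=utf-8`, finds no formatter.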
11 changes: 9 additions & 2 deletions pkg/frontend/querymiddleware/codec_json_test.go
@@ -199,14 +199,18 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) {
 			require.Equal(t, uint64(1), *payloadSizeHistogram.SampleCount)
 			require.Equal(t, float64(len(body)), *payloadSizeHistogram.SampleSum)
 
+			httpRequest := &http.Request{
+				Header: http.Header{"Accept": []string{jsonMimeType}},
+			}
+
 			// Reset response, as the above call will have consumed the body reader.
 			httpResponse = &http.Response{
 				StatusCode:    200,
 				Header:        headers,
 				Body:          io.NopCloser(bytes.NewBuffer(body)),
 				ContentLength: int64(len(body)),
 			}
-			encoded, err := codec.EncodeResponse(context.Background(), decoded)
+			encoded, err := codec.EncodeResponse(context.Background(), httpRequest, decoded)
 			require.NoError(t, err)
 
 			expectedJSON, err := bodyBuffer(httpResponse)
@@ -354,8 +358,11 @@ func TestPrometheusCodec_JSONEncoding(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			reg := prometheus.NewPedanticRegistry()
 			codec := NewPrometheusCodec(reg, formatJSON)
+			httpRequest := &http.Request{
+				Header: http.Header{"Accept": []string{jsonMimeType}},
+			}
 
-			encoded, err := codec.EncodeResponse(context.Background(), tc.response)
+			encoded, err := codec.EncodeResponse(context.Background(), httpRequest, tc.response)
 			require.NoError(t, err)
 			require.Equal(t, http.StatusOK, encoded.StatusCode)
 			require.Equal(t, "application/json", encoded.Header.Get("Content-Type"))