Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix query frontend regression on release v0.17.0 #3480

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ func registerQueryFrontend(app *extkingpin.App) {
MaxBodySize: 10 * 1024 * 1024,
},
QueryRangeConfig: queryfrontend.QueryRangeConfig{
Limits: &cortexvalidation.Limits{},
ResultsCacheConfig: &queryrange.ResultsCacheConfig{},
Limits: &cortexvalidation.Limits{},
},
LabelsConfig: queryfrontend.LabelsConfig{
Limits: &cortexvalidation.Limits{},
ResultsCacheConfig: &queryrange.ResultsCacheConfig{},
Limits: &cortexvalidation.Limits{},
},
},
}
Expand Down
68 changes: 41 additions & 27 deletions pkg/queryfrontend/labels_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,21 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du
}
}

// MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order.
func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
// Empty response for label_names, label_values and series API.
return &ThanosLabelsResponse{
Status: queryrange.StatusSuccess,
Data: []string{},
}, nil
}

if len(responses) == 1 {
return responses[0], nil
}

switch responses[0].(type) {
case *ThanosLabelsResponse:
if len(responses) == 1 {
return responses[0], nil
}
set := make(map[string]struct{})

for _, res := range responses {
Expand All @@ -83,25 +84,23 @@ func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange
Data: lbls,
}, nil
case *ThanosSeriesResponse:
seriesData := make([]labelpb.LabelSet, 0)
if len(responses) == 1 {
return responses[0], nil
}
seriesData := make(labelpb.ZLabelSets, 0)

// seriesString is used in soring so we don't have to calculate the string of label sets again.
seriesString := make([]string, 0)
uniqueSeries := make(map[string]struct{})
for _, res := range responses {
for _, series := range res.(*ThanosSeriesResponse).Data {
s := labelpb.LabelsToPromLabels(series.Labels).String()
s := series.PromLabels().String()
if _, ok := uniqueSeries[s]; !ok {
seriesData = append(seriesData, series)
seriesString = append(seriesString, s)
uniqueSeries[s] = struct{}{}
}
}
}

sort.Slice(seriesData, func(i, j int) bool {
return seriesString[i] < seriesString[j]
})
sort.Sort(seriesData)
return &ThanosSeriesResponse{
Status: queryrange.StatusSuccess,
Data: seriesData,
Expand Down Expand Up @@ -134,7 +133,8 @@ func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request) (queryran
}

func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
var u *url.URL
var req *http.Request
var err error
switch thanosReq := r.(type) {
case *ThanosLabelsRequest:
var params = url.Values{
Expand All @@ -145,10 +145,29 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),

// If label is not empty, then it is a label values query.
if thanosReq.Label != "" {
u := &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),
}

req = &http.Request{
Method: http.MethodGet,
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}
} else {
req, err = http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
}

case *ThanosSeriesRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
Expand All @@ -163,22 +182,17 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),

req, err = http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
}

req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}

return req.WithContext(ctx), nil
}

Expand Down
179 changes: 166 additions & 13 deletions pkg/queryfrontend/labels_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
name: "thanos labels names request",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/labels"},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.URL.Path == "/api/v1/labels"
},
},
{
name: "thanos labels values request",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values"},
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__"},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
Expand All @@ -243,7 +243,7 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
},
{
name: "thanos labels values request, partial response set to true",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", PartialResponse: true},
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__", PartialResponse: true},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
Expand All @@ -255,8 +255,8 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
name: "thanos series request with empty matchers",
req: &ThanosSeriesRequest{Start: 123000, End: 456000, Path: "/api/v1/series"},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.URL.Path == "/api/v1/series"
},
},
Expand All @@ -269,9 +269,9 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "cluster", "test")}},
},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
r.URL.Query().Get(queryv1.MatcherParam) == `{cluster="test"}` &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.FormValue(queryv1.MatcherParam) == `{cluster="test"}` &&
r.URL.Path == "/api/v1/series"
},
},
Expand All @@ -284,9 +284,9 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
Dedup: true,
},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
r.URL.Query().Get(queryv1.DedupParam) == "true" &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.FormValue(queryv1.DedupParam) == "true" &&
r.URL.Path == "/api/v1/series"
},
},
Expand All @@ -313,12 +313,29 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
labelsData, err := json.Marshal(labelResponse)
testutil.Ok(t, err)

labelResponseWithHeaders := &ThanosLabelsResponse{
Status: "success",
Data: []string{"__name__"},
Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}},
}
labelsDataWithHeaders, err := json.Marshal(labelResponseWithHeaders)
testutil.Ok(t, err)

seriesResponse := &ThanosSeriesResponse{
Status: "success",
Data: []labelpb.LabelSet{{Labels: []labelpb.Label{{Name: "foo", Value: "bar"}}}},
Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}},
}
seriesData, err := json.Marshal(seriesResponse)
testutil.Ok(t, err)

seriesResponseWithHeaders := &ThanosSeriesResponse{
Status: "success",
Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}},
Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}},
}
seriesDataWithHeaders, err := json.Marshal(seriesResponseWithHeaders)
testutil.Ok(t, err)

for _, tc := range []struct {
name string
expectedError error
Expand All @@ -344,12 +361,34 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsData))},
expectedResponse: labelResponse,
},
{
name: "thanos labels request with HTTP headers",
req: &ThanosLabelsRequest{},
res: http.Response{
StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsDataWithHeaders)),
Header: map[string][]string{
cacheControlHeader: {noStoreValue},
},
},
expectedResponse: labelResponseWithHeaders,
},
{
name: "thanos series request",
req: &ThanosSeriesRequest{},
res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesData))},
expectedResponse: seriesResponse,
},
{
name: "thanos series request with HTTP headers",
req: &ThanosSeriesRequest{},
res: http.Response{
StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesDataWithHeaders)),
Header: map[string][]string{
cacheControlHeader: {noStoreValue},
},
},
expectedResponse: seriesResponseWithHeaders,
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
Expand All @@ -364,3 +403,117 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
})
}
}

func TestLabelsCodec_MergeResponse(t *testing.T) {
for _, tc := range []struct {
name string
expectedError error
responses []queryrange.Response
expectedResponse queryrange.Response
}{
{
name: "Prometheus range query response format, not valid",
responses: []queryrange.Response{
&queryrange.PrometheusResponse{Status: "success"},
},
expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format"),
},
{
name: "Empty response",
responses: nil,
expectedResponse: &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
},
{
name: "One label response",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
{
name: "One label response and two empty responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
&ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
{
name: "Multiple duplicate label responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9092"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9092", "localhost:9093"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success",
Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}},
},
// This case shouldn't happen because the responses from Querier are sorted.
{
name: "Multiple unordered label responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9093", "localhost:9092"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9090"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success",
Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}},
},
{
name: "One series response",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "One series response and two empty responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: queryrange.StatusSuccess},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: queryrange.StatusSuccess},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "Multiple duplicate series responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "Multiple unordered series responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}},
}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}},
}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}},
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}},
}},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
codec := NewThanosLabelsCodec(false, time.Hour*2)
r, err := codec.MergeResponse(tc.responses...)
if tc.expectedError != nil {
testutil.Equals(t, err, tc.expectedError)
} else {
testutil.Ok(t, err)
testutil.Equals(t, tc.expectedResponse, r)
}
})
}
}
Loading