Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Sep 8, 2020
1 parent 8e51bb6 commit 15f4951
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 56 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also supports time range metadata based store filtering.
- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also time range metadata based store filtering is supported on Labels APIs.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07

Expand Down
30 changes: 15 additions & 15 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ import (
)

const (
dedupParam = "dedup"
partialResponseParam = "partial_response"
maxSourceResolutionParam = "max_source_resolution"
replicaLabelsParam = "replicaLabels[]"
storeMatcherParam = "storeMatch[]"
DedupParam = "dedup"
PartialResponseParam = "partial_response"
MaxSourceResolutionParam = "max_source_resolution"
ReplicaLabelsParam = "replicaLabels[]"
StoreMatcherParam = "storeMatch[]"
)

// QueryAPI is an API used by Thanos Query.
Expand Down Expand Up @@ -149,11 +149,11 @@ type queryData struct {
func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) {
enableDeduplication = true

if val := r.FormValue(dedupParam); val != "" {
if val := r.FormValue(DedupParam); val != "" {
var err error
enableDeduplication, err = strconv.ParseBool(val)
if err != nil {
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", dedupParam)}
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", DedupParam)}
}
}
return enableDeduplication, nil
Expand All @@ -166,8 +166,8 @@ func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []

replicaLabels = qapi.replicaLabels
// Overwrite the cli flag when provided as a query parameter.
if len(r.Form[replicaLabelsParam]) > 0 {
replicaLabels = r.Form[replicaLabelsParam]
if len(r.Form[ReplicaLabelsParam]) > 0 {
replicaLabels = r.Form[ReplicaLabelsParam]
}

return replicaLabels, nil
Expand All @@ -178,7 +178,7 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers []
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

for _, s := range r.Form[storeMatcherParam] {
for _, s := range r.Form[StoreMatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand All @@ -196,32 +196,32 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers []
func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) {
maxSourceResolution := 0 * time.Second

val := r.FormValue(maxSourceResolutionParam)
val := r.FormValue(MaxSourceResolutionParam)
if qapi.enableAutodownsampling || (val == "auto") {
maxSourceResolution = defaultVal
}
if val != "" && val != "auto" {
var err error
maxSourceResolution, err = parseDuration(val)
if err != nil {
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)}
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", MaxSourceResolutionParam)}
}
}

if maxSourceResolution < 0 {
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)}
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", MaxSourceResolutionParam)}
}

return int64(maxSourceResolution / time.Millisecond), nil
}

func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePartialResponse bool) (enablePartialResponse bool, _ *api.ApiError) {
// Overwrite the cli flag when provided as a query parameter.
if val := r.FormValue(partialResponseParam); val != "" {
if val := r.FormValue(PartialResponseParam); val != "" {
var err error
defaultEnablePartialResponse, err = strconv.ParseBool(val)
if err != nil {
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", partialResponseParam)}
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", PartialResponseParam)}
}
}
return defaultEnablePartialResponse, nil
Expand Down
28 changes: 15 additions & 13 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set(maxSourceResolutionParam, test.maxSourceResolutionParam)
v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam)
r := http.Request{PostForm: v}

// If no max_source_resolution is specified fit at least 5 samples between steps.
Expand Down Expand Up @@ -1099,19 +1099,21 @@ func TestParseStoreMatchersParam(t *testing.T) {
}},
},
} {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set(storeMatcherParam, tc.storeMatchers)
r := &http.Request{PostForm: v}
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set(StoreMatcherParam, tc.storeMatchers)
r := &http.Request{PostForm: v}

storeMatchers, err := api.parseStoreMatchersParam(r)
if !tc.fail {
testutil.Assert(t, reflect.DeepEqual(storeMatchers, tc.result), "case %v: expected %v to be equal to %v", i, storeMatchers, tc.result)
} else {
testutil.NotOk(t, err)
}
storeMatchers, err := api.parseStoreMatchersParam(r)
if !tc.fail {
testutil.Equals(t, tc.result, storeMatchers)
} else {
testutil.NotOk(t, err)
}
})
}
}

Expand Down
43 changes: 22 additions & 21 deletions pkg/queryfrontend/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req
return nil, errStepTooSmall
}

result.Dedup, err = parseEnableDedupParam(r.FormValue("dedup"))
result.Dedup, err = parseEnableDedupParam(r.FormValue(queryv1.DedupParam))
if err != nil {
return nil, err
}
Expand All @@ -92,22 +93,22 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req
result.AutoDownsampling = true
result.MaxSourceResolution = result.Step / 5
} else {
result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue("max_source_resolution"))
result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue(queryv1.MaxSourceResolutionParam))
if err != nil {
return nil, err
}
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue("partial_response"), c.partialResponse)
result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse)
if err != nil {
return nil, err
}

if len(r.Form["replicaLabels[]"]) > 0 {
result.ReplicaLabels = r.Form["replicaLabels[]"]
if len(r.Form[queryv1.ReplicaLabelsParam]) > 0 {
result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam]
}

result.StoreMatchers, err = parseStoreMatchersParam(r.Form["storeMatch[]"])
result.StoreMatchers, err = parseStoreMatchersParam(r.Form[queryv1.StoreMatcherParam])
if err != nil {
return nil, err
}
Expand All @@ -131,29 +132,29 @@ func (c codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.R
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
}
params := url.Values{
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
"step": []string{encodeDurationMillis(thanosReq.Step)},
"query": []string{thanosReq.Query},
"dedup": []string{strconv.FormatBool(thanosReq.Dedup)},
"partial_response": []string{strconv.FormatBool(thanosReq.PartialResponse)},
"replicaLabels[]": thanosReq.ReplicaLabels,
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
"step": []string{encodeDurationMillis(thanosReq.Step)},
"query": []string{thanosReq.Query},
queryv1.DedupParam: []string{strconv.FormatBool(thanosReq.Dedup)},
queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)},
queryv1.ReplicaLabelsParam: thanosReq.ReplicaLabels,
}

if thanosReq.AutoDownsampling {
params["max_source_resolution"] = []string{"auto"}
params[queryv1.MaxSourceResolutionParam] = []string{"auto"}
} else if thanosReq.MaxSourceResolution != 0 {
// Add this param only if it is set. Set to 0 will impact
// auto-downsampling in the querier.
params["max_source_resolution"] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)}
params[queryv1.MaxSourceResolutionParam] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)}
}

if len(thanosReq.StoreMatchers) > 0 {
storeMatchers, err := matchersToStringSlice(thanosReq.StoreMatchers)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
}
params["storeMatch[]"] = storeMatchers
params[queryv1.StoreMatcherParam] = storeMatchers
}

u := &url.URL{
Expand Down Expand Up @@ -191,7 +192,7 @@ func parseEnableDedupParam(s string) (bool, error) {
var err error
enableDeduplication, err = strconv.ParseBool(s)
if err != nil {
return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "dedup")
return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.DedupParam)
}
}

Expand All @@ -204,7 +205,7 @@ func parseDownsamplingParamMillis(s string) (int64, error) {
var err error
maxSourceResolution, err = parseDurationMillis(s)
if err != nil {
return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "max_source_resolution")
return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.MaxSourceResolutionParam)
}
}

Expand All @@ -220,7 +221,7 @@ func parsePartialResponseParam(s string, defaultEnablePartialResponse bool) (boo
var err error
defaultEnablePartialResponse, err = strconv.ParseBool(s)
if err != nil {
return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "partial_response")
return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.PartialResponseParam)
}
}

Expand All @@ -232,11 +233,11 @@ func parseStoreMatchersParam(ss []string) ([][]storepb.LabelMatcher, error) {
for _, s := range ss {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "storeMatch[]")
return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.StoreMatcherParam)
}
stm, err := storepb.TranslatePromMatchers(matchers...)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "storeMatch[]")
return nil, httpgrpc.Errorf(http.StatusBadRequest, queryv1.StoreMatcherParam)
}
storeMatchers = append(storeMatchers, stm)
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/queryfrontend/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"testing"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/weaveworks/common/httpgrpc"
)

func TestCodec_DecodeRequest(t *testing.T) {
Expand Down Expand Up @@ -218,7 +220,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("dedup") == "true"
r.URL.Query().Get(queryv1.DedupParam) == "true"
},
},
{
Expand All @@ -233,7 +235,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("partial_response") == "true"
r.URL.Query().Get(queryv1.PartialResponseParam) == "true"
},
},
{
Expand All @@ -248,7 +250,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("max_source_resolution") == "300"
r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "300"
},
},
{
Expand All @@ -263,7 +265,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("max_source_resolution") == "3600"
r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "3600"
},
},
} {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
ok, _ = storeMatches(st, r.MinTime, r.MaxTime, storeMatcher, r.Matchers...)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
Expand Down

0 comments on commit 15f4951

Please sign in to comment.