Skip to content

Commit

Permalink
support store matchers on labels API
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <yb532204897@gmail.com>

Add more unit tests in proxy store

Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Sep 7, 2020
1 parent d33cb7b commit 8037d3a
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 21 deletions.
21 changes: 14 additions & 7 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

const (
dedupParam = "dedup"
partialResponseParam = "partial_response"
maxSourceResolutionParam = "max_source_resolution"
replicaLabelsParam = "replicaLabels[]"
storeMatcherParam = "storeMatch[]"
)

// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
baseAPI *api.BaseAPI
Expand Down Expand Up @@ -139,7 +147,6 @@ type queryData struct {
}

func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) {
const dedupParam = "dedup"
enableDeduplication = true

if val := r.FormValue(dedupParam); val != "" {
Expand All @@ -153,7 +160,6 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio
}

func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) {
const replicaLabelsParam = "replicaLabels[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}
Expand All @@ -168,7 +174,6 @@ func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []
}

func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) {
const storeMatcherParam = "storeMatch[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}
Expand All @@ -189,7 +194,6 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers []
}

func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second

val := r.FormValue(maxSourceResolutionParam)
Expand All @@ -211,8 +215,6 @@ func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal t
}

func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePartialResponse bool) (enablePartialResponse bool, _ *api.ApiError) {
const partialResponseParam = "partial_response"

// Overwrite the cli flag when provided as a query parameter.
if val := r.FormValue(partialResponseParam); val != "" {
var err error
Expand Down Expand Up @@ -438,7 +440,12 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).
storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
47 changes: 46 additions & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set("max_source_resolution", test.maxSourceResolutionParam)
v.Set(maxSourceResolutionParam, test.maxSourceResolutionParam)
r := http.Request{PostForm: v}

// If no max_source_resolution is specified fit at least 5 samples between steps.
Expand All @@ -1062,6 +1062,51 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
}
}

func TestParseStoreMatchersParam(t *testing.T) {
for i, tc := range []struct {
storeMatchers string
fail bool
result [][]storepb.LabelMatcher
}{
{
storeMatchers: "123",
fail: true,
},
{
storeMatchers: "foo",
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foo"}}},
},
{
storeMatchers: `{__address__="localhost:10905"}`,
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}}},
},
{
storeMatchers: `{__address__="localhost:10905", cluster="test"}`,
fail: false,
result: [][]storepb.LabelMatcher{{
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"},
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "cluster", Value: "test"},
}},
},
} {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set(storeMatcherParam, tc.storeMatchers)
r := &http.Request{PostForm: v}

storeMatchers, err := api.parseStoreMatchersParam(r)
if !tc.fail {
testutil.Assert(t, reflect.DeepEqual(storeMatchers, tc.result), "case %v: expected %v to be equal to %v", i, storeMatchers, tc.result)
} else {
testutil.NotOk(t, err)
}
}
}

type mockedRulesClient struct {
g map[rulespb.RulesRequest_Type][]*rulespb.RuleGroup
w storage.Warnings
Expand Down
8 changes: 7 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

aggrs := aggrsFromFunc(hints.Func)

// TODO: Pass it using the SerieRequest instead of relying on context
// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp := &seriesServer{ctx: ctx}
Expand Down Expand Up @@ -333,6 +333,9 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: !q.partialResponse,
Expand All @@ -356,6 +359,9 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
Expand Down
61 changes: 50 additions & 11 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
ok, _ = storeMatches(st, r.MinTime, r.MaxTime, storeMatcher, r.Matchers...)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))

// This is used to cancel this stream when one operations takes too long.
seriesCtx, closeSeries := context.WithCancel(gctx)
Expand Down Expand Up @@ -515,7 +515,8 @@ func storeMatches(s Client, mint, maxt int64, storeMatcher [][]storepb.LabelMatc
return false, nil
}
match, err := storeMatchMetadata(s, storeMatcher)
if err != nil || !match {
// Return result here if no matchers set.
if len(matchers) == 0 || err != nil || !match {
return match, err
}
return labelSetsMatch(s.LabelSets(), matchers)
Expand Down Expand Up @@ -587,14 +588,32 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
*storepb.LabelNamesResponse, error,
) {
var (
warnings []string
names [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
warnings []string
names [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
storeDebugMsgs []string
)

for _, st := range s.stores() {
st := st
var ok bool
tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) {
storeMatcher := [][]storepb.LabelMatcher{}
if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil {
if value, ok := ctxVal.([][]storepb.LabelMatcher); ok {
storeMatcher = value
}
}
// We can skip error, we already translated matchers once.
ok, _ = storeMatches(st, r.Start, r.End, storeMatcher)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))

g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: r.PartialResponseDisabled,
Expand Down Expand Up @@ -626,6 +645,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
return nil, err
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
return &storepb.LabelNamesResponse{
Names: strutil.MergeUnsortedSlices(names...),
Warnings: warnings,
Expand All @@ -637,14 +657,32 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
*storepb.LabelValuesResponse, error,
) {
var (
warnings []string
all [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
warnings []string
all [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
storeDebugMsgs []string
)

for _, st := range s.stores() {
store := st
var ok bool
tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) {
storeMatcher := [][]storepb.LabelMatcher{}
if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil {
if value, ok := ctxVal.([][]storepb.LabelMatcher); ok {
storeMatcher = value
}
}
// We can skip error, we already translated matchers once.
ok, _ = storeMatches(st, r.Start, r.End, storeMatcher)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))

g.Go(func() error {
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
Label: r.Label,
Expand Down Expand Up @@ -677,6 +715,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
return nil, err
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
return &storepb.LabelValuesResponse{
Values: strutil.MergeUnsortedSlices(all...),
Warnings: warnings,
Expand Down
77 changes: 76 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,13 @@ func TestProxyStore_LabelValues(t *testing.T) {
Values: []string{"3", "4"},
},
}},
&testClient{StoreClient: &mockedStoreAPI{
RespLabelValues: &storepb.LabelValuesResponse{
Values: []string{"5", "6"},
}},
minTime: timestamp.FromTime(time.Now().Add(-1 * time.Minute)),
maxTime: timestamp.FromTime(time.Now()),
},
}
q := NewProxyStore(nil,
nil,
Expand All @@ -1107,6 +1114,20 @@ func TestProxyStore_LabelValues(t *testing.T) {
testutil.Ok(t, err)
testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq)

testutil.Equals(t, []string{"1", "2", "3", "4", "5", "6"}, resp.Values)
testutil.Equals(t, 1, len(resp.Warnings))

// Request outside the time range of the last store client.
req = &storepb.LabelValuesRequest{
Label: "a",
PartialResponseDisabled: true,
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(time.Now().Add(-1 * time.Hour)),
}
resp, err = q.LabelValues(ctx, req)
testutil.Ok(t, err)
testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq)

testutil.Equals(t, []string{"1", "2", "3", "4"}, resp.Values)
testutil.Equals(t, 1, len(resp.Warnings))
}
Expand All @@ -1118,7 +1139,8 @@ func TestProxyStore_LabelNames(t *testing.T) {
title string
storeAPIs []Client

req *storepb.LabelNamesRequest
req *storepb.LabelNamesRequest
storeMatchers [][]storepb.LabelMatcher

expectedNames []string
expectedErr error
Expand Down Expand Up @@ -1197,6 +1219,56 @@ func TestProxyStore_LabelNames(t *testing.T) {
expectedNames: []string{"a", "b"},
expectedWarningsLen: 1,
},
{
title: "stores filtered by time range",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
minTime: timestamp.FromTime(time.Now().Add(-4 * time.Hour)),
maxTime: timestamp.FromTime(time.Now().Add(-3 * time.Hour)),
},
&testClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"c", "d"},
},
},
minTime: timestamp.FromTime(time.Now().Add(-2 * time.Hour)),
maxTime: timestamp.FromTime(time.Now().Add(-1 * time.Hour)),
},
},
req: &storepb.LabelNamesRequest{
Start: timestamp.FromTime(time.Now().Add(-1 * time.Minute)),
End: timestamp.FromTime(time.Now()),
PartialResponseDisabled: false,
},
expectedNames: nil,
expectedWarningsLen: 0,
},
{
title: "store matchers specified",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
},
},
req: &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
PartialResponseDisabled: false,
},
storeMatchers: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "foo"}}},
expectedNames: nil,
expectedWarningsLen: 0,
},
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(
Expand All @@ -1209,6 +1281,9 @@ func TestProxyStore_LabelNames(t *testing.T) {
)

ctx := context.Background()
if len(tc.storeMatchers) > 0 {
ctx = context.WithValue(ctx, StoreMatcherKey, tc.storeMatchers)
}
resp, err := q.LabelNames(ctx, tc.req)
if tc.expectedErr != nil {
testutil.NotOk(t, err)
Expand Down

0 comments on commit 8037d3a

Please sign in to comment.