diff --git a/CHANGELOG.md b/CHANGELOG.md index 9427884e4ce..4e696324c19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added -- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page +- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page. - [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs. - [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails. @@ -32,10 +32,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2893](https://github.com/thanos-io/thanos/pull/2893) Store: Rename metric `thanos_bucket_store_cached_postings_compression_time_seconds` to `thanos_bucket_store_cached_postings_compression_time_seconds_total`. - [#2915](https://github.com/thanos-io/thanos/pull/2915) Receive,Ruler: Enable TSDB directory locking by default. Add a new flag (`--tsdb.no-lockfile`) to override behavior. - -### Changed - - [#2902](https://github.com/thanos-io/thanos/pull/2902) ui: React: Separate dedupe and partial response checkboxes per panel. +- [#2931](https://github.com/thanos-io/thanos/pull/2931) Query: Allow passing a `storeMatch[]` parameter to select matching stores when debugging the Querier. ## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 diff --git a/docs/components/query.md b/docs/components/query.md index 04757871abd..32a5e393341 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -222,6 +222,24 @@ Thanos Querier has the ability to perform concurrent select request per query. I The maximum number of concurrent requests are being made per query is controller by `query.max-concurrent-select` flag. Keep in mind that the maximum number of concurrent queries that are handled by querier is controlled by `query.max-concurrent`. Please consider implications of combined value while tuning the querier. +### Store filtering + +It's possible to provide a set of matchers to the Querier API to select specific stores to be used during the query, using the `storeMatch[]` parameter. This is useful when debugging a slow or broken store. The matchers use the same format as those of [Prometheus' federate API](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers). Note that at the moment the Querier only supports the `__address__` label, which contains the address of the store. + +Example: +``` +- targets: + - prometheus-foo.thanos-sidecar:10901 + - prometheus-bar.thanos-sidecar:10901 +``` + +``` +http://localhost:10901/api/v1/query?query=up&dedup=true&partial_response=true&storeMatch[]={__address__=~"prometheus-foo.*"} +``` + +This will return metrics only from `prometheus-foo.thanos-sidecar:10901`. + + ## Expose UI on a sub-path It is possible to expose thanos-query UI and optionally API on a sub-path. 
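For reference, a minimal Go sketch of how a client could assemble such a request URL programmatically. The Querier address and sidecar hostname are taken from the docs example above and are placeholders, not values defined by this change.

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	v := url.Values{}
	v.Set("query", "up")
	v.Set("dedup", "true")
	v.Set("partial_response", "true")
	// Repeat storeMatch[] to OR several selectors together; each value is a
	// Prometheus-style selector matched against the store's __address__ label.
	v.Add("storeMatch[]", `{__address__=~"prometheus-foo.*"}`)

	u := url.URL{
		Scheme:   "http",
		Host:     "localhost:10901", // Assumed Querier HTTP address, as in the docs example.
		Path:     "/api/v1/query",
		RawQuery: v.Encode(),
	}
	fmt.Println(u.String())
}
```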
diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 73761c4bd77..7b3733bb734 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -269,6 +269,27 @@ func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string return replicaLabels, nil } +func (api *API) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *ApiError) { + const storeMatcherParam = "storeMatch[]" + if err := r.ParseForm(); err != nil { + return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")} + } + + for _, s := range r.Form[storeMatcherParam] { + matchers, err := parser.ParseMetricSelector(s) + if err != nil { + return nil, &ApiError{errorBadData, err} + } + stm, err := storepb.TranslatePromMatchers(matchers...) + if err != nil { + return nil, &ApiError{errorBadData, errors.Wrap(err, "convert store matchers")} + } + storeMatchers = append(storeMatchers, stm) + } + + return storeMatchers, nil +} + func (api *API) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *ApiError) { const maxSourceResolutionParam = "max_source_resolution" maxSourceResolution := 0 * time.Second @@ -343,6 +364,11 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + storeMatchers, apiErr := api.parseStoreMatchersParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + enablePartialResponse, apiErr := api.parsePartialResponseParam(r, api.enableQueryPartialResponse) if apiErr != nil { return nil, nil, apiErr @@ -357,7 +383,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) + qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, storeMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) if err != nil { return nil, nil, &ApiError{errorBadData, err} } @@ -442,6 +468,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + storeMatchers, apiErr := api.parseStoreMatchersParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + // If no max_source_resolution is specified fit at least 5 samples between steps. 
maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step/5) if apiErr != nil { @@ -458,7 +489,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { defer span.Finish() qry, err := api.queryEngine.NewRangeQuery( - api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), + api.queryableCreate(enableDedup, replicaLabels, storeMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), start, end, @@ -506,7 +537,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -581,12 +612,17 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + storeMatchers, apiErr := api.parseStoreMatchersParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + enablePartialResponse, apiErr := api.parsePartialResponseParam(r, api.enableQueryPartialResponse) if apiErr != nil { return nil, nil, apiErr } - q, err := api.queryableCreate(enableDedup, replicaLabels, math.MaxInt64, enablePartialResponse, true). + q, err := api.queryableCreate(enableDedup, replicaLabels, storeMatchers, math.MaxInt64, enablePartialResponse, true). Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &ApiError{errorExec, err} @@ -688,7 +724,12 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64) + storeMatchers, apiErr := api.parseStoreMatchersParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + + q, err := api.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 1fcd4f1a72f..df8e8d39e04 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/gate" + "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -27,17 +28,18 @@ import ( // replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy. -type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable +type QueryableCreator func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. 
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { keeper := gate.NewKeeper(reg) - return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { + return func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { return &queryable{ logger: logger, reg: reg, replicaLabels: replicaLabels, + storeMatchers: storeMatchers, proxy: proxy, deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, @@ -54,6 +56,7 @@ type queryable struct { logger log.Logger reg prometheus.Registerer replicaLabels []string + storeMatchers [][]storepb.LabelMatcher proxy storepb.StoreServer deduplicate bool maxResolutionMillis int64 @@ -66,7 +69,7 @@ type queryable struct { // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil + return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil } type querier struct { @@ -76,6 +79,7 @@ type querier struct { cancel func() mint, maxt int64 replicaLabels map[string]struct{} + storeMatchers [][]storepb.LabelMatcher proxy storepb.StoreServer deduplicate bool maxResolutionMillis int64 @@ -93,6 +97,7 @@ func newQuerier( reg prometheus.Registerer, mint, maxt int64, replicaLabels []string, + storeMatchers [][]storepb.LabelMatcher, proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, @@ -120,6 +125,7 @@ func newQuerier( mint: mint, maxt: maxt, replicaLabels: rl, + storeMatchers: storeMatchers, proxy: proxy, deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, @@ -253,6 +259,9 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . 
aggrs := aggrsFromFunc(hints.Func) + // TODO: Pass it using the SeriesRequest instead of relying on the context. + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers) + resp := &seriesServer{ctx: ctx} if err := q.proxy.Series(&storepb.SeriesRequest{ MinTime: hints.Start, diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index ff9508a41e6..9ac6196c4e3 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator(false, nil, oneHourMillis, false, false) + queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -72,7 +72,7 @@ func TestQuerier_DownsampledData(t *testing.T) { } timeout := 10 * time.Second - q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, 9999999, false, false) + q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false) engine := promql.NewEngine( promql.EngineOpts{ MaxSamples: math.MaxInt32, @@ -510,7 +510,7 @@ func TestQuerier_Select(t *testing.T) { {dedup: true, expected: []series{tcase.expectedAfterDedup}}, } { g := gate.New(2) - q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { @@ -680,7 +680,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, false, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -754,7 +754,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, true, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 67ae5a908f1..3b1835e9bef 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -31,6 +31,11 @@ import ( "google.golang.org/grpc/status" ) +type ctxKey int + +// StoreMatcherKey is the context key for the store's allow list. +const StoreMatcherKey = ctxKey(0) + // Client holds meta information about a store. type Client interface { // Client to access the store. @@ -255,8 +260,14 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe // NOTE: all matchers are validated in matchesExternalLabels method so we explicitly ignore error. 
var ok bool tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) { + storeMatcher := [][]storepb.LabelMatcher{} + if ctxVal := srv.Context().Value(StoreMatcherKey); ctxVal != nil { + if value, ok := ctxVal.([][]storepb.LabelMatcher); ok { + storeMatcher = value + } + } // We can skip error, we already translated matchers once. - ok, _ = storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...) + ok, _ = storeMatches(st, r.MinTime, r.MaxTime, storeMatcher, r.Matchers...) }) if !ok { storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st)) @@ -498,14 +509,40 @@ func (s *streamSeriesSet) Err() error { // matchStore returns true if the given store may hold data for the given label // matchers. -func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher) (bool, error) { +func storeMatches(s Client, mint, maxt int64, storeMatcher [][]storepb.LabelMatcher, matchers ...storepb.LabelMatcher) (bool, error) { storeMinTime, storeMaxTime := s.TimeRange() if mint > storeMaxTime || maxt < storeMinTime { return false, nil } + match, err := storeMatchMetadata(s, storeMatcher) + if err != nil || !match { + return match, err + } return labelSetsMatch(s.LabelSets(), matchers) } +// storeMatchMetadata returns true if the store's metadata labels match the storeMatcher. +func storeMatchMetadata(s Client, storeMatcher [][]storepb.LabelMatcher) (bool, error) { + clientLabels := generateMetadataClientLabels(s) + if len(storeMatcher) == 0 { + return true, nil + } + res := false + for _, stm := range storeMatcher { + stmMatch, err := labelSetMatches(clientLabels, stm) + if err != nil { + return false, err + } + res = res || stmMatch + } + return res, nil +} + +func generateMetadataClientLabels(s Client) storepb.LabelSet { + l := storepb.Label{Name: "__address__", Value: s.Addr()} + return storepb.LabelSet{Labels: []storepb.Label{l}} +} + // labelSetsMatch returns false if all label-set do not match the matchers. func labelSetsMatch(lss []storepb.LabelSet, matchers []storepb.LabelMatcher) (bool, error) { if len(lss) == 0 { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 50019ced37e..c4c6e46e1fd 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -40,19 +40,19 @@ type testClient struct { maxTime int64 } -func (c *testClient) LabelSets() []storepb.LabelSet { +func (c testClient) LabelSets() []storepb.LabelSet { return c.labelSets } -func (c *testClient) TimeRange() (int64, int64) { +func (c testClient) TimeRange() (int64, int64) { return c.minTime, c.maxTime } -func (c *testClient) String() string { +func (c testClient) String() string { return "test" } -func (c *testClient) Addr() string { +func (c testClient) Addr() string { return "testaddr" } @@ -1215,6 +1215,60 @@ func TestProxyStore_LabelNames(t *testing.T) { } } +func TestProxyStore_storeMatch(t *testing.T) { + storeAPIs := []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{4, 3}}, []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + minTime: 1, + maxTime: 300, + }, + } + req := &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + } + expectedSeries := []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "a"}}, + chunks: [][]sample{{{4, 3}}, {{0, 0}, {2, 1}, {3, 2}}}, // No sort merge. 
+ }, + } + q := NewProxyStore(nil, + nil, + func() []Client { return storeAPIs }, + component.Query, + nil, + 0*time.Second, + ) + + ctx := context.Background() + s := newStoreSeriesServer(ctx) + + err := q.Series(req, s) + testutil.Ok(t, err) + seriesEquals(t, expectedSeries, s.SeriesSet) + + matcher := [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "nothisone"}}} + s = newStoreSeriesServer(context.WithValue(ctx, StoreMatcherKey, matcher)) + + err = q.Series(req, s) + testutil.Ok(t, err) + testutil.Equals(t, s.Warnings[0], "No StoreAPIs matched for this query") + + matcher = [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "testaddr"}}} + s = newStoreSeriesServer(context.WithValue(ctx, StoreMatcherKey, matcher)) + + err = q.Series(req, s) + testutil.Ok(t, err) + seriesEquals(t, expectedSeries, s.SeriesSet) + +} + type rawSeries struct { lset []storepb.Label chunks [][]sample @@ -1350,7 +1404,7 @@ func TestStoreMatches(t *testing.T) { } for i, c := range cases { - ok, err := storeMatches(c.s, c.mint, c.maxt, c.ms...) + ok, err := storeMatches(c.s, c.mint, c.maxt, nil, c.ms...) testutil.Ok(t, err) testutil.Assert(t, c.ok == ok, "test case %d failed", i) } @@ -1718,3 +1772,24 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { testutil.NotOk(t, ctx.Err()) }) } + +func TestProxyStore_allowListMatch(t *testing.T) { + c := testClient{} + + lm := storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "testaddr"} + matcher := [][]storepb.LabelMatcher{{lm}} + match, err := storeMatchMetadata(c, matcher) + testutil.Ok(t, err) + testutil.Assert(t, match) + + lm = storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "wrong"} + matcher = [][]storepb.LabelMatcher{{lm}} + match, err = storeMatchMetadata(c, matcher) + testutil.Ok(t, err) + testutil.Assert(t, !match) + + matcher = [][]storepb.LabelMatcher{{}} + match, err = storeMatchMetadata(c, matcher) + testutil.Ok(t, err) + testutil.Assert(t, match) +}
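As a rough, non-authoritative sketch of the plumbing this change introduces, the snippet below shows how `storeMatch[]` selectors become the `[][]storepb.LabelMatcher` that the proxy store later reads back from the request context. It assumes the packages and functions shown in the diff above (`parser.ParseMetricSelector`, `storepb.TranslatePromMatchers`, `store.StoreMatcherKey`); `buildStoreMatchers` is an illustrative helper, not part of the change.

```go
// Sketch only: mirrors parseStoreMatchersParam and the context hand-off in selectFn.
package main

import (
	"context"
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"

	"github.com/thanos-io/thanos/pkg/store"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// buildStoreMatchers turns each selector string into one []storepb.LabelMatcher;
// the resulting groups are OR-ed against store metadata by storeMatchMetadata.
func buildStoreMatchers(selectors ...string) ([][]storepb.LabelMatcher, error) {
	var storeMatchers [][]storepb.LabelMatcher
	for _, s := range selectors {
		matchers, err := parser.ParseMetricSelector(s)
		if err != nil {
			return nil, err
		}
		stm, err := storepb.TranslatePromMatchers(matchers...)
		if err != nil {
			return nil, err
		}
		storeMatchers = append(storeMatchers, stm)
	}
	return storeMatchers, nil
}

func main() {
	storeMatchers, err := buildStoreMatchers(`{__address__=~"prometheus-foo.*"}`)
	if err != nil {
		panic(err)
	}

	// The querier attaches the matchers to the context; ProxyStore.Series reads
	// them back via srv.Context().Value(store.StoreMatcherKey) and filters stores
	// on their __address__ metadata before fanning out.
	ctx := context.WithValue(context.Background(), store.StoreMatcherKey, storeMatchers)
	_ = ctx // In the querier, this is the context passed to proxy.Series.
	fmt.Printf("built %d store matcher group(s)\n", len(storeMatchers))
}
```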