diff --git a/CHANGELOG.md b/CHANGELOG.md index 61b058a72a..d199c2181c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,16 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use _breaking :warning:_ to mark changes that are not backward compatible (relates only to v0.y.z releases.) +## Unreleased + +### Added +- +### Fixed +- [#4123](https://github.com/thanos-io/thanos/pull/4123) Query: match external labels for exemplars API. +### Changed +- +### Removed + ## [v0.20.0](https://github.com/thanos-io/thanos/releases/tag/v0.20.0) - 2021.04.28 ### Added diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index a2c3550a26..afe677106c 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -424,7 +424,7 @@ func runQuery( rulesProxy = rules.NewProxy(logger, stores.GetRulesClients) targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients) metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients) - exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients) + exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsStores, selectorLset) queryableCreator = query.NewQueryableCreator( logger, extprom.WrapRegistererWithPrefix("thanos_query_", reg), diff --git a/pkg/exemplars/exemplarspb/custom.go b/pkg/exemplars/exemplarspb/custom.go index 72ed6db047..ac17cb2157 100644 --- a/pkg/exemplars/exemplarspb/custom.go +++ b/pkg/exemplars/exemplarspb/custom.go @@ -12,6 +12,12 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" ) +// ExemplarStore wraps the ExemplarsClient and contains the info of external labels. +type ExemplarStore struct { + ExemplarsClient + LabelSets []labels.Labels +} + // UnmarshalJSON implements json.Unmarshaler. func (m *Exemplar) UnmarshalJSON(b []byte) error { v := struct { diff --git a/pkg/exemplars/proxy.go b/pkg/exemplars/proxy.go index 407f2adad9..0386aa816c 100644 --- a/pkg/exemplars/proxy.go +++ b/pkg/exemplars/proxy.go @@ -10,6 +10,8 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/store/storepb" "golang.org/x/sync/errgroup" @@ -21,8 +23,9 @@ import ( // Proxy implements exemplarspb.Exemplars gRPC that fanouts requests to // given exemplarspb.Exemplars. type Proxy struct { - logger log.Logger - exemplars func() []exemplarspb.ExemplarsClient + logger log.Logger + exemplars func() []*exemplarspb.ExemplarStore + selectorLabels labels.Labels } // RegisterExemplarsServer register exemplars server. @@ -33,10 +36,11 @@ func RegisterExemplarsServer(exemplarsSrv exemplarspb.ExemplarsServer) func(*grp } // NewProxy return new exemplars.Proxy. -func NewProxy(logger log.Logger, exemplars func() []exemplarspb.ExemplarsClient) *Proxy { +func NewProxy(logger log.Logger, exemplars func() []*exemplarspb.ExemplarStore, selectorLabels labels.Labels) *Proxy { return &Proxy{ - logger: logger, - exemplars: exemplars, + logger: logger, + exemplars: exemplars, + selectorLabels: selectorLabels, } } @@ -48,16 +52,80 @@ type exemplarsStream struct { } func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exemplars_ExemplarsServer) error { + expr, err := parser.ParseExpr(req.Query) + if err != nil { + return err + } + + selectors := parser.ExtractSelectors(expr) + + newSelectors := make([][]*labels.Matcher, 0, len(selectors)) + for _, matchers := range selectors { + matched, newMatchers := matchesExternalLabels(matchers, s.selectorLabels) + if matched { + newSelectors = append(newSelectors, newMatchers) + } + } + // There is no matched selectors for this thanos query. + if len(newSelectors) == 0 { + return nil + } + var ( g, gctx = errgroup.WithContext(srv.Context()) respChan = make(chan *exemplarspb.ExemplarData, 10) exemplars []*exemplarspb.ExemplarData ) - for _, exemplarsClient := range s.exemplars() { + for _, st := range s.exemplars() { + query := "" + Matchers: + for _, matchers := range newSelectors { + metricsSelector := "" + for _, m := range matchers { + for _, ls := range st.LabelSets { + if lv := ls.Get(m.Name); lv != "" { + if !m.Matches(lv) { + continue Matchers + } else { + // If the current matcher matches one external label, + // we don't add it to the current metric selector + // as Prometheus' Exemplars API cannot handle external labels. + continue + } + } + if metricsSelector == "" { + metricsSelector += m.String() + } else { + metricsSelector += ", " + m.String() + } + } + } + // Construct the query by concatenating metric selectors with '+'. + // We cannot preserve the original query info, but the returned + // results are the same. + if query == "" { + query += "{" + metricsSelector + "}" + } else { + query += " + {" + metricsSelector + "}" + } + } + + // No matchers match this store. + if query == "" { + continue + } + r := &exemplarspb.ExemplarsRequest{ + Start: req.Start, + End: req.End, + Query: query, + PartialResponseStrategy: req.PartialResponseStrategy, + } + es := &exemplarsStream{ - client: exemplarsClient, - request: req, + + client: st.ExemplarsClient, + request: r, channel: respChan, server: srv, } @@ -137,3 +205,30 @@ func (stream *exemplarsStream) receive(ctx context.Context) error { } } } + +// matchesExternalLabels returns false if given matchers are not matching external labels. +// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. +func matchesExternalLabels(ms []*labels.Matcher, externalLabels labels.Labels) (bool, []*labels.Matcher) { + if len(externalLabels) == 0 { + return true, ms + } + + var newMatchers []*labels.Matcher + for i, tm := range ms { + // Validate all matchers. + extValue := externalLabels.Get(tm.Name) + if extValue == "" { + // Agnostic to external labels. + ms = append(ms[:i], ms[i:]...) + newMatchers = append(newMatchers, tm) + continue + } + + if !tm.Matches(extValue) { + // External label does not match. This should not happen - it should be filtered out on query node, + // but let's do that anyway here. + return false, nil + } + } + return true, newMatchers +} diff --git a/pkg/exemplars/proxy_test.go b/pkg/exemplars/proxy_test.go new file mode 100644 index 0000000000..ba73ec8907 --- /dev/null +++ b/pkg/exemplars/proxy_test.go @@ -0,0 +1,288 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package exemplars + +import ( + "context" + "io" + "os" + "reflect" + "testing" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" + "google.golang.org/grpc" +) + +type testExemplarClient struct { + grpc.ClientStream + exemplarErr, recvErr error + response *exemplarspb.ExemplarsResponse + sentResponse bool +} + +func (t *testExemplarClient) String() string { + return "test" +} + +func (t *testExemplarClient) Recv() (*exemplarspb.ExemplarsResponse, error) { + // A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. + if t.recvErr != nil { + return nil, t.recvErr + } + + if t.sentResponse { + return nil, io.EOF + } + t.sentResponse = true + + return t.response, nil +} + +func (t *testExemplarClient) Exemplars(ctx context.Context, in *exemplarspb.ExemplarsRequest, opts ...grpc.CallOption) (exemplarspb.Exemplars_ExemplarsClient, error) { + return t, t.exemplarErr +} + +var _ exemplarspb.ExemplarsClient = &testExemplarClient{} + +type testExemplarServer struct { + grpc.ServerStream + sendErr error + responses []*exemplarspb.ExemplarsResponse +} + +func (t *testExemplarServer) String() string { + return "test" +} + +func (t *testExemplarServer) Send(response *exemplarspb.ExemplarsResponse) error { + if t.sendErr != nil { + return t.sendErr + } + t.responses = append(t.responses, response) + return nil +} + +func (t *testExemplarServer) Context() context.Context { + return context.Background() +} + +func TestProxy(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + + for _, tc := range []struct { + name string + request *exemplarspb.ExemplarsRequest + clients []*exemplarspb.ExemplarStore + server *testExemplarServer + selectorLabels labels.Labels + wantResponses []*exemplarspb.ExemplarsResponse + wantError error + }{ + { + name: "proxy success", + request: &exemplarspb.ExemplarsRequest{ + Query: "http_request_duration_bucket", + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + clients: []*exemplarspb.ExemplarStore{ + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})}, + }, + }, + server: &testExemplarServer{}, + wantResponses: []*exemplarspb.ExemplarsResponse{ + exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + }, + { + name: "warning proxy success", + request: &exemplarspb.ExemplarsRequest{ + Query: "http_request_duration_bucket", + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + clients: []*exemplarspb.ExemplarStore{ + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewWarningExemplarsResponse(errors.New("warning from client")), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})}, + }, + }, + server: &testExemplarServer{}, + wantResponses: []*exemplarspb.ExemplarsResponse{ + exemplarspb.NewWarningExemplarsResponse(errors.New("warning from client")), + }, + }, + { + // The input query external label doesn't match with the current querier, return null. + name: "external label doesn't match selector labels", + request: &exemplarspb.ExemplarsRequest{ + Query: `http_request_duration_bucket{query="foo"}`, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + clients: []*exemplarspb.ExemplarStore{ + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})}, + }, + }, + selectorLabels: labels.FromMap(map[string]string{"query": "bar"}), + server: &testExemplarServer{}, + wantResponses: nil, + }, + { + // The input query external label matches with the current querier. + name: "external label matches selector labels", + request: &exemplarspb.ExemplarsRequest{ + Query: `http_request_duration_bucket{query="foo"}`, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + clients: []*exemplarspb.ExemplarStore{ + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})}, + }, + }, + selectorLabels: labels.FromMap(map[string]string{"query": "foo"}), + server: &testExemplarServer{}, + wantResponses: []*exemplarspb.ExemplarsResponse{ + exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + }, + { + name: "external label selects stores", + request: &exemplarspb.ExemplarsRequest{ + Query: `http_request_duration_bucket{cluster="A"}`, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + clients: []*exemplarspb.ExemplarStore{ + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"foo": "bar"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})}, + }, + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"foo": "baz"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 2}}, + }), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "B"})}, + }, + }, + server: &testExemplarServer{}, + wantResponses: []*exemplarspb.ExemplarsResponse{ + exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"foo": "bar"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + }, + { + name: "external label matches different stores", + request: &exemplarspb.ExemplarsRequest{ + Query: `http_request_duration_bucket{cluster="A"} + http_request_duration_bucket{cluster="B"}`, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + clients: []*exemplarspb.ExemplarStore{ + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"foo": "bar"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})}, + }, + { + ExemplarsClient: &testExemplarClient{ + response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"foo": "baz"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 2}}, + }), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "B"})}, + }, + }, + server: &testExemplarServer{}, + wantResponses: []*exemplarspb.ExemplarsResponse{ + exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"foo": "bar"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 1}}, + }), + exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{ + SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"foo": "baz"}))}, + Exemplars: []*exemplarspb.Exemplar{{Value: 2}}, + }), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + p := NewProxy(logger, func() []*exemplarspb.ExemplarStore { + return tc.clients + }, tc.selectorLabels) + + err := p.Exemplars(tc.request, tc.server) + gotErr := "" + if err != nil { + gotErr = err.Error() + } + wantErr := "" + if tc.wantError != nil { + wantErr = tc.wantError.Error() + } + + if gotErr != wantErr { + t.Errorf("want error %q, got %q", wantErr, gotErr) + } + + testutil.Equals(t, len(tc.wantResponses), len(tc.server.responses)) + + // Actual responses are unordered so we search + // for matched response for simplicity. + Outer: + for _, exp := range tc.wantResponses { + for _, res := range tc.server.responses { + if reflect.DeepEqual(exp, res) { + continue Outer + } + } + t.Errorf("miss expected response %v", exp) + } + }) + } +} diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index cbe2fa52ec..58a4d2d16e 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -698,18 +698,21 @@ func (s *StoreSet) GetMetadataClients() []metadatapb.MetadataClient { return metadataClients } -// GetExemplarsClients returns a list of all active exemplars clients. -func (s *StoreSet) GetExemplarsClients() []exemplarspb.ExemplarsClient { +// GetExemplarsStores returns a list of all active exemplars stores. +func (s *StoreSet) GetExemplarsStores() []*exemplarspb.ExemplarStore { s.storesMtx.RLock() defer s.storesMtx.RUnlock() - exemplars := make([]exemplarspb.ExemplarsClient, 0, len(s.stores)) + exemplarStores := make([]*exemplarspb.ExemplarStore, 0, len(s.stores)) for _, st := range s.stores { if st.HasExemplarsAPI() { - exemplars = append(exemplars, st.exemplar) + exemplarStores = append(exemplarStores, &exemplarspb.ExemplarStore{ + ExemplarsClient: st.exemplar, + LabelSets: st.labelSets, + }) } } - return exemplars + return exemplarStores } func (s *StoreSet) Close() {