Skip to content

Commit

Permalink
match external labels in exemplars API (thanos-io#4123)
Browse files Browse the repository at this point in the history
* match external labels in exemplars API

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update changelog

Signed-off-by: yeya24 <yb532204897@gmail.com>

* fix lint

Signed-off-by: yeya24 <yb532204897@gmail.com>

* fix flaky tests

Signed-off-by: yeya24 <yb532204897@gmail.com>

* Fix changelog

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
yeya24 authored and kakkoyun committed Apr 30, 2021
1 parent 116c4bd commit 98bc5b1
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 14 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
### Removed
-

## [v0.20.1](https://github.com/thanos-io/thanos/releases/tag/v0.20.1) - 2021.04.30

### Fixed

- [#4123](https://github.com/thanos-io/thanos/pull/4123) Query: match external labels for exemplars API.

### Changed
-
### Removed
-
## [v0.20.0](https://github.com/thanos-io/thanos/releases/tag/v0.20.0) - 2021.04.28

### Added
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func runQuery(
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsStores, selectorLset)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand Down
6 changes: 6 additions & 0 deletions pkg/exemplars/exemplarspb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

// ExemplarStore wraps the ExemplarsClient and contains the info of external labels.
type ExemplarStore struct {
ExemplarsClient
LabelSets []labels.Labels
}

// UnmarshalJSON implements json.Unmarshaler.
func (m *Exemplar) UnmarshalJSON(b []byte) error {
v := struct {
Expand Down
111 changes: 103 additions & 8 deletions pkg/exemplars/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
Expand All @@ -21,8 +23,9 @@ import (
// Proxy implements exemplarspb.Exemplars gRPC that fanouts requests to
// given exemplarspb.Exemplars.
type Proxy struct {
logger log.Logger
exemplars func() []exemplarspb.ExemplarsClient
logger log.Logger
exemplars func() []*exemplarspb.ExemplarStore
selectorLabels labels.Labels
}

// RegisterExemplarsServer register exemplars server.
Expand All @@ -33,10 +36,11 @@ func RegisterExemplarsServer(exemplarsSrv exemplarspb.ExemplarsServer) func(*grp
}

// NewProxy return new exemplars.Proxy.
func NewProxy(logger log.Logger, exemplars func() []exemplarspb.ExemplarsClient) *Proxy {
func NewProxy(logger log.Logger, exemplars func() []*exemplarspb.ExemplarStore, selectorLabels labels.Labels) *Proxy {
return &Proxy{
logger: logger,
exemplars: exemplars,
logger: logger,
exemplars: exemplars,
selectorLabels: selectorLabels,
}
}

Expand All @@ -48,16 +52,80 @@ type exemplarsStream struct {
}

func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exemplars_ExemplarsServer) error {
expr, err := parser.ParseExpr(req.Query)
if err != nil {
return err
}

selectors := parser.ExtractSelectors(expr)

newSelectors := make([][]*labels.Matcher, 0, len(selectors))
for _, matchers := range selectors {
matched, newMatchers := matchesExternalLabels(matchers, s.selectorLabels)
if matched {
newSelectors = append(newSelectors, newMatchers)
}
}
// There is no matched selectors for this thanos query.
if len(newSelectors) == 0 {
return nil
}

var (
g, gctx = errgroup.WithContext(srv.Context())
respChan = make(chan *exemplarspb.ExemplarData, 10)
exemplars []*exemplarspb.ExemplarData
)

for _, exemplarsClient := range s.exemplars() {
for _, st := range s.exemplars() {
query := ""
Matchers:
for _, matchers := range newSelectors {
metricsSelector := ""
for _, m := range matchers {
for _, ls := range st.LabelSets {
if lv := ls.Get(m.Name); lv != "" {
if !m.Matches(lv) {
continue Matchers
} else {
// If the current matcher matches one external label,
// we don't add it to the current metric selector
// as Prometheus' Exemplars API cannot handle external labels.
continue
}
}
if metricsSelector == "" {
metricsSelector += m.String()
} else {
metricsSelector += ", " + m.String()
}
}
}
// Construct the query by concatenating metric selectors with '+'.
// We cannot preserve the original query info, but the returned
// results are the same.
if query == "" {
query += "{" + metricsSelector + "}"
} else {
query += " + {" + metricsSelector + "}"
}
}

// No matchers match this store.
if query == "" {
continue
}
r := &exemplarspb.ExemplarsRequest{
Start: req.Start,
End: req.End,
Query: query,
PartialResponseStrategy: req.PartialResponseStrategy,
}

es := &exemplarsStream{
client: exemplarsClient,
request: req,

client: st.ExemplarsClient,
request: r,
channel: respChan,
server: srv,
}
Expand Down Expand Up @@ -137,3 +205,30 @@ func (stream *exemplarsStream) receive(ctx context.Context) error {
}
}
}

// matchesExternalLabels returns false if given matchers are not matching external labels.
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
func matchesExternalLabels(ms []*labels.Matcher, externalLabels labels.Labels) (bool, []*labels.Matcher) {
if len(externalLabels) == 0 {
return true, ms
}

var newMatchers []*labels.Matcher
for i, tm := range ms {
// Validate all matchers.
extValue := externalLabels.Get(tm.Name)
if extValue == "" {
// Agnostic to external labels.
ms = append(ms[:i], ms[i:]...)
newMatchers = append(newMatchers, tm)
continue
}

if !tm.Matches(extValue) {
// External label does not match. This should not happen - it should be filtered out on query node,
// but let's do that anyway here.
return false, nil
}
}
return true, newMatchers
}
Loading

0 comments on commit 98bc5b1

Please sign in to comment.