Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

match external labels in exemplars API #4123

Merged
merged 4 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan

We use _breaking :warning:_ to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased

### Added
-
### Fixed
- [#4123](https://github.com/thanos-io/thanos/pull/4123) Query: match external labels for exemplars API.
### Changed
-
### Removed

## [v0.20.0](https://github.com/thanos-io/thanos/releases/tag/v0.20.0) - 2021.04.28

### Added
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func runQuery(
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsStores, selectorLset)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand Down
6 changes: 6 additions & 0 deletions pkg/exemplars/exemplarspb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

// ExemplarStore wraps the ExemplarsClient and contains the info of external labels.
type ExemplarStore struct {
ExemplarsClient
LabelSets []labels.Labels
}

// UnmarshalJSON implements json.Unmarshaler.
func (m *Exemplar) UnmarshalJSON(b []byte) error {
v := struct {
Expand Down
111 changes: 103 additions & 8 deletions pkg/exemplars/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
Expand All @@ -21,8 +23,9 @@ import (
// Proxy implements exemplarspb.Exemplars gRPC that fanouts requests to
// given exemplarspb.Exemplars.
type Proxy struct {
logger log.Logger
exemplars func() []exemplarspb.ExemplarsClient
logger log.Logger
exemplars func() []*exemplarspb.ExemplarStore
selectorLabels labels.Labels
}

// RegisterExemplarsServer register exemplars server.
Expand All @@ -33,10 +36,11 @@ func RegisterExemplarsServer(exemplarsSrv exemplarspb.ExemplarsServer) func(*grp
}

// NewProxy return new exemplars.Proxy.
func NewProxy(logger log.Logger, exemplars func() []exemplarspb.ExemplarsClient) *Proxy {
func NewProxy(logger log.Logger, exemplars func() []*exemplarspb.ExemplarStore, selectorLabels labels.Labels) *Proxy {
return &Proxy{
logger: logger,
exemplars: exemplars,
logger: logger,
exemplars: exemplars,
selectorLabels: selectorLabels,
}
}

Expand All @@ -48,16 +52,80 @@ type exemplarsStream struct {
}

func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exemplars_ExemplarsServer) error {
expr, err := parser.ParseExpr(req.Query)
if err != nil {
return err
}

selectors := parser.ExtractSelectors(expr)

newSelectors := make([][]*labels.Matcher, 0, len(selectors))
for _, matchers := range selectors {
matched, newMatchers := matchesExternalLabels(matchers, s.selectorLabels)
if matched {
newSelectors = append(newSelectors, newMatchers)
}
}
// There is no matched selectors for this thanos query.
if len(newSelectors) == 0 {
return nil
}

var (
g, gctx = errgroup.WithContext(srv.Context())
respChan = make(chan *exemplarspb.ExemplarData, 10)
exemplars []*exemplarspb.ExemplarData
)

for _, exemplarsClient := range s.exemplars() {
for _, st := range s.exemplars() {
query := ""
Matchers:
for _, matchers := range newSelectors {
metricsSelector := ""
for _, m := range matchers {
for _, ls := range st.LabelSets {
if lv := ls.Get(m.Name); lv != "" {
if !m.Matches(lv) {
continue Matchers
} else {
// If the current matcher matches one external label,
// we don't add it to the current metric selector
// as Prometheus' Exemplars API cannot handle external labels.
continue
}
}
if metricsSelector == "" {
metricsSelector += m.String()
} else {
metricsSelector += ", " + m.String()
}
}
}
// Construct the query by concatenating metric selectors with '+'.
// We cannot preserve the original query info, but the returned
// results are the same.
if query == "" {
query += "{" + metricsSelector + "}"
} else {
query += " + {" + metricsSelector + "}"
}
}

// No matchers match this store.
if query == "" {
continue
}
r := &exemplarspb.ExemplarsRequest{
Start: req.Start,
End: req.End,
Query: query,
PartialResponseStrategy: req.PartialResponseStrategy,
}

es := &exemplarsStream{
client: exemplarsClient,
request: req,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

nit

client: st.ExemplarsClient,
request: r,
channel: respChan,
server: srv,
}
Expand Down Expand Up @@ -137,3 +205,30 @@ func (stream *exemplarsStream) receive(ctx context.Context) error {
}
}
}

// matchesExternalLabels returns false if given matchers are not matching external labels.
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
func matchesExternalLabels(ms []*labels.Matcher, externalLabels labels.Labels) (bool, []*labels.Matcher) {
if len(externalLabels) == 0 {
return true, ms
}

var newMatchers []*labels.Matcher
for i, tm := range ms {
// Validate all matchers.
extValue := externalLabels.Get(tm.Name)
if extValue == "" {
// Agnostic to external labels.
ms = append(ms[:i], ms[i:]...)
newMatchers = append(newMatchers, tm)
continue
}

if !tm.Matches(extValue) {
// External label does not match. This should not happen - it should be filtered out on query node,
// but let's do that anyway here.
return false, nil
}
}
return true, newMatchers
}
Loading