Skip to content

Commit

Permalink
Add matchers to LabelNames() ingester RPC
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Sep 12, 2024
1 parent f74b4cd commit 2b01472
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 147 deletions.
16 changes: 6 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
"start": from.Unix(),
"end": to.Unix(),
Expand All @@ -1113,11 +1113,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
}

limit := getLimitFromLabelHints(hints)
req := &ingester_client.LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Limit: int64(limit),
}
req, err := ingester_client.ToLabelNamesRequest(from, to, limit, matchers)
resps, err := f(ctx, replicationSet, req)
if err != nil {
return nil, err
Expand All @@ -1142,7 +1138,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
return r, nil
}

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints) ([]string, error) {
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
Expand All @@ -1164,11 +1160,11 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,

return allLabelNames, nil
})
})
}, matchers...)
}

// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints) ([]string, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
Expand All @@ -1177,7 +1173,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint
}
return resp.LabelNames, nil
})
})
}, matchers...)
}

// MetricsForLabelMatchers gets the metrics that match said matchers
Expand Down
30 changes: 30 additions & 0 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,36 @@ func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int,
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

// ToLabelNamesRequest builds a LabelNamesRequest proto
func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matcher) (*LabelNamesRequest, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
}

return &LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: int64(limit),
}, nil
}

// FromLabelNamesRequest unpacks a LabelNamesRequest proto
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return 0, 0, 0, nil, err
}
}

return req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
result := make([]*LabelMatcher, 0, len(matchers))
for _, matcher := range matchers {
Expand Down
244 changes: 156 additions & 88 deletions pkg/ingester/client/ingester.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/ingester/client/ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ message LabelNamesRequest {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
int64 limit = 3;
LabelMatchers matchers = 4;
}

message LabelNamesResponse {
Expand Down
11 changes: 7 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return nil, cleanup, err
}

startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req)
if err != nil {
return nil, cleanup, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, cleanup, err
Expand All @@ -1601,13 +1606,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return &client.LabelNamesResponse{}, cleanup, nil
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryIngestersWithin)
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, cleanup, err
}

limit := int(req.Limit)

q, err := db.Querier(mint, maxt)
if err != nil {
return nil, cleanup, err
Expand All @@ -1622,7 +1625,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return nil, cleanup, err
}
defer c()
names, _, err := q.LabelNames(ctx, &storage.LabelHints{Limit: limit})
names, _, err := q.LabelNames(ctx, &storage.LabelHints{Limit: limit}, matchers...)
if err != nil {
return nil, cleanup, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,7 @@ func Test_Ingester_LabelValues(t *testing.T) {

tests := map[string]struct {
limit int64
match []*labels.Matcher
}{
"should return all label values if no limit is set": {
limit: 0,
Expand Down
32 changes: 18 additions & 14 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,29 @@ type Distributor interface {
QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error)
LabelValuesForLabelName(ctx context.Context, from, to model.Time, label model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error)
LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, label model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error)
LabelNames(context.Context, model.Time, model.Time, *storage.LabelHints) ([]string, error)
LabelNamesStream(context.Context, model.Time, model.Time, *storage.LabelHints) ([]string, error)
LabelNames(context.Context, model.Time, model.Time, *storage.LabelHints, ...*labels.Matcher) ([]string, error)
LabelNamesStream(context.Context, model.Time, model.Time, *storage.LabelHints, ...*labels.Matcher) ([]string, error)
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error)
MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error)
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
}

func newDistributorQueryable(distributor Distributor, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter {
func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter {
return distributorQueryable{
distributor: distributor,
streamingMetdata: streamingMetdata,
iteratorFn: iteratorFn,
queryIngestersWithin: queryIngestersWithin,
distributor: distributor,
streamingMetdata: streamingMetdata,
labelNamesWithMatchers: labelNamesWithMatchers,
iteratorFn: iteratorFn,
queryIngestersWithin: queryIngestersWithin,
}
}

type distributorQueryable struct {
distributor Distributor
streamingMetdata bool
iteratorFn chunkIteratorFunc
queryIngestersWithin time.Duration
distributor Distributor
streamingMetdata bool
labelNamesWithMatchers bool
iteratorFn chunkIteratorFunc
queryIngestersWithin time.Duration
}

func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
Expand All @@ -58,6 +60,7 @@ func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error)
mint: mint,
maxt: maxt,
streamingMetadata: d.streamingMetdata,
labelNamesMatchers: d.labelNamesWithMatchers,
chunkIterFn: d.iteratorFn,
queryIngestersWithin: d.queryIngestersWithin,
}, nil
Expand All @@ -72,6 +75,7 @@ type distributorQuerier struct {
distributor Distributor
mint, maxt int64
streamingMetadata bool
labelNamesMatchers bool
chunkIterFn chunkIteratorFunc
queryIngestersWithin time.Duration
}
Expand Down Expand Up @@ -180,7 +184,7 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints
}

func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
if len(matchers) > 0 {
if len(matchers) > 0 && !q.labelNamesMatchers {
return q.labelNamesWithMatchers(ctx, hints, matchers...)
}

Expand All @@ -193,9 +197,9 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe
)

if q.streamingMetadata {
ln, err = q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints)
ln, err = q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, matchers...)
} else {
ln, err = q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints)
ln, err = q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, matchers...)
}

return ln, nil, err
Expand Down
32 changes: 21 additions & 11 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]model.Metric{}, nil)

ctx := user.InjectOrgID(context.Background(), "test")
queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, nil, testData.queryIngestersWithin)
queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin)
querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

Expand Down Expand Up @@ -128,7 +128,7 @@ func TestDistributorQueryableFilter(t *testing.T) {
t.Parallel()

d := &MockDistributor{}
dq := newDistributorQueryable(d, false, nil, 1*time.Hour)
dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour)

now := time.Now()

Expand Down Expand Up @@ -172,7 +172,7 @@ func TestIngesterStreaming(t *testing.T) {
nil)

ctx := user.InjectOrgID(context.Background(), "0")
queryable := newDistributorQueryable(d, true, batch.NewChunkMergeIterator, 0)
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -202,23 +202,33 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
someMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
labelNames := []string{"foo", "job"}

for _, streamingEnabled := range []bool{false, true} {
streamingEnabled := streamingEnabled
for _, enabled := range []bool{false, true} {
streamingEnabled := enabled
labelNamesWithMatchers := enabled
t.Run("with matchers", func(t *testing.T) {
t.Parallel()
//t.Parallel()

metrics := []model.Metric{
{"foo": "bar"},
{"job": "baz"},
{"job": "baz", "foo": "boom"},
}
d := &MockDistributor{}
d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), mock.Anything, someMatchers).
Return(metrics, nil)
d.On("MetricsForLabelMatchersStream", mock.Anything, model.Time(mint), model.Time(maxt), mock.Anything, someMatchers).
Return(metrics, nil)

queryable := newDistributorQueryable(d, streamingEnabled, nil, 0)
if labelNamesWithMatchers {
// ln, err = q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, matchers...)
d.On("LabelNames", mock.Anything, model.Time(mint), model.Time(maxt), mock.Anything, someMatchers).
Return(labelNames, nil)
d.On("LabelNamesStream", mock.Anything, model.Time(mint), model.Time(maxt), mock.Anything, someMatchers).
Return(labelNames, nil)
} else {
d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), mock.Anything, someMatchers).
Return(metrics, nil)
d.On("MetricsForLabelMatchersStream", mock.Anything, model.Time(mint), model.Time(maxt), mock.Anything, someMatchers).
Return(metrics, nil)
}

queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, 0)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down
18 changes: 10 additions & 8 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ import (

// Config contains the configuration require to create a querier
type Config struct {
MaxConcurrent int `yaml:"max_concurrent"`
Timeout time.Duration `yaml:"timeout"`
IngesterStreaming bool `yaml:"ingester_streaming" doc:"hidden"`
IngesterMetadataStreaming bool `yaml:"ingester_metadata_streaming"`
MaxSamples int `yaml:"max_samples"`
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
EnablePerStepStats bool `yaml:"per_step_stats_enabled"`
MaxConcurrent int `yaml:"max_concurrent"`
Timeout time.Duration `yaml:"timeout"`
IngesterStreaming bool `yaml:"ingester_streaming" doc:"hidden"`
IngesterMetadataStreaming bool `yaml:"ingester_metadata_streaming"`
IngesterLabelNamesWithMatchers bool `yaml:"ingester_label_names_with_matchers"`
MaxSamples int `yaml:"max_samples"`
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
EnablePerStepStats bool `yaml:"per_step_stats_enabled"`

// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
QueryStoreAfter time.Duration `yaml:"query_store_after"`
Expand Down Expand Up @@ -106,6 +107,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
f.BoolVar(&cfg.IngesterMetadataStreaming, "querier.ingester-metadata-streaming", true, "Deprecated (This feature will be always on after v1.18): Use streaming RPCs for metadata APIs from ingester.")
f.BoolVar(&cfg.IngesterLabelNamesWithMatchers, "querier.ingester-label-names-with-matchers", false, "Use LabelNames ingester RPCs with match params.")
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.")
Expand Down Expand Up @@ -156,7 +158,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, promql.QueryEngine) {
iteratorFunc := getChunksIteratorFunction(cfg)

distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin)
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin)

ns := make([]QueryableWithFilter, len(stores))
for ix, s := range stores {
Expand Down
Loading

0 comments on commit 2b01472

Please sign in to comment.