Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partitions ring support to more Distributor querying functions #7393

Merged
merged 4 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 51 additions & 20 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,8 @@ func (d *Distributor) sendToStorage(ctx context.Context, userID string, partitio
}

// forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
//
// Deprecated: use forReplicationSets() instead.
func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (T, error)) ([]T, error) {
wrappedF := func(ctx context.Context, ing *ring.InstanceDesc) (T, error) {
client, err := d.ingesterPool.GetClientForInstance(*ing)
Expand All @@ -1505,6 +1507,29 @@ func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSe
return ring.DoUntilQuorum(ctx, replicationSet, d.queryQuorumConfig(ctx, replicationSet), wrappedF, cleanup)
}

// forReplicationSets runs f, in parallel, for all ingesters in the input replication sets.
func forReplicationSets[R any](ctx context.Context, d *Distributor, replicationSets []ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (R, error)) ([]R, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: this function contains some duplicate code from forReplicationSet(), but forReplicationSet() will disappear in few PRs.

wrappedF := func(ctx context.Context, ingester *ring.InstanceDesc) (R, error) {
client, err := d.ingesterPool.GetClientForInstance(*ingester)
if err != nil {
var empty R
return empty, err
}

return f(ctx, client.(ingester_client.IngesterClient))
}

cleanup := func(_ R) {
// Nothing to do.
}

quorumConfig := d.queryQuorumConfigForReplicationSets(ctx, replicationSets)

return concurrency.ForEachJobMergeResults[ring.ReplicationSet, R](ctx, replicationSets, 0, func(ctx context.Context, set ring.ReplicationSet) ([]R, error) {
return ring.DoUntilQuorum(ctx, set, quorumConfig, wrappedF, cleanup)
})
}

// queryQuorumConfig returns the config to use with "do until quorum" functions when running queries.
//
// Deprecated: use queryQuorumConfigForReplicationSets() instead.
Expand Down Expand Up @@ -1581,10 +1606,10 @@ func queryIngesterPartitionsRingZoneSorter(preferredZone string) ring.ZoneSorter
}
}

// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
// LabelValuesForLabelName returns the label values associated with the given labelName, among all series with samples
// timestamp between from and to, and series labels matching the optional matchers.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}
Expand All @@ -1594,7 +1619,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
return nil, err
}

resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelValuesResponse, error) {
resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelValuesResponse, error) {
return client.LabelValues(ctx, req)
})
if err != nil {
Expand All @@ -1619,10 +1644,13 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
return values, nil
}

// LabelNamesAndValues query ingesters for label names and values and returns labels with distinct list of values.
// LabelNamesAndValues returns the label name and value pairs for series matching the input label matchers.
//
// The actual series considered eligible depend on countMethod:
// - inmemory: in-memory series in ingesters.
// - active: in-memory series in ingesters which are also tracked as active ones.
func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesResponse, error) {
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}
Expand All @@ -1631,13 +1659,16 @@ func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*label
if err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

sizeLimitBytes := d.limits.LabelNamesAndValuesResultsMaxSizeBytes(userID)
merger := &labelNamesAndValuesResponseMerger{result: map[string]map[string]struct{}{}, sizeLimitBytes: sizeLimitBytes}
_, err = forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (ingester_client.Ingester_LabelNamesAndValuesClient, error) {

_, err = forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (ingester_client.Ingester_LabelNamesAndValuesClient, error) {
stream, err := client.LabelNamesAndValues(ctx, req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2000,6 +2031,7 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match
return ignored{}, nil
}

//nolint:staticcheck
_, err = forReplicationSet(ctx, d, replicationSet, ingesterQuery)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2133,10 +2165,10 @@ func approximateFromZones[T ~float64 | ~uint64](zoneCount int, replicationFactor
return sum / T(replicationFactor)
}

// LabelNames returns all of the label names.
// LabelNames returns the names of all labels from series with samples timestamp between from and to, and matching
// the input optional series label matchers. The returned label names are sorted.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error) {
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}
Expand All @@ -2146,7 +2178,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match
return nil, err
}

resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelNamesResponse, error) {
resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelNamesResponse, error) {
return client.LabelNames(ctx, req)
})
if err != nil {
Expand All @@ -2170,10 +2202,10 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match
return values, nil
}

// MetricsForLabelMatchers gets the metrics that match said matchers
// MetricsForLabelMatchers returns a list of series with samples timestamps between from and through, and series labels
// matching the optional label matchers. The returned series are not sorted.
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}
Expand All @@ -2183,7 +2215,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
return nil, err
}

resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsForLabelMatchersResponse, error) {
resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsForLabelMatchersResponse, error) {
return client.MetricsForLabelMatchers(ctx, req)
})
if err != nil {
Expand All @@ -2210,15 +2242,14 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
return result, nil
}

// MetricsMetadata returns all metric metadata of a user.
// MetricsMetadata returns the metrics metadata based on the provided req.
func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) {
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}

resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsMetadataResponse, error) {
resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsMetadataResponse, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
Expand Down
Loading
Loading