Skip to content

Commit

Permalink
StoreGateway: Implement metadata API limit in queryable
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Sep 5, 2024
1 parent 8622767 commit 437cfe5
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 19 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
* [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` which shows loaded blocks, active timeseries and ingestion rate for a specific ingester. #6178
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195

## 1.18.0 2024-09-03

Expand Down
43 changes: 30 additions & 13 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,11 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe
spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.LabelNames")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT
minT, maxT, limit := q.minT, q.maxT, int64(0)

if hints != nil {
limit = int64(hints.Limit)
}

var (
resMtx sync.Mutex
Expand All @@ -355,7 +359,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, userID, clients, minT, maxT, convertedMatchers)
nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, userID, clients, minT, maxT, limit, convertedMatchers)
if err != nil {
return nil, err, retryableError
}
Expand All @@ -372,6 +376,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe
return nil, nil, err
}

// TODO(johrry): pass limit when merging.
return strutil.MergeSlices(resNameSets...), resWarnings, nil
}

Expand All @@ -384,7 +389,11 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints
spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.LabelValues")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT
minT, maxT, limit := q.minT, q.maxT, int64(0)

if hints != nil {
limit = int64(hints.Limit)
}

var (
resValueSets = [][]string{}
Expand All @@ -394,7 +403,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, userID, name, clients, minT, maxT, matchers...)
valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, userID, name, clients, minT, maxT, limit, matchers...)
if err != nil {
return nil, err, retryableError
}
Expand All @@ -411,6 +420,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints
return nil, nil, err
}

// TODO(johrry): pass limit when merging.
return strutil.MergeSlices(resValueSets...), resWarnings, nil
}

Expand All @@ -427,9 +437,9 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.selectSorted")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT
minT, maxT, limit := q.minT, q.maxT, int64(0)
if sp != nil {
minT, maxT = sp.Start, sp.End
minT, maxT, limit = sp.Start, sp.End, int64(sp.Limit)
}

var (
Expand All @@ -443,7 +453,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, userID, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit)
seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, userID, clients, minT, maxT, limit, matchers, maxChunksLimit, leftChunksLimit)
if err != nil {
return nil, err, retryableError
}
Expand Down Expand Up @@ -471,6 +481,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
storage.EmptySeriesSet()
}

// TODO(johrry): pass limit when merging.
return series.NewSeriesSetWithWarnings(
storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge),
resWarnings)
Expand Down Expand Up @@ -593,6 +604,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
limit int64,
matchers []*labels.Matcher,
maxChunksLimit int,
leftChunksLimit int,
Expand Down Expand Up @@ -635,7 +647,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
seriesQueryStats := &hintspb.QueryStats{}
skipChunks := sp != nil && sp.Func == "series"

req, err := createSeriesRequest(minT, maxT, convertedMatchers, shardingInfo, skipChunks, blockIDs, defaultAggrs)
req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, shardingInfo, skipChunks, blockIDs, defaultAggrs)
if err != nil {
return errors.Wrapf(err, "failed to create series request")
}
Expand Down Expand Up @@ -825,6 +837,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
limit int64,
matchers []storepb.LabelMatcher,
) ([][]string, annotations.Annotations, []ulid.ULID, error, error) {
var (
Expand All @@ -846,7 +859,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
blockIDs := blockIDs

g.Go(func() error {
req, err := createLabelNamesRequest(minT, maxT, blockIDs, matchers)
req, err := createLabelNamesRequest(minT, maxT, limit, blockIDs, matchers)
if err != nil {
return errors.Wrapf(err, "failed to create label names request")
}
Expand Down Expand Up @@ -927,6 +940,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
limit int64,
matchers ...*labels.Matcher,
) ([][]string, annotations.Annotations, []ulid.ULID, error, error) {
var (
Expand All @@ -948,7 +962,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
blockIDs := blockIDs

g.Go(func() error {
req, err := createLabelValuesRequest(minT, maxT, name, blockIDs, matchers...)
req, err := createLabelValuesRequest(minT, maxT, limit, name, blockIDs, matchers...)
if err != nil {
return errors.Wrapf(err, "failed to create label values request")
}
Expand Down Expand Up @@ -1025,7 +1039,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
return valueSets, warnings, queriedBlocks, nil, merr.Err()
}

func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) {
func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) {
// Selectively query only specific blocks.
hints := &hintspb.SeriesRequestHints{
BlockMatchers: []storepb.LabelMatcher{
Expand All @@ -1046,6 +1060,7 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shar
return &storepb.SeriesRequest{
MinTime: minT,
MaxTime: maxT,
Limit: limit,
Matchers: matchers,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
Hints: anyHints,
Expand All @@ -1057,10 +1072,11 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shar
}, nil
}

func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) {
func createLabelNamesRequest(minT, maxT, limit int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) {
req := &storepb.LabelNamesRequest{
Start: minT,
End: maxT,
Limit: limit,
Matchers: matchers,
}

Expand All @@ -1085,10 +1101,11 @@ func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers []
return req, nil
}

func createLabelValuesRequest(minT, maxT int64, label string, blockIDs []ulid.ULID, matchers ...*labels.Matcher) (*storepb.LabelValuesRequest, error) {
func createLabelValuesRequest(minT, maxT, limit int64, label string, blockIDs []ulid.ULID, matchers ...*labels.Matcher) (*storepb.LabelValuesRequest, error) {
req := &storepb.LabelValuesRequest{
Start: minT,
End: maxT,
Limit: limit,
Label: label,
Matchers: convertMatchersToLabelMatcher(matchers),
}
Expand Down
Loading

0 comments on commit 437cfe5

Please sign in to comment.