tsdb: Implement limit in block querier
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
harry671003 committed Sep 13, 2024
1 parent b93ed7b commit 3cfe2d1
Showing 2 changed files with 119 additions and 0 deletions.
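The change threads the new Limit field on storage.LabelHints and storage.SelectHints through the block querier: LabelValues and LabelNames results are truncated after the index lookup, and Select consumes at most Limit postings. As a minimal sketch of the caller-facing knobs — the hint structs and their fields come from the diff below, while the little program around them is purely illustrative:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
)

func main() {
	// Limit <= 0 means "no limit", matching the guards in truncateToLimit
	// and truncatePostingsToLimit in the diff below.
	labelHints := &storage.LabelHints{Limit: 2}
	selectHints := &storage.SelectHints{Start: 0, End: 100, Limit: 2}
	fmt.Println(labelHints.Limit, selectHints.Limit)
}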
29 changes: 29 additions & 0 deletions tsdb/querier.go
@@ -79,11 +79,13 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {

func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	res, err := q.index.SortedLabelValues(ctx, name, matchers...)
	res = truncateToLimit(res, hints)
	return res, nil, err
}

func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	res, err := q.index.LabelNames(ctx, matchers...)
	res = truncateToLimit(res, hints)
	return res, nil, err
}

@@ -101,6 +103,13 @@ func (q *blockBaseQuerier) Close() error {
	return errs.Err()
}

func truncateToLimit(s []string, hints *storage.LabelHints) []string {
	if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
		s = s[:hints.Limit]
	}
	return s
}

type blockQuerier struct {
	*blockBaseQuerier
}
@@ -139,6 +148,7 @@ func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints,
		mint = hints.Start
		maxt = hints.End
		disableTrimming = hints.DisableTrimming
		p = truncatePostingsToLimit(hints.Limit, p)
		if hints.Func == "series" {
			// When you're only looking up metadata (for example series API), you don't need to load any chunks.
			return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
		}
@@ -148,6 +158,25 @@
	return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming)
}

// truncatePostingsToLimit consumes at most limit postings from p and returns
// them as a new list. A limit of 0 or less disables truncation.
func truncatePostingsToLimit(limit int, p index.Postings) index.Postings {
	if limit <= 0 {
		return p
	}

	out := make([]storage.SeriesRef, 0, 128)

	for p.Next() {
		out = append(out, p.At())
		if len(out) >= limit {
			break
		}
	}

	// Propagate iteration errors instead of silently dropping them.
	if err := p.Err(); err != nil {
		return index.ErrPostings(err)
	}

	return index.NewListPostings(out)
}

// blockChunkQuerier provides chunk querying access to a single block database.
type blockChunkQuerier struct {
	*blockBaseQuerier
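A note on the design before the test file: truncatePostingsToLimit materializes up to limit series references into a slice and re-wraps them with index.NewListPostings, so a limited Select buffers at most limit refs instead of draining the full postings iterator; with limit <= 0 the original lazy iterator is returned untouched. A self-contained sketch of that take-first-N behavior, assuming only the exported index.NewListPostings and storage.SeriesRef — the refs and limit here are invented for illustration:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// Five fake series references; NewListPostings wraps them in an iterator.
	p := index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})

	// Mirror truncatePostingsToLimit(2, p): stop consuming once 2 are taken.
	limit := 2
	out := make([]storage.SeriesRef, 0, limit)
	for p.Next() {
		out = append(out, p.At())
		if len(out) >= limit {
			break
		}
	}
	fmt.Println(out) // [1 2]
}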
90 changes: 90 additions & 0 deletions tsdb/querier_test.go
@@ -684,6 +684,96 @@ func TestBlockQuerierDelete(t *testing.T) {
	}
}

func TestBlockQuerierLimit(t *testing.T) {
	tmpdir := t.TempDir()
	ctx := context.Background()
	var (
		allValues     []string
		allNames      = []string{"__name__"}
		seriesEntries []storage.Series
	)

	for i := 0; i < 5; i++ {
		value := fmt.Sprintf("value%d", i)
		name := fmt.Sprintf("labelName%d", i)
		allValues = append(allValues, value)
		allNames = append(allNames, name)

		seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings(
			"__name__", value, name, value,
		), []chunks.Sample{sample{100, 0, nil, nil}}))
	}

	blockDir := createBlock(t, tmpdir, seriesEntries)

	// Check open err.
	block, err := OpenBlock(nil, blockDir, nil)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, block.Close()) })

	q, err := NewBlockQuerier(block, 0, 100)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, q.Close()) })

	type testCase struct {
		limit               int
		expectedSeries      []storage.Series
		expectedLabelValues []string
		expectedLabelNames  []string
	}

	testCases := map[string]testCase{
		"without limit": {
			expectedSeries:      seriesEntries,
			expectedLabelValues: allValues,
			expectedLabelNames:  allNames,
		},
		"with limit": {
			limit:               2,
			expectedSeries:      seriesEntries[:2],
			expectedLabelValues: allValues[:2],
			expectedLabelNames:  allNames[:2],
		},
	}

	for tName, tc := range testCases {
		t.Run(fmt.Sprintf("label values %s", tName), func(t *testing.T) {
			values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{
				Limit: tc.limit,
			})
			require.NoError(t, err)
			require.Equal(t, tc.expectedLabelValues, values)
		})

		t.Run(fmt.Sprintf("label names %s", tName), func(t *testing.T) {
			names, _, err := q.LabelNames(ctx, &storage.LabelHints{
				Limit: tc.limit,
			})
			require.NoError(t, err)
			require.Equal(t, tc.expectedLabelNames, names)
		})

		t.Run(fmt.Sprintf("select %s", tName), func(t *testing.T) {
			matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*")
			set := q.Select(ctx, true, &storage.SelectHints{
				Start: 0,
				End:   100,
				Limit: tc.limit,
			}, matcher)

			var s []storage.Series
			for set.Next() {
				s = append(s, set.At())
			}
			// Check the iterator's own error, not the stale err from above.
			require.NoError(t, set.Err())
			require.Equal(t, len(tc.expectedSeries), len(s))
			for i, exp := range tc.expectedSeries {
				require.Equal(t, exp.Labels(), s[i].Labels())
			}
		})
	}
}
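The test covers all three limited paths (LabelValues, LabelNames, Select) against a five-series block, comparing each limited result to a prefix of the unlimited one. In a Prometheus checkout it can be run with:

	go test -run TestBlockQuerierLimit ./tsdb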

type fakeChunksReader struct {
	ChunkReader
	chks map[chunks.ChunkRef]chunkenc.Chunk
