Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: pull in fixes #65

Merged
merged 2 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,24 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
return stats
}

// sortedMatchers is a slice of label matchers.
// NOTE(review): presumably it marks matchers that were already ordered by
// newSortedMatchers; the type itself does not enforce any ordering — confirm.
type sortedMatchers []*labels.Matcher

// newSortedMatchers sorts the given matchers and returns them typed as
// sortedMatchers. Matchers are ordered by Type first, then by Name, then by
// Value.
//
// The sort is done in place with sort.Slice and the same slice is returned, so
// the caller's slice is reordered as a side effect.
func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
sort.Slice(matchers, func(i, j int) bool {
if matchers[i].Type == matchers[j].Type {
if matchers[i].Name == matchers[j].Name {
// Same type and same name: the value is the final tie-breaker.
return matchers[i].Value < matchers[j].Value
}
// Same type, different names: order by name.
return matchers[i].Name < matchers[j].Name
}
// Different types: order by matcher type.
return matchers[i].Type < matchers[j].Type
})

return matchers
Comment on lines +968 to +978
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could reduce nesting by:

sort.Slice(matchers, func(i, j int) bool {
   if matchers[i].Type != matchers[j].Type {
      return matchers[i].Type < matchers[j].Type
   }
			
   if matchers[i].Name == matchers[j].Name {
      return matchers[i].Value < matchers[j].Value
   }

   return matchers[i].Name < matchers[j].Name
})

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original PR I copied/pasted the same logic (thanos-io#6575), so I'd prefer to keep it the same to reduce discrepancies 😄

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could update both PRs 😏 😄

}

func (b *blockSeriesClient) ExpandPostings(
matchers []*labels.Matcher,
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
Expand Down Expand Up @@ -1284,6 +1300,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
continue
}

sortedBlockMatchers := newSortedMatchers(blockMatchers)

blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)

if s.debugLogging {
Expand Down Expand Up @@ -1326,7 +1344,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
"block.resolution": blk.meta.Thanos.Downsample.Resolution,
})

if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil {
if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil {
span.Finish()
return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
}
Expand Down Expand Up @@ -1527,6 +1545,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
continue
}

sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
Expand Down Expand Up @@ -1581,7 +1601,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
sortedReqSeriesMatchersNoExtLabels,
seriesLimiter,
); err != nil {
return err
Expand Down Expand Up @@ -1727,6 +1747,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
reqSeriesMatchersNoExtLabels = append(reqSeriesMatchersNoExtLabels, m)
}

sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
Expand Down Expand Up @@ -1775,7 +1797,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
sortedReqSeriesMatchersNoExtLabels,
seriesLimiter,
); err != nil {
return err
Expand Down Expand Up @@ -2196,23 +2218,13 @@ func (r *bucketIndexReader) reset() {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
// matchers specified, and we don't need to fetch expanded postings from cache.
if len(ms) == 0 {
return nil, nil
}

// Sort matchers to make sure we generate the same cache key.
sort.Slice(ms, func(i, j int) bool {
if ms[i].Type == ms[j].Type {
if ms[i].Name == ms[j].Name {
return ms[i].Value < ms[j].Value
}
return ms[i].Name < ms[j].Name
}
return ms[i].Type < ms[j].Type
})
hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3214,9 +3226,14 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
// loadChunks will read range [start, end] from the segment file with sequence number seq.
// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
var locked bool
fetchBegin := time.Now()
defer func() {
if !locked {
r.mtx.Lock()
}
r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
r.mtx.Unlock()
}()

// Get a reader for the required range.
Expand All @@ -3227,15 +3244,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)

locked := true
locked = true
r.mtx.Lock()

defer func() {
if locked {
r.mtx.Unlock()
}
}()

r.stats.chunksFetchCount++
r.stats.chunksFetched += len(pIdxs)
r.stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start))
Expand Down
7 changes: 4 additions & 3 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil))
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil))
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p))
}
Expand Down Expand Up @@ -1264,7 +1264,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
// Match nothing.
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil))
ps, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil))
testutil.Ok(t, err)
testutil.Equals(t, len(ps), 0)
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
Expand Down Expand Up @@ -2565,6 +2565,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
// TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which
// must be called only from the goroutine running the Benchmark function.
testutil.Ok(b, err)
sortedMatchers := newSortedMatchers(matchers)

dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{})
blockClient := newBlockSeriesClient(
Expand All @@ -2580,7 +2581,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
dummyHistogram,
nil,
)
testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter))
testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter))
defer blockClient.Close()

// Ensure at least 1 series has been returned (as expected).
Expand Down
Loading