store/bucket: remove sort.Slice data race
The matchers slice is currently sorted in each call, but that introduces a data
race because the slice is shared between all calls. Do the sorting once, in the
outermost function.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS committed Aug 1, 2023
1 parent a35a5b2 commit 9becefb
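
For context, here is a minimal standalone sketch (not Thanos code) of the pattern this commit fixes: sort.Slice mutates the slice it is given, so sorting a shared slice from every concurrent caller is a data race, while sorting once in the outermost caller, before the slice is shared, leaves only safe concurrent reads. The string slice and the useSorted helper below are hypothetical stand-ins for the matchers slice and ExpandPostings; the race detector (go run -race) flags the racy variant noted in the comments.

package main

import (
	"fmt"
	"sort"
	"sync"
)

// useSorted stands in for a per-block call such as ExpandPostings: it only
// needs the values in a deterministic order, it should not re-sort them itself.
func useSorted(ms []string) string { return fmt.Sprint(ms) }

func main() {
	matchers := []string{"c", "a", "b"}

	// Fix: sort exactly once, before the slice is handed to concurrent callers.
	sort.Strings(matchers)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Safe: concurrent reads of an already-sorted, no-longer-mutated slice.
			_ = useSorted(matchers)
			// Racy variant (the old behaviour): calling sort.Slice(matchers, ...)
			// here would mutate the shared backing array from several goroutines.
		}()
	}
	wg.Wait()
}
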
Showing 2 changed files with 28 additions and 15 deletions.
40 changes: 26 additions & 14 deletions pkg/store/bucket.go
@@ -962,8 +962,24 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
 	return stats
 }
 
+type sortedMatchers []*labels.Matcher
+
+func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
+	sort.Slice(matchers, func(i, j int) bool {
+		if matchers[i].Type == matchers[j].Type {
+			if matchers[i].Name == matchers[j].Name {
+				return matchers[i].Value < matchers[j].Value
+			}
+			return matchers[i].Name < matchers[j].Name
+		}
+		return matchers[i].Type < matchers[j].Type
+	})
+
+	return matchers
+}
+
 func (b *blockSeriesClient) ExpandPostings(
-	matchers []*labels.Matcher,
+	matchers sortedMatchers,
 	seriesLimiter SeriesLimiter,
 ) error {
 	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
@@ -1284,6 +1300,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			continue
 		}
 
+		sortedBlockMatchers := newSortedMatchers(blockMatchers)
+
 		blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)
 
 		if s.debugLogging {
@@ -1326,7 +1344,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 				"block.resolution": blk.meta.Thanos.Downsample.Resolution,
 			})
 
-			if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil {
+			if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil {
 				span.Finish()
 				return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
 			}
@@ -1527,6 +1545,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			continue
 		}
 
+		sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
 		resHints.AddQueriedBlock(b.meta.ULID)
 
 		indexr := b.indexReader()
@@ -1581,7 +1601,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			defer blockClient.Close()
 
 			if err := blockClient.ExpandPostings(
-				reqSeriesMatchersNoExtLabels,
+				sortedReqSeriesMatchersNoExtLabels,
 				seriesLimiter,
 			); err != nil {
 				return err
@@ -1727,6 +1747,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 			reqSeriesMatchersNoExtLabels = append(reqSeriesMatchersNoExtLabels, m)
 		}
 
+		sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
 		resHints.AddQueriedBlock(b.meta.ULID)
 
 		indexr := b.indexReader()
@@ -1775,7 +1797,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 			defer blockClient.Close()
 
 			if err := blockClient.ExpandPostings(
-				reqSeriesMatchersNoExtLabels,
+				sortedReqSeriesMatchersNoExtLabels,
 				seriesLimiter,
 			); err != nil {
 				return err
@@ -2203,16 +2225,6 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
 		return nil, nil
 	}
 
-	// Sort matchers to make sure we generate the same cache key.
-	sort.Slice(ms, func(i, j int) bool {
-		if ms[i].Type == ms[j].Type {
-			if ms[i].Name == ms[j].Name {
-				return ms[i].Value < ms[j].Value
-			}
-			return ms[i].Name < ms[j].Name
-		}
-		return ms[i].Type < ms[j].Type
-	})
 	hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
 	if err != nil {
 		return nil, err
3 changes: 2 additions & 1 deletion pkg/store/bucket_test.go
@@ -2562,6 +2562,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
 				// TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which
 				// must be called only from the goroutine running the Benchmark function.
 				testutil.Ok(b, err)
+				sortedMatchers := newSortedMatchers(matchers)
 
 				dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{})
 				blockClient := newBlockSeriesClient(
@@ -2577,7 +2578,7 @@
 					dummyHistogram,
 					nil,
 				)
-				testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter))
+				testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter))
 				defer blockClient.Close()
 
 				// Ensure at least 1 series has been returned (as expected).
