diff --git a/CHANGELOG.md b/CHANGELOG.md index ea4a72cadc9..ab96dbabc6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Grafana Mimir +* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 ### Mixin diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 8bb2c474eba..c68bb012d6a 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1347,7 +1347,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries seriesPerBlock = 1 } - // Create 4 blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples. + // Create numOfBlocks blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples. // Timestamp will be counted for each new series and new sample, so each series will have unique timestamp. // This allows to pick time range that will correspond to number of series picked 1:1. for bi := 0; bi < numOfBlocks; bi++ { @@ -2951,11 +2951,14 @@ func runTestServerSeries(t test.TB, store *BucketStore, streamingBatchSize int, t.ResetTimer() for i := 0; i < t.N(); i++ { seriesSet, warnings, hints, _, err := srv.Series(context.Background(), c.Req) - require.NoError(t, err) - require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings) - require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime) - + if err != nil { + t.Fatal(err) + } if !t.IsBenchmark() { + require.NoError(t, err) + require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings) + require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime) + if len(c.ExpectedSeries) == 1 { // For bucketStoreAPI chunks are not sorted within response. 
TODO: Investigate: Is this fine? sort.Slice(seriesSet[0].Chunks, func(i, j int) bool { diff --git a/pkg/storegateway/series_chunks_test.go b/pkg/storegateway/series_chunks_test.go index c31b19bbb2c..5cdfb6cea43 100644 --- a/pkg/storegateway/series_chunks_test.go +++ b/pkg/storegateway/series_chunks_test.go @@ -320,7 +320,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIterator(sets...) - source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source) @@ -348,7 +348,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...) - source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source) @@ -374,7 +374,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...) - source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source) @@ -402,7 +402,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...) 
- source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source) @@ -884,31 +884,30 @@ func (s *sliceSeriesChunksSetIterator) Err() error { return nil } -// delayedSetIterator implements iterator and -// introduces an artificial delay before returning from Next() and At(). -type delayedSetIterator[S any] struct { +// delayedIterator implements iterator and introduces an artificial delay before returning from Next() and At(). +type delayedIterator[S any] struct { wrapped iterator[S] delay time.Duration } -func newDelayedSetIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedSetIterator[S] { - return &delayedSetIterator[S]{ +func newDelayedIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedIterator[S] { + return &delayedIterator[S]{ wrapped: wrapped, delay: delay, } } -func (s *delayedSetIterator[S]) Next() bool { +func (s *delayedIterator[S]) Next() bool { time.Sleep(s.delay) return s.wrapped.Next() } -func (s *delayedSetIterator[S]) At() S { +func (s *delayedIterator[S]) At() S { time.Sleep(s.delay) return s.wrapped.At() } -func (s *delayedSetIterator[S]) Err() error { +func (s *delayedIterator[S]) Err() error { return s.wrapped.Err() } diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index b72ddadc527..df23681acb5 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/tsdb/hashcache" "github.com/prometheus/prometheus/tsdb/index" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/sharding" @@ -345,14 +346,8 @@ func (s *mergedSeriesChunkRefsSet) Next() bool { next := newSeriesChunkRefsSet(s.batchSize, true) for i := 0; i < s.batchSize; i++ { - if err := s.ensureItemAvailableToRead(s.aAt, s.a); err != nil { - // Stop 
iterating on first error encountered. - s.current = seriesChunkRefsSet{} - s.done = true - return false - } - - if err := s.ensureItemAvailableToRead(s.bAt, s.b); err != nil { + err := s.ensureCursors(s.aAt, s.bAt, s.a, s.b) + if err != nil { // Stop iterating on first error encountered. s.current = seriesChunkRefsSet{} s.done = true @@ -379,6 +374,27 @@ func (s *mergedSeriesChunkRefsSet) Next() bool { return true } +func (s *mergedSeriesChunkRefsSet) ensureCursors(curr1, curr2 *seriesChunkRefsIterator, set1, set2 iterator[seriesChunkRefsSet]) error { + // When both cursors are empty, we advance their set iterators concurrently to reduce total waiting time for the + // IO from underlying set iterators (see grafana/mimir#4596). + // If either cursor is already populated with data (or completed), the cost of a goroutine switch outweighs + // the cost of a single call to ensureItemAvailableToRead, so below we call them sequentially. + if curr1.Done() && curr2.Done() { + var g errgroup.Group + g.Go(func() error { return s.ensureItemAvailableToRead(curr1, set1) }) + g.Go(func() error { return s.ensureItemAvailableToRead(curr2, set2) }) + return g.Wait() + } + + if err := s.ensureItemAvailableToRead(curr1, set1); err != nil { + return err + } + if err := s.ensureItemAvailableToRead(curr2, set2); err != nil { + return err + } + return nil +} + // ensureItemAvailableToRead ensures curr has an item available to read, unless we reached the // end of all sets. If curr has no item available to read, it will advance the iterator, eventually // picking the next one from the set. 
diff --git a/pkg/storegateway/series_refs_test.go b/pkg/storegateway/series_refs_test.go index fbcf7c623ed..5ce1f159736 100644 --- a/pkg/storegateway/series_refs_test.go +++ b/pkg/storegateway/series_refs_test.go @@ -10,6 +10,7 @@ import ( "math/rand" "sort" "testing" + "time" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -662,6 +663,22 @@ func TestMergedSeriesChunkRefsSet_Concurrency(t *testing.T) { } func BenchmarkMergedSeriesChunkRefsSetIterators(b *testing.B) { + for numIterators := 1; numIterators <= 64; numIterators *= 2 { + b.Run(fmt.Sprintf("number of iterators = %d", numIterators), func(b *testing.B) { + for _, withDuplicatedSeries := range []bool{true, false} { + b.Run(fmt.Sprintf("with duplicates = %v", withDuplicatedSeries), func(b *testing.B) { + for _, withIO := range []bool{false, true} { + b.Run(fmt.Sprintf("with IO = %v", withIO), func(b *testing.B) { + benchmarkMergedSeriesChunkRefsSetIterators(b, numIterators, withDuplicatedSeries, withIO) + }) + } + }) + } + }) + } +} + +func benchmarkMergedSeriesChunkRefsSetIterators(b *testing.B, numIterators int, withDuplicatedSeries, withIO bool) { const ( numSetsPerIterator = 10 numSeriesPerSet = 10 @@ -683,60 +700,66 @@ func BenchmarkMergedSeriesChunkRefsSetIterators(b *testing.B) { return sets } - for _, withDuplicatedSeries := range []bool{true, false} { - for numIterators := 1; numIterators <= 64; numIterators *= 2 { - // Create empty iterators that we can reuse in each benchmark run. - iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators) - for i := 0; i < numIterators; i++ { - iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil)) - } + // Create empty iterators that we can reuse in each benchmark run. + iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators) + for i := 0; i < numIterators; i++ { + iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil)) + } - // Create the sets for each underlying iterator. 
These sets cannot be released because - // will be used in multiple benchmark runs. - perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators) - for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ { - if withDuplicatedSeries { - perIteratorSets = append(perIteratorSets, createUnreleasableSets(0)) - } else { - perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx)) - } - } + batch := make([]iterator[seriesChunkRefsSet], len(iterators)) + for i := 0; i < numIterators; i++ { + if withIO { + // The delay represents an IO operation, that happens inside real set iterations. + batch[i] = newDelayedIterator(10*time.Microsecond, iterators[i]) + } else { + batch[i] = iterators[i] + } + } - b.Run(fmt.Sprintf("with duplicated series = %t number of iterators = %d", withDuplicatedSeries, numIterators), func(b *testing.B) { - for n := 0; n < b.N; n++ { - // Reset iterators. - for i := 0; i < numIterators; i++ { - iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i]) - } + // Create the sets for each underlying iterator. These sets cannot be released because + // will be used in multiple benchmark runs. + perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators) + for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ { + if withDuplicatedSeries { + perIteratorSets = append(perIteratorSets, createUnreleasableSets(0)) + } else { + perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx)) + } + } - // Merge the iterators and run through them. - it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, iterators...) + b.ResetTimer() - actualSeries := 0 - for it.Next() { - set := it.At() - actualSeries += len(set.series) + for n := 0; n < b.N; n++ { + // Reset batch's underlying iterators. + for i := 0; i < numIterators; i++ { + iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i]) + } - set.release() - } + // Merge the iterators and run through them. 
+ it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, batch...) - if err := it.Err(); err != nil { - b.Fatal(it.Err()) - } + actualSeries := 0 + for it.Next() { + set := it.At() + actualSeries += len(set.series) - // Ensure each benchmark run go through the same data set. - var expectedSeries int - if withDuplicatedSeries { - expectedSeries = numSetsPerIterator * numSeriesPerSet - } else { - expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet - } + set.release() + } - if actualSeries != expectedSeries { - b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries) - } - } - }) + if err := it.Err(); err != nil { + b.Fatal(it.Err()) + } + + // Ensure each benchmark run go through the same data set. + var expectedSeries int + if withDuplicatedSeries { + expectedSeries = numSetsPerIterator * numSeriesPerSet + } else { + expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet + } + + if actualSeries != expectedSeries { + b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries) } } }