From f783447bb5ce82f547250bf222c650430c6757d6 Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Fri, 23 Feb 2024 15:11:28 +0100 Subject: [PATCH 1/4] storegateway: merge series concurrently Signed-off-by: Vladimir Varankin --- pkg/storegateway/bucket_test.go | 13 ++++++++----- pkg/storegateway/series_refs.go | 32 ++++++++++++++++++++++++-------- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 8bb2c474eba..c68bb012d6a 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1347,7 +1347,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries seriesPerBlock = 1 } - // Create 4 blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples. + // Create numOfBlocks blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples. // Timestamp will be counted for each new series and new sample, so each series will have unique timestamp. // This allows to pick time range that will correspond to number of series picked 1:1. 
for bi := 0; bi < numOfBlocks; bi++ { @@ -2951,11 +2951,14 @@ func runTestServerSeries(t test.TB, store *BucketStore, streamingBatchSize int, t.ResetTimer() for i := 0; i < t.N(); i++ { seriesSet, warnings, hints, _, err := srv.Series(context.Background(), c.Req) - require.NoError(t, err) - require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings) - require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime) - + if err != nil { + t.Fatal(err) + } if !t.IsBenchmark() { + require.NoError(t, err) + require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings) + require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime) + if len(c.ExpectedSeries) == 1 { // For bucketStoreAPI chunks are not sorted within response. TODO: Investigate: Is this fine? sort.Slice(seriesSet[0].Chunks, func(i, j int) bool { diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index b72ddadc527..df23681acb5 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/tsdb/hashcache" "github.com/prometheus/prometheus/tsdb/index" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/sharding" @@ -345,14 +346,8 @@ func (s *mergedSeriesChunkRefsSet) Next() bool { next := newSeriesChunkRefsSet(s.batchSize, true) for i := 0; i < s.batchSize; i++ { - if err := s.ensureItemAvailableToRead(s.aAt, s.a); err != nil { - // Stop iterating on first error encountered. - s.current = seriesChunkRefsSet{} - s.done = true - return false - } - - if err := s.ensureItemAvailableToRead(s.bAt, s.b); err != nil { + err := s.ensureCursors(s.aAt, s.bAt, s.a, s.b) + if err != nil { // Stop iterating on first error encountered. 
s.current = seriesChunkRefsSet{} s.done = true @@ -379,6 +374,27 @@ func (s *mergedSeriesChunkRefsSet) Next() bool { return true } +func (s *mergedSeriesChunkRefsSet) ensureCursors(curr1, curr2 *seriesChunkRefsIterator, set1, set2 iterator[seriesChunkRefsSet]) error { + // When both cursors are empty, we advance their set iterators concurrently to reduce total waiting time for the + // IO from underlying set iterators (see grafana/mimir#4596). + // If either of the cursors is already populated with data (or completed), the cost of a goroutine switch outweighs + // the cost of a single call to ensureItemAvailableToRead, so below we call them sequentially. + if curr1.Done() && curr2.Done() { + var g errgroup.Group + g.Go(func() error { return s.ensureItemAvailableToRead(curr1, set1) }) + g.Go(func() error { return s.ensureItemAvailableToRead(curr2, set2) }) + return g.Wait() + } + + if err := s.ensureItemAvailableToRead(curr1, set1); err != nil { + return err + } + if err := s.ensureItemAvailableToRead(curr2, set2); err != nil { + return err + } + return nil +} + // ensureItemAvailableToRead ensures curr has an item available to read, unless we reached the // end of all sets. If curr has no item available to read, it will advance the iterator, eventually // picking the next one from the set.
From d7d849f6b6f12e83bd690250aa7413fbc515e589 Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Mon, 26 Feb 2024 13:30:50 +0100 Subject: [PATCH 2/4] storegateway: update BenchmarkMergedSeriesChunkRefsSetIterators Signed-off-by: Vladimir Varankin --- pkg/storegateway/series_chunks_test.go | 48 +++++++---- pkg/storegateway/series_refs_test.go | 115 +++++++++++++++---------- 2 files changed, 102 insertions(+), 61 deletions(-) diff --git a/pkg/storegateway/series_chunks_test.go b/pkg/storegateway/series_chunks_test.go index c31b19bbb2c..5f528ea5749 100644 --- a/pkg/storegateway/series_chunks_test.go +++ b/pkg/storegateway/series_chunks_test.go @@ -320,7 +320,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIterator(sets...) - source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source) @@ -348,7 +348,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...) - source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source) @@ -374,7 +374,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...) 
- source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source) @@ -402,7 +402,7 @@ func TestPreloadingSetIterator(t *testing.T) { var source iterator[seriesChunksSet] source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...) - source = newDelayedSetIterator[seriesChunksSet](delay, source) + source = newDelayedIterator(delay, source) preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source) @@ -884,31 +884,49 @@ func (s *sliceSeriesChunksSetIterator) Err() error { return nil } -// delayedSetIterator implements iterator and -// introduces an artificial delay before returning from Next() and At(). -type delayedSetIterator[S any] struct { +// delayedIterator implements iterator and introduces an artificial delay before returning from Next() and At(). +type delayedIterator[S any] struct { wrapped iterator[S] - delay time.Duration + + chance int // a chance for the delay to happen + delay time.Duration +} + +func newDelayedIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedIterator[S] { + return &delayedIterator[S]{ + wrapped: wrapped, + delay: delay, + } } -func newDelayedSetIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedSetIterator[S] { - return &delayedSetIterator[S]{ +func newDelayedMaybeIterator[S any](chance int, delay time.Duration, wrapped iterator[S]) *delayedIterator[S] { + if chance < 0 || chance > 100 { + panic("delayedIterator: probability must be between 0 and 100") + } + return &delayedIterator[S]{ wrapped: wrapped, + chance: chance, delay: delay, } } -func (s *delayedSetIterator[S]) Next() bool { - time.Sleep(s.delay) +func (s *delayedIterator[S]) delayMaybe() { + if s.chance == 0 || rand.Intn(100) <= s.chance { + time.Sleep(s.delay) + } +} + +func (s *delayedIterator[S]) Next() bool { + s.delayMaybe() return s.wrapped.Next() } -func (s 
*delayedSetIterator[S]) At() S { - time.Sleep(s.delay) +func (s *delayedIterator[S]) At() S { + s.delayMaybe() return s.wrapped.At() } -func (s *delayedSetIterator[S]) Err() error { +func (s *delayedIterator[S]) Err() error { return s.wrapped.Err() } diff --git a/pkg/storegateway/series_refs_test.go b/pkg/storegateway/series_refs_test.go index fbcf7c623ed..53cdd00d923 100644 --- a/pkg/storegateway/series_refs_test.go +++ b/pkg/storegateway/series_refs_test.go @@ -10,6 +10,7 @@ import ( "math/rand" "sort" "testing" + "time" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -662,6 +663,22 @@ func TestMergedSeriesChunkRefsSet_Concurrency(t *testing.T) { } func BenchmarkMergedSeriesChunkRefsSetIterators(b *testing.B) { + for numIterators := 1; numIterators <= 64; numIterators *= 2 { + b.Run(fmt.Sprintf("number of iterators = %d", numIterators), func(b *testing.B) { + for _, withDuplicatedSeries := range []bool{true, false} { + b.Run(fmt.Sprintf("with duplicates = %v", withDuplicatedSeries), func(b *testing.B) { + for _, withIO := range []bool{false, true} { + b.Run(fmt.Sprintf("with IO = %v", withIO), func(b *testing.B) { + benchmarkMergedSeriesChunkRefsSetIterators(b, numIterators, withDuplicatedSeries, withIO) + }) + } + }) + } + }) + } +} + +func benchmarkMergedSeriesChunkRefsSetIterators(b *testing.B, numIterators int, withDuplicatedSeries, withIO bool) { const ( numSetsPerIterator = 10 numSeriesPerSet = 10 @@ -683,60 +700,66 @@ func BenchmarkMergedSeriesChunkRefsSetIterators(b *testing.B) { return sets } - for _, withDuplicatedSeries := range []bool{true, false} { - for numIterators := 1; numIterators <= 64; numIterators *= 2 { - // Create empty iterators that we can reuse in each benchmark run. - iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators) - for i := 0; i < numIterators; i++ { - iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil)) - } + // Create empty iterators that we can reuse in each benchmark run. 
+ iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators) + for i := 0; i < numIterators; i++ { + iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil)) + } - // Create the sets for each underlying iterator. These sets cannot be released because - // will be used in multiple benchmark runs. - perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators) - for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ { - if withDuplicatedSeries { - perIteratorSets = append(perIteratorSets, createUnreleasableSets(0)) - } else { - perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx)) - } - } + batch := make([]iterator[seriesChunkRefsSet], len(iterators)) + for i := 0; i < numIterators; i++ { + if withIO { + // Delay only 2% of iterations to mimic IO ops, happening during real set iterations. + batch[i] = newDelayedMaybeIterator(2, time.Millisecond, iterators[i]) + } else { + batch[i] = iterators[i] + } + } - b.Run(fmt.Sprintf("with duplicated series = %t number of iterators = %d", withDuplicatedSeries, numIterators), func(b *testing.B) { - for n := 0; n < b.N; n++ { - // Reset iterators. - for i := 0; i < numIterators; i++ { - iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i]) - } + // Create the sets for each underlying iterator. These sets cannot be released because + // will be used in multiple benchmark runs. + perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators) + for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ { + if withDuplicatedSeries { + perIteratorSets = append(perIteratorSets, createUnreleasableSets(0)) + } else { + perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx)) + } + } - // Merge the iterators and run through them. - it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, iterators...) 
+ b.ResetTimer() - actualSeries := 0 - for it.Next() { - set := it.At() - actualSeries += len(set.series) + for n := 0; n < b.N; n++ { + // Reset batch's underlying iterators. + for i := 0; i < numIterators; i++ { + iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i]) + } - set.release() - } + // Merge the iterators and run through them. + it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, batch...) - if err := it.Err(); err != nil { - b.Fatal(it.Err()) - } + actualSeries := 0 + for it.Next() { + set := it.At() + actualSeries += len(set.series) - // Ensure each benchmark run go through the same data set. - var expectedSeries int - if withDuplicatedSeries { - expectedSeries = numSetsPerIterator * numSeriesPerSet - } else { - expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet - } + set.release() + } - if actualSeries != expectedSeries { - b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries) - } - } - }) + if err := it.Err(); err != nil { + b.Fatal(it.Err()) + } + + // Ensure each benchmark run go through the same data set. 
+ var expectedSeries int + if withDuplicatedSeries { + expectedSeries = numSetsPerIterator * numSeriesPerSet + } else { + expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet + } + + if actualSeries != expectedSeries { + b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries) } } } From c296e8e81081c32a39739d7e0d273a6dccd71dd6 Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Mon, 26 Feb 2024 13:42:03 +0100 Subject: [PATCH 3/4] Update changelog Signed-off-by: Vladimir Varankin --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea4a72cadc9..ab96dbabc6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Grafana Mimir +* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 ### Mixin From 7a5b62815d76769651a90edb9cec5fad03625fcb Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Wed, 28 Feb 2024 13:04:07 +0100 Subject: [PATCH 4/4] fixup! storegateway: update BenchmarkMergedSeriesChunkRefsSetIterators --- pkg/storegateway/series_chunks_test.go | 25 +++---------------------- pkg/storegateway/series_refs_test.go | 4 ++-- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/pkg/storegateway/series_chunks_test.go b/pkg/storegateway/series_chunks_test.go index 5f528ea5749..5cdfb6cea43 100644 --- a/pkg/storegateway/series_chunks_test.go +++ b/pkg/storegateway/series_chunks_test.go @@ -887,9 +887,7 @@ func (s *sliceSeriesChunksSetIterator) Err() error { // delayedIterator implements iterator and introduces an artificial delay before returning from Next() and At(). 
type delayedIterator[S any] struct { wrapped iterator[S] - - chance int // a chance for the delay to happen - delay time.Duration + delay time.Duration } func newDelayedIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedIterator[S] { @@ -899,30 +897,13 @@ func newDelayedIterator[S any](delay time.Duration, wrapped iterator[S]) *delaye } } -func newDelayedMaybeIterator[S any](chance int, delay time.Duration, wrapped iterator[S]) *delayedIterator[S] { - if chance < 0 || chance > 100 { - panic("delayedIterator: probability must be between 0 and 100") - } - return &delayedIterator[S]{ - wrapped: wrapped, - chance: chance, - delay: delay, - } -} - -func (s *delayedIterator[S]) delayMaybe() { - if s.chance == 0 || rand.Intn(100) <= s.chance { - time.Sleep(s.delay) - } -} - func (s *delayedIterator[S]) Next() bool { - s.delayMaybe() + time.Sleep(s.delay) return s.wrapped.Next() } func (s *delayedIterator[S]) At() S { - s.delayMaybe() + time.Sleep(s.delay) return s.wrapped.At() } diff --git a/pkg/storegateway/series_refs_test.go b/pkg/storegateway/series_refs_test.go index 53cdd00d923..5ce1f159736 100644 --- a/pkg/storegateway/series_refs_test.go +++ b/pkg/storegateway/series_refs_test.go @@ -709,8 +709,8 @@ func benchmarkMergedSeriesChunkRefsSetIterators(b *testing.B, numIterators int, batch := make([]iterator[seriesChunkRefsSet], len(iterators)) for i := 0; i < numIterators; i++ { if withIO { - // Delay only 2% of iterations to mimic IO ops, happening during real set iterations. - batch[i] = newDelayedMaybeIterator(2, time.Millisecond, iterators[i]) + // The delay represents an IO operation, that happens inside real set iterations. + batch[i] = newDelayedIterator(10*time.Microsecond, iterators[i]) } else { batch[i] = iterators[i] }