storegateway: Merge series concurrently (#7456)
* storegateway: merge series concurrently

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* storegateway: update BenchmarkMergedSeriesChunkRefsSetIterators

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* Update changelog

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* fixup! storegateway: update BenchmarkMergedSeriesChunkRefsSetIterators

---------

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
narqo committed Mar 13, 2024
1 parent 08a2bd2 commit 1ba965a
Showing 5 changed files with 113 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

### Grafana Mimir

* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567

### Mixin
13 changes: 8 additions & 5 deletions pkg/storegateway/bucket_test.go
@@ -1347,7 +1347,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
seriesPerBlock = 1
}

// Create 4 blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples.
// Create numOfBlocks blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples.
// Timestamps are incremented for each new series and each new sample, so every series has a unique timestamp.
// This allows picking a time range that corresponds 1:1 to the number of series picked.
for bi := 0; bi < numOfBlocks; bi++ {
@@ -2951,11 +2951,14 @@ func runTestServerSeries(t test.TB, store *BucketStore, streamingBatchSize int,
t.ResetTimer()
for i := 0; i < t.N(); i++ {
seriesSet, warnings, hints, _, err := srv.Series(context.Background(), c.Req)
require.NoError(t, err)
require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings)
require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime)

if err != nil {
t.Fatal(err)
}
if !t.IsBenchmark() {
require.NoError(t, err)
require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings)
require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime)

if len(c.ExpectedSeries) == 1 {
// For bucketStoreAPI, chunks are not sorted within the response. TODO: investigate whether this is fine.
sort.Slice(seriesSet[0].Chunks, func(i, j int) bool {
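The change above gates the per-iteration assertions behind IsBenchmark, so assertion overhead no longer pollutes benchmark timings while errors still fail fast. A minimal sketch of the same pattern, assuming a hypothetical TB interface (Mimir's real test.TB helper is not reproduced here):

```go
package example

import "fmt"

// TB is a hypothetical stand-in for a test helper that abstracts over
// *testing.T and *testing.B, similar in spirit to the test.TB used above.
type TB interface {
	Fatal(args ...any)
	IsBenchmark() bool
}

// checkSeriesResponse fails fast on error in both modes, but runs the
// comparatively expensive result assertions only outside benchmark mode,
// so they do not skew the measured time.
func checkSeriesResponse(t TB, err error, gotSeries, wantSeries int) {
	if err != nil {
		t.Fatal(err)
	}
	if t.IsBenchmark() {
		return
	}
	if gotSeries != wantSeries {
		t.Fatal(fmt.Sprintf("unexpected series count: got %d, want %d", gotSeries, wantSeries))
	}
}
```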
23 changes: 11 additions & 12 deletions pkg/storegateway/series_chunks_test.go
@@ -320,7 +320,7 @@ func TestPreloadingSetIterator(t *testing.T) {

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIterator(sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source)

@@ -348,7 +348,7 @@

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source)

@@ -374,7 +374,7 @@

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source)

@@ -402,7 +402,7 @@

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source)

@@ -884,31 +884,30 @@ func (s *sliceSeriesChunksSetIterator) Err() error {
return nil
}

// delayedSetIterator implements iterator and
// introduces an artificial delay before returning from Next() and At().
type delayedSetIterator[S any] struct {
// delayedIterator implements iterator and introduces an artificial delay before returning from Next() and At().
type delayedIterator[S any] struct {
wrapped iterator[S]
delay time.Duration
}

func newDelayedSetIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedSetIterator[S] {
return &delayedSetIterator[S]{
func newDelayedIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedIterator[S] {
return &delayedIterator[S]{
wrapped: wrapped,
delay: delay,
}
}

func (s *delayedSetIterator[S]) Next() bool {
func (s *delayedIterator[S]) Next() bool {
time.Sleep(s.delay)
return s.wrapped.Next()
}

func (s *delayedSetIterator[S]) At() S {
func (s *delayedIterator[S]) At() S {
time.Sleep(s.delay)
return s.wrapped.At()
}

func (s *delayedSetIterator[S]) Err() error {
func (s *delayedIterator[S]) Err() error {
return s.wrapped.Err()
}

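The rename from newDelayedSetIterator to newDelayedIterator reflects that the wrapper is generic over any iterator[S]. As a usage illustration, here is a self-contained sketch under assumed names (the iterator interface and slice source below are stand-ins defined for the example, not Mimir's internals):

```go
package main

import (
	"fmt"
	"time"
)

// iterator is a stand-in for Mimir's internal generic iterator interface.
type iterator[S any] interface {
	Next() bool
	At() S
	Err() error
}

// sliceIterator yields the elements of a slice: a fast in-memory source
// that we deliberately slow down to simulate IO.
type sliceIterator[S any] struct {
	items []S
	idx   int
}

func (it *sliceIterator[S]) Next() bool { it.idx++; return it.idx <= len(it.items) }
func (it *sliceIterator[S]) At() S      { return it.items[it.idx-1] }
func (it *sliceIterator[S]) Err() error { return nil }

// delayedIterator sleeps before Next and At, mirroring the test helper in
// the diff above that simulates per-call IO latency.
type delayedIterator[S any] struct {
	wrapped iterator[S]
	delay   time.Duration
}

func (d *delayedIterator[S]) Next() bool { time.Sleep(d.delay); return d.wrapped.Next() }
func (d *delayedIterator[S]) At() S      { time.Sleep(d.delay); return d.wrapped.At() }
func (d *delayedIterator[S]) Err() error { return d.wrapped.Err() }

func main() {
	var it iterator[int] = &delayedIterator[int]{
		wrapped: &sliceIterator[int]{items: []int{1, 2, 3}},
		delay:   10 * time.Microsecond,
	}
	for it.Next() {
		fmt.Println(it.At()) // each call pays the simulated IO delay
	}
}
```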
32 changes: 24 additions & 8 deletions pkg/storegateway/series_refs.go
@@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/tsdb/hashcache"
"github.com/prometheus/prometheus/tsdb/index"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/sharding"
@@ -345,14 +346,8 @@ func (s *mergedSeriesChunkRefsSet) Next() bool {
next := newSeriesChunkRefsSet(s.batchSize, true)

for i := 0; i < s.batchSize; i++ {
if err := s.ensureItemAvailableToRead(s.aAt, s.a); err != nil {
// Stop iterating on first error encountered.
s.current = seriesChunkRefsSet{}
s.done = true
return false
}

if err := s.ensureItemAvailableToRead(s.bAt, s.b); err != nil {
err := s.ensureCursors(s.aAt, s.bAt, s.a, s.b)
if err != nil {
// Stop iterating on first error encountered.
s.current = seriesChunkRefsSet{}
s.done = true
@@ -379,6 +374,27 @@
return true
}

func (s *mergedSeriesChunkRefsSet) ensureCursors(curr1, curr2 *seriesChunkRefsIterator, set1, set2 iterator[seriesChunkRefsSet]) error {
// When both cursors are empty, we advance their set iterators concurrently to reduce the total waiting time for
// IO from the underlying set iterators (see grafana/mimir#4596).
// If either cursor is already populated with data (or completed), the cost of a goroutine switch outweighs
// the cost of a single call to ensureItemAvailableToRead, so below we call them sequentially.
if curr1.Done() && curr2.Done() {
var g errgroup.Group
g.Go(func() error { return s.ensureItemAvailableToRead(curr1, set1) })
g.Go(func() error { return s.ensureItemAvailableToRead(curr2, set2) })
return g.Wait()
}

if err := s.ensureItemAvailableToRead(curr1, set1); err != nil {
return err
}
if err := s.ensureItemAvailableToRead(curr2, set2); err != nil {
return err
}
return nil
}

// ensureItemAvailableToRead ensures curr has an item available to read, unless we have reached the
// end of all sets. If curr has no item available to read, it will advance the iterator, eventually
// picking the next one from the set.
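The errgroup usage in ensureCursors is the core of this change: when both cursors need fresh data, the two IO-bound advances run in parallel so their latencies overlap; otherwise a sequential call avoids the goroutine overhead. A minimal standalone sketch of that pattern, with blocking fetch functions standing in for ensureItemAvailableToRead:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// advanceBoth is a stand-in for ensureCursors: fetchA and fetchB play the
// role of ensureItemAvailableToRead, which is a no-op when a cursor
// already holds data. Both branches return the first error encountered.
func advanceBoth(bothEmpty bool, fetchA, fetchB func() error) error {
	if bothEmpty {
		// Both fetches are IO-bound, so running them concurrently roughly
		// halves the wait: two 10ms fetches complete in ~10ms, not ~20ms.
		var g errgroup.Group
		g.Go(fetchA)
		g.Go(fetchB)
		return g.Wait()
	}
	// At least one cursor still has buffered data; a plain sequential call
	// is cheaper than spawning goroutines for (likely) no-op fetches.
	if err := fetchA(); err != nil {
		return err
	}
	return fetchB()
}

func main() {
	fetch := func() error { time.Sleep(10 * time.Millisecond); return nil }

	start := time.Now()
	if err := advanceBoth(true, fetch, fetch); err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Println("concurrent advance took", time.Since(start)) // ~10ms, not ~20ms
}
```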
115 changes: 69 additions & 46 deletions pkg/storegateway/series_refs_test.go
@@ -10,6 +10,7 @@ import (
"math/rand"
"sort"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid"
@@ -662,6 +663,22 @@ func TestMergedSeriesChunkRefsSet_Concurrency(t *testing.T) {
}

func BenchmarkMergedSeriesChunkRefsSetIterators(b *testing.B) {
for numIterators := 1; numIterators <= 64; numIterators *= 2 {
b.Run(fmt.Sprintf("number of iterators = %d", numIterators), func(b *testing.B) {
for _, withDuplicatedSeries := range []bool{true, false} {
b.Run(fmt.Sprintf("with duplicates = %v", withDuplicatedSeries), func(b *testing.B) {
for _, withIO := range []bool{false, true} {
b.Run(fmt.Sprintf("with IO = %v", withIO), func(b *testing.B) {
benchmarkMergedSeriesChunkRefsSetIterators(b, numIterators, withDuplicatedSeries, withIO)
})
}
})
}
})
}
}

func benchmarkMergedSeriesChunkRefsSetIterators(b *testing.B, numIterators int, withDuplicatedSeries, withIO bool) {
const (
numSetsPerIterator = 10
numSeriesPerSet = 10
@@ -683,60 +700,66 @@
return sets
}

for _, withDuplicatedSeries := range []bool{true, false} {
for numIterators := 1; numIterators <= 64; numIterators *= 2 {
// Create empty iterators that we can reuse in each benchmark run.
iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators)
for i := 0; i < numIterators; i++ {
iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil))
}
// Create empty iterators that we can reuse in each benchmark run.
iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators)
for i := 0; i < numIterators; i++ {
iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil))
}

// Create the sets for each underlying iterator. These sets cannot be released because
// they will be used in multiple benchmark runs.
perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators)
for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ {
if withDuplicatedSeries {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(0))
} else {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx))
}
}
batch := make([]iterator[seriesChunkRefsSet], len(iterators))
for i := 0; i < numIterators; i++ {
if withIO {
// The delay represents an IO operation that happens inside real set iterations.
batch[i] = newDelayedIterator(10*time.Microsecond, iterators[i])
} else {
batch[i] = iterators[i]
}
}

b.Run(fmt.Sprintf("with duplicated series = %t number of iterators = %d", withDuplicatedSeries, numIterators), func(b *testing.B) {
for n := 0; n < b.N; n++ {
// Reset iterators.
for i := 0; i < numIterators; i++ {
iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i])
}
// Create the sets for each underlying iterator. These sets cannot be released because
// they will be used in multiple benchmark runs.
perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators)
for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ {
if withDuplicatedSeries {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(0))
} else {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx))
}
}

// Merge the iterators and run through them.
it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, iterators...)
b.ResetTimer()

actualSeries := 0
for it.Next() {
set := it.At()
actualSeries += len(set.series)
for n := 0; n < b.N; n++ {
// Reset batch's underlying iterators.
for i := 0; i < numIterators; i++ {
iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i])
}

set.release()
}
// Merge the iterators and run through them.
it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, batch...)

if err := it.Err(); err != nil {
b.Fatal(err)
}
actualSeries := 0
for it.Next() {
set := it.At()
actualSeries += len(set.series)

// Ensure each benchmark run goes through the same data set.
var expectedSeries int
if withDuplicatedSeries {
expectedSeries = numSetsPerIterator * numSeriesPerSet
} else {
expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet
}
set.release()
}

if actualSeries != expectedSeries {
b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries)
}
}
})
if err := it.Err(); err != nil {
b.Fatal(err)
}

// Ensure each benchmark run goes through the same data set.
var expectedSeries int
if withDuplicatedSeries {
expectedSeries = numSetsPerIterator * numSeriesPerSet
} else {
expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet
}

if actualSeries != expectedSeries {
b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries)
}
}
}
