Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storegateway: Merge series concurrently #7456

Merged
merged 4 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Grafana Mimir

* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567

### Mixin
Expand Down
13 changes: 8 additions & 5 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
seriesPerBlock = 1
}

// Create 4 blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples.
// Create numOfBlocks blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples.
// Timestamp will be counted for each new series and new sample, so each series will have unique timestamp.
// This allows to pick time range that will correspond to number of series picked 1:1.
for bi := 0; bi < numOfBlocks; bi++ {
Expand Down Expand Up @@ -2951,11 +2951,14 @@ func runTestServerSeries(t test.TB, store *BucketStore, streamingBatchSize int,
t.ResetTimer()
for i := 0; i < t.N(); i++ {
seriesSet, warnings, hints, _, err := srv.Series(context.Background(), c.Req)
require.NoError(t, err)
require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings)
require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime)

if err != nil {
t.Fatal(err)
}
if !t.IsBenchmark() {
require.NoError(t, err)
require.Equal(t, len(c.ExpectedWarnings), len(warnings), "%v", warnings)
require.Equal(t, len(c.ExpectedSeries), len(seriesSet), "Matchers: %v Min time: %d Max time: %d", c.Req.Matchers, c.Req.MinTime, c.Req.MaxTime)

if len(c.ExpectedSeries) == 1 {
// For bucketStoreAPI chunks are not sorted within response. TODO: Investigate: Is this fine?
sort.Slice(seriesSet[0].Chunks, func(i, j int) bool {
Expand Down
23 changes: 11 additions & 12 deletions pkg/storegateway/series_chunks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestPreloadingSetIterator(t *testing.T) {

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIterator(sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source)

Expand Down Expand Up @@ -348,7 +348,7 @@ func TestPreloadingSetIterator(t *testing.T) {

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](context.Background(), preloadSize, source)

Expand All @@ -374,7 +374,7 @@ func TestPreloadingSetIterator(t *testing.T) {

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source)

Expand Down Expand Up @@ -402,7 +402,7 @@ func TestPreloadingSetIterator(t *testing.T) {

var source iterator[seriesChunksSet]
source = newSliceSeriesChunksSetIteratorWithError(errors.New("mocked error"), len(sets), sets...)
source = newDelayedSetIterator[seriesChunksSet](delay, source)
source = newDelayedIterator(delay, source)

preloading := newPreloadingSetIterator[seriesChunksSet](ctx, 1, source)

Expand Down Expand Up @@ -884,31 +884,30 @@ func (s *sliceSeriesChunksSetIterator) Err() error {
return nil
}

// delayedSetIterator implements iterator and
// introduces an artificial delay before returning from Next() and At().
type delayedSetIterator[S any] struct {
// delayedIterator implements iterator and introduces an artificial delay before returning from Next() and At().
type delayedIterator[S any] struct {
wrapped iterator[S]
delay time.Duration
}

func newDelayedSetIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedSetIterator[S] {
return &delayedSetIterator[S]{
func newDelayedIterator[S any](delay time.Duration, wrapped iterator[S]) *delayedIterator[S] {
return &delayedIterator[S]{
wrapped: wrapped,
delay: delay,
}
}

func (s *delayedSetIterator[S]) Next() bool {
func (s *delayedIterator[S]) Next() bool {
time.Sleep(s.delay)
return s.wrapped.Next()
}

func (s *delayedSetIterator[S]) At() S {
func (s *delayedIterator[S]) At() S {
time.Sleep(s.delay)
return s.wrapped.At()
}

func (s *delayedSetIterator[S]) Err() error {
func (s *delayedIterator[S]) Err() error {
return s.wrapped.Err()
}

Expand Down
32 changes: 24 additions & 8 deletions pkg/storegateway/series_refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/tsdb/hashcache"
"github.com/prometheus/prometheus/tsdb/index"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/sharding"
Expand Down Expand Up @@ -345,14 +346,8 @@ func (s *mergedSeriesChunkRefsSet) Next() bool {
next := newSeriesChunkRefsSet(s.batchSize, true)

for i := 0; i < s.batchSize; i++ {
if err := s.ensureItemAvailableToRead(s.aAt, s.a); err != nil {
// Stop iterating on first error encountered.
s.current = seriesChunkRefsSet{}
s.done = true
return false
}

if err := s.ensureItemAvailableToRead(s.bAt, s.b); err != nil {
err := s.ensureCursors(s.aAt, s.bAt, s.a, s.b)
if err != nil {
// Stop iterating on first error encountered.
s.current = seriesChunkRefsSet{}
s.done = true
Expand All @@ -379,6 +374,27 @@ func (s *mergedSeriesChunkRefsSet) Next() bool {
return true
}

func (s *mergedSeriesChunkRefsSet) ensureCursors(curr1, curr2 *seriesChunkRefsIterator, set1, set2 iterator[seriesChunkRefsSet]) error {
// When both cursors are empty, we advance their set iterators concurrently to reduce total waiting time for the
// IO from underlying set iterators (see grafana/mimir#4596).
// If either of cursors are already populated with data (or completed), the cost of goroutine switch outweigh
// the cost of single call to ensureItemAvailableToRead, thus below we call them sequentially.
if curr1.Done() && curr2.Done() {
var g errgroup.Group
g.Go(func() error { return s.ensureItemAvailableToRead(curr1, set1) })
narqo marked this conversation as resolved.
Show resolved Hide resolved
g.Go(func() error { return s.ensureItemAvailableToRead(curr2, set2) })
return g.Wait()
}

if err := s.ensureItemAvailableToRead(curr1, set1); err != nil {
return err
}
if err := s.ensureItemAvailableToRead(curr2, set2); err != nil {
return err
}
return nil
}

// ensureItemAvailableToRead ensures curr has an item available to read, unless we reached the
// end of all sets. If curr has no item available to read, it will advance the iterator, eventually
// picking the next one from the set.
Expand Down
115 changes: 69 additions & 46 deletions pkg/storegateway/series_refs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/rand"
"sort"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -662,6 +663,22 @@ func TestMergedSeriesChunkRefsSet_Concurrency(t *testing.T) {
}

// BenchmarkMergedSeriesChunkRefsSetIterators benchmarks merging series chunk
// refs sets across a growing number of underlying iterators (powers of two up
// to 64), both with and without duplicated series, and both with and without
// simulated per-call IO latency.
func BenchmarkMergedSeriesChunkRefsSetIterators(b *testing.B) {
	for numIterators := 1; numIterators <= 64; numIterators *= 2 {
		numIterators := numIterators
		name := fmt.Sprintf("number of iterators = %d", numIterators)

		b.Run(name, func(b *testing.B) {
			for _, withDuplicatedSeries := range []bool{true, false} {
				withDuplicatedSeries := withDuplicatedSeries

				b.Run(fmt.Sprintf("with duplicates = %v", withDuplicatedSeries), func(b *testing.B) {
					for _, withIO := range []bool{false, true} {
						withIO := withIO

						b.Run(fmt.Sprintf("with IO = %v", withIO), func(b *testing.B) {
							benchmarkMergedSeriesChunkRefsSetIterators(b, numIterators, withDuplicatedSeries, withIO)
						})
					}
				})
			}
		})
	}
}

func benchmarkMergedSeriesChunkRefsSetIterators(b *testing.B, numIterators int, withDuplicatedSeries, withIO bool) {
const (
numSetsPerIterator = 10
numSeriesPerSet = 10
Expand All @@ -683,60 +700,66 @@ func BenchmarkMergedSeriesChunkRefsSetIterators(b *testing.B) {
return sets
}

for _, withDuplicatedSeries := range []bool{true, false} {
for numIterators := 1; numIterators <= 64; numIterators *= 2 {
// Create empty iterators that we can reuse in each benchmark run.
iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators)
for i := 0; i < numIterators; i++ {
iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil))
}
// Create empty iterators that we can reuse in each benchmark run.
iterators := make([]iterator[seriesChunkRefsSet], 0, numIterators)
for i := 0; i < numIterators; i++ {
iterators = append(iterators, newSliceSeriesChunkRefsSetIterator(nil))
}

// Create the sets for each underlying iterator. These sets cannot be released because
// will be used in multiple benchmark runs.
perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators)
for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ {
if withDuplicatedSeries {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(0))
} else {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx))
}
}
batch := make([]iterator[seriesChunkRefsSet], len(iterators))
for i := 0; i < numIterators; i++ {
if withIO {
// The delay represents an IO operation, that happens inside real set iterations.
batch[i] = newDelayedIterator(10*time.Microsecond, iterators[i])
} else {
batch[i] = iterators[i]
}
}

b.Run(fmt.Sprintf("with duplicated series = %t number of iterators = %d", withDuplicatedSeries, numIterators), func(b *testing.B) {
for n := 0; n < b.N; n++ {
// Reset iterators.
for i := 0; i < numIterators; i++ {
iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i])
}
// Create the sets for each underlying iterator. These sets cannot be released because
// will be used in multiple benchmark runs.
perIteratorSets := make([][]seriesChunkRefsSet, 0, numIterators)
for iteratorIdx := 0; iteratorIdx < numIterators; iteratorIdx++ {
if withDuplicatedSeries {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(0))
} else {
perIteratorSets = append(perIteratorSets, createUnreleasableSets(iteratorIdx))
}
}

// Merge the iterators and run through them.
it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, iterators...)
b.ResetTimer()

actualSeries := 0
for it.Next() {
set := it.At()
actualSeries += len(set.series)
for n := 0; n < b.N; n++ {
// Reset batch's underlying iterators.
for i := 0; i < numIterators; i++ {
iterators[i].(*sliceSeriesChunkRefsSetIterator).reset(perIteratorSets[i])
}

set.release()
}
// Merge the iterators and run through them.
it := mergedSeriesChunkRefsSetIterators(mergedBatchSize, batch...)

if err := it.Err(); err != nil {
b.Fatal(it.Err())
}
actualSeries := 0
for it.Next() {
set := it.At()
actualSeries += len(set.series)

// Ensure each benchmark run go through the same data set.
var expectedSeries int
if withDuplicatedSeries {
expectedSeries = numSetsPerIterator * numSeriesPerSet
} else {
expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet
}
set.release()
}

if actualSeries != expectedSeries {
b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries)
}
}
})
if err := it.Err(); err != nil {
b.Fatal(it.Err())
}

// Ensure each benchmark run go through the same data set.
var expectedSeries int
if withDuplicatedSeries {
expectedSeries = numSetsPerIterator * numSeriesPerSet
} else {
expectedSeries = numIterators * numSetsPerIterator * numSeriesPerSet
}

if actualSeries != expectedSeries {
b.Fatalf("benchmark iterated through an unexpected number of series (expected: %d got: %d)", expectedSeries, actualSeries)
}
}
}
Expand Down
Loading