Skip to content

Commit

Permalink
Fix: Failure to close BlockSeriesClient causes store-gateway deadlock (#…
Browse files Browse the repository at this point in the history
…6086)

* Fix: Failure to close BlockSeriesClient causes store-gateway deadlock

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* Adding tests

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* reverting the change on get series

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* fix lint

Signed-off-by: Alan Protasio <alanprot@gmail.com>

---------

Signed-off-by: Alan Protasio <alanprot@gmail.com>
  • Loading branch information
alanprot committed Jan 31, 2023
1 parent 4a3166f commit a3b6b10
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}

shardMatcher := req.ShardInfo.Matcher(&s.buffers)

blockClient := newBlockSeriesClient(
srv.Context(),
s.logger,
Expand All @@ -1238,9 +1239,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.seriesBatchSize,
s.metrics.chunkFetchDuration,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
"block.id": blk.meta.ULID,
"block.mint": blk.meta.MinTime,
Expand Down Expand Up @@ -1482,6 +1485,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration)
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
Expand Down Expand Up @@ -1656,6 +1660,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration)
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
Expand Down
23 changes: 23 additions & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -765,6 +766,10 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
vals, err := s.store.LabelNames(ctx, tc.req)
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

testutil.Ok(t, err)

testutil.Equals(t, tc.expected, vals.Names)
Expand Down Expand Up @@ -868,6 +873,10 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
vals, err := s.store.LabelValues(ctx, tc.req)
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

testutil.Ok(t, err)

testutil.Equals(t, tc.expected, emptyToNil(vals.Values))
Expand All @@ -882,3 +891,17 @@ func emptyToNil(values []string) []string {
}
return values
}

// waitTimeout blocks until wg.Wait returns or until timeout elapses,
// whichever happens first. If the timeout wins, the calling test is failed
// immediately with t.Fatalf.
//
// It is marked as a test helper so that a timeout is attributed to the line
// of the caller rather than to this function.
//
// Note: when the timeout fires, the goroutine blocked in wg.Wait is left
// running; it only exits once wg is eventually released.
func waitTimeout(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
	t.Helper()

	// done is closed once wg.Wait returns, turning the blocking wait into
	// something that can take part in a select.
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	// Use an explicit timer so it can be released as soon as the wait
	// completes instead of lingering for the full timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
		t.Fatalf("timeout waiting wg for %v", timeout)
	}
}

0 comments on commit a3b6b10

Please sign in to comment.