From ff5e06548f75b8f3323d40a8ded82a57149c36a5 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 30 Jan 2023 13:28:51 -0800 Subject: [PATCH 1/4] Fix: Failure to close BlockSeriesClient cause store-gateway deadlock Signed-off-by: Alan Protasio --- pkg/store/bucket.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ad37f3254f..821bb72b14 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1226,21 +1226,22 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } shardMatcher := req.ShardInfo.Matcher(&s.buffers) - blockClient := newBlockSeriesClient( - srv.Context(), - s.logger, - blk, - req, - chunksLimiter, - bytesLimiter, - shardMatcher, - s.enableChunkHashCalculation, - s.seriesBatchSize, - s.metrics.chunkFetchDuration, - ) - defer blockClient.Close() g.Go(func() error { + blockClient := newBlockSeriesClient( + srv.Context(), + s.logger, + blk, + req, + chunksLimiter, + bytesLimiter, + shardMatcher, + s.enableChunkHashCalculation, + s.seriesBatchSize, + s.metrics.chunkFetchDuration, + ) + defer blockClient.Close() + span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ "block.id": blk.meta.ULID, "block.mint": blk.meta.MinTime, @@ -1482,6 +1483,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq SkipChunks: true, } blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration) + defer blockClient.Close() if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, @@ -1656,6 +1658,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR SkipChunks: true, } blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration) + defer blockClient.Close() if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, From 27e72c59629c915c397a5d2451b008173f5f253a Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 30 Jan 2023 14:18:57 -0800 Subject: [PATCH 2/4] Adding tests Signed-off-by: Alan Protasio --- pkg/store/bucket_e2e_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 3588c5f061..a6af5a2922 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -765,6 +766,10 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { } { t.Run(name, func(t *testing.T) { vals, err := s.store.LabelNames(ctx, tc.req) + for _, b := range s.store.blocks { + waitTimeout(t, b.pendingReaders, 5*time.Second) + } + testutil.Ok(t, err) testutil.Equals(t, tc.expected, vals.Names) @@ -868,6 +873,10 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) { } { t.Run(name, func(t *testing.T) { vals, err := s.store.LabelValues(ctx, tc.req) + for _, b := range s.store.blocks { + waitTimeout(t, b.pendingReaders, 5*time.Second) + } + testutil.Ok(t, err) testutil.Equals(t, tc.expected, emptyToNil(vals.Values)) @@ -882,3 +891,17 @@ func emptyToNil(values []string) []string { } return values } + +func waitTimeout(t *testing.T, wg sync.WaitGroup, timeout time.Duration) { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return + case <-time.After(timeout): + t.Fatalf("timeout waiting wg for %v", timeout) + } +} From fcd6679adae5addf39cdc091bde2fe2d1945d950 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 30 Jan 2023 14:54:07 -0800 Subject: [PATCH 3/4] reverting the change on get series Signed-off-by: Alan Protasio --- pkg/store/bucket.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 821bb72b14..23517b8d91 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1227,20 +1227,22 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie shardMatcher := req.ShardInfo.Matcher(&s.buffers) + blockClient := newBlockSeriesClient( + srv.Context(), + s.logger, + blk, + req, + chunksLimiter, + bytesLimiter, + shardMatcher, + s.enableChunkHashCalculation, + s.seriesBatchSize, + s.metrics.chunkFetchDuration, + ) + + defer blockClient.Close() + g.Go(func() error { - blockClient := newBlockSeriesClient( - srv.Context(), - s.logger, - blk, - req, - chunksLimiter, - bytesLimiter, - shardMatcher, - s.enableChunkHashCalculation, - s.seriesBatchSize, - s.metrics.chunkFetchDuration, - ) - defer blockClient.Close() span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ "block.id": blk.meta.ULID, From a28f6803eb5db3fe2d471702eb4508daa6c128a1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 30 Jan 2023 14:56:37 -0800 Subject: [PATCH 4/4] fix lint Signed-off-by: Alan Protasio --- pkg/store/bucket_e2e_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index a6af5a2922..e24a83ac3e 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -767,7 +767,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { t.Run(name, func(t *testing.T) { vals, err := s.store.LabelNames(ctx, tc.req) for _, b := range s.store.blocks { - waitTimeout(t, b.pendingReaders, 5*time.Second) + waitTimeout(t, &b.pendingReaders, 5*time.Second) } testutil.Ok(t, err) @@ -874,7 +874,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) { t.Run(name, func(t *testing.T) { vals, err := s.store.LabelValues(ctx, tc.req) for _, b := range s.store.blocks { - waitTimeout(t, b.pendingReaders, 5*time.Second) + waitTimeout(t, &b.pendingReaders, 5*time.Second) } testutil.Ok(t, err) @@ -892,7 +892,7 @@ func emptyToNil(values []string) []string { return values } -func waitTimeout(t *testing.T, wg sync.WaitGroup, timeout time.Duration) { +func waitTimeout(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { c := make(chan struct{}) go func() { defer close(c)