From 9635709b20d24eb8f668ba54c1774d6397034135 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 28 Apr 2023 15:30:58 -0700 Subject: [PATCH 1/7] return grpc code resource exhausted for byte limit error Signed-off-by: Ben Ye --- pkg/store/bucket.go | 12 ++++++------ pkg/store/bucket_e2e_test.go | 11 ++++++++++- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 876f8220c6..f95338a9bf 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2329,7 +2329,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) for _, dataFromCache := range fromCache { if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { - return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache") + return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) } } @@ -2402,7 +2402,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab length := int64(part.End) - start if err := bytesLimiter.Reserve(uint64(length)); err != nil { - return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while fetching postings") + return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err) } } @@ -2562,7 +2562,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser for id, b := range fromCache { r.loadedSeries[id] = b if err := bytesLimiter.Reserve(uint64(len(b))); err != nil { - return errors.Wrap(err, "exceeded bytes limit while loading series from index cache") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err) } } @@ -2587,7 +2587,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series if bytesLimiter != nil { if err := bytesLimiter.Reserve(uint64(end - start)); err != nil { - return errors.Wrap(err, "exceeded bytes limit while fetching series") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err) } } @@ -2859,7 +2859,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ for _, p := range parts { if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil { - return errors.Wrap(err, "bytes limit exceeded while fetching chunks") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err) } } @@ -2976,7 +2976,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a // Read entire chunk into new buffer. // TODO: readChunkRange call could be avoided for any chunk but last in this particular part. if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil { - return errors.Wrap(err, "bytes limit exceeded while fetching chunks") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err) } nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) if err != nil { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 1847e336b7..9e1fd17ca4 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/alecthomas/units" "github.com/go-kit/log" "github.com/gogo/status" "github.com/oklog/ulid" @@ -602,6 +603,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { cases := map[string]struct { maxChunksLimit uint64 maxSeriesLimit uint64 + maxBytesLimit int64 expectedErr string code codes.Code }{ @@ -619,6 +621,13 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { maxSeriesLimit: 1, code: codes.ResourceExhausted, }, + "should fail if the max bytes limit is exceeded - ResourceExhausted": { + maxChunksLimit: expectedChunks, + expectedErr: "exceeded bytes limit", + maxSeriesLimit: 2, + maxBytesLimit: 1, + code: codes.ResourceExhausted, + }, } for testName, testData := range cases { @@ -629,7 +638,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf) testutil.Ok(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ From a9c5ed304b20a6e7bbcb434e45ddf8a16e2a9f84 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 28 Apr 2023 15:38:00 -0700 Subject: [PATCH 2/7] fix lint Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c08147575..dedf12c5b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6306](https://github.com/thanos-io/thanos/pull/6306) Tracing: tracing in OTLP utilize the OTEL_TRACES_SAMPLER env variable - [#6330](https://github.com/thanos-io/thanos/pull/6330) Store: Fix inconsistent error for series limits. - [#6342](https://github.com/thanos-io/thanos/pull/6342) Cache/Redis: Upgrade `rueidis` to v1.0.2 to to improve error handling while shrinking a redis cluster. +- [#6325](https://github.com/thanos-io/thanos/pull/6325) Store: return gRPC resource exhausted error for byte limiter. ### Changed - [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor. From cd7901826e6b82320898b025bec93cd1d04d9db0 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 30 Apr 2023 11:03:39 -0700 Subject: [PATCH 3/7] update partial response strategy Signed-off-by: Ben Ye --- test/e2e/store_gateway_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index d82eac5585..b5c69f4a3b 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -6,6 +6,7 @@ package e2e_test import ( "context" "fmt" + "github.com/thanos-io/thanos/pkg/store/storepb" "net/http" "os" "path" @@ -869,7 +870,9 @@ config: q1.Endpoint("http"), func() string { return testQuery }, time.Now, - promclient.QueryOptions{Deduplicate: true}, 0) + promclient.QueryOptions{Deduplicate: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, + 0, + ) if err != nil { if strings.Contains(err.Error(), "expanded matching posting: get postings: bytes limit exceeded while fetching postings: limit 1 violated") { return nil @@ -885,7 +888,9 @@ config: q2.Endpoint("http"), func() string { return testQuery }, time.Now, - promclient.QueryOptions{Deduplicate: true}, 0) + promclient.QueryOptions{Deduplicate: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, + 0, + ) if err != nil { if strings.Contains(err.Error(), "preload series: exceeded bytes limit while fetching series: limit 100 violated") { return nil From c041f032699bf5db934dc11d01c4f07b758afccc Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 30 Apr 2023 11:13:45 -0700 Subject: [PATCH 4/7] fix limit Signed-off-by: Ben Ye --- test/e2e/store_gateway_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index b5c69f4a3b..e037326b5e 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -6,7 +6,6 @@ package e2e_test import ( "context" "fmt" - "github.com/thanos-io/thanos/pkg/store/storepb" "net/http" "os" "path" @@ -15,6 +14,7 @@ import ( "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/efficientgo/e2e" e2edb "github.com/efficientgo/e2e/db" e2emon "github.com/efficientgo/e2e/monitoring" @@ -24,18 +24,17 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" - "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" - - "github.com/efficientgo/core/testutil" + "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) From 291752d3b31699f2bcba86a2a3b55c611cb61cbc Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 7 May 2023 23:33:21 -0700 Subject: [PATCH 5/7] try to fix tests Signed-off-by: Ben Ye --- pkg/store/bucket_test.go | 3 +++ test/e2e/store_gateway_test.go | 29 ++++------------------------- 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 28b96025db..90ba08feef 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1213,6 +1213,9 @@ func benchmarkExpandedPostings( for _, c := range cases { t.Run(c.name, func(t testutil.TB) { + if c.name != `i=~".*"` { + return + } b := &bucketBlock{ logger: log.NewNopLogger(), metrics: newBucketStoreMetrics(nil), diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index e037326b5e..37870ab5a6 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -860,19 +860,12 @@ config: testutil.Ok(t, store1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) testutil.Ok(t, store2.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) testutil.Ok(t, store3.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + opts := promclient.QueryOptions{Deduplicate: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT} t.Run("Series() limits", func(t *testing.T) { testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { - _, err := simpleInstantQuery(t, - ctx, - q1.Endpoint("http"), - func() string { return testQuery }, - time.Now, - promclient.QueryOptions{Deduplicate: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, - 0, - ) - if err != nil { + if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q1.Endpoint("http")), testQuery, time.Now(), opts); err != nil { if strings.Contains(err.Error(), "expanded matching posting: get postings: bytes limit exceeded while fetching postings: limit 1 violated") { return nil } @@ -882,15 +875,7 @@ config: })) testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { - _, err := simpleInstantQuery(t, - ctx, - q2.Endpoint("http"), - func() string { return testQuery }, - time.Now, - promclient.QueryOptions{Deduplicate: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, - 0, - ) - if err != nil { + if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q2.Endpoint("http")), testQuery, time.Now(), opts); err != nil { if strings.Contains(err.Error(), "preload series: exceeded bytes limit while fetching series: limit 100 violated") { return nil } @@ -900,13 +885,7 @@ config: })) testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { - _, err := simpleInstantQuery(t, - ctx, - q3.Endpoint("http"), - func() string { return testQuery }, - time.Now, - promclient.QueryOptions{Deduplicate: true}, 0) - if err != nil { + if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q3.Endpoint("http")), testQuery, time.Now(), opts); err != nil { if strings.Contains(err.Error(), "load chunks: bytes limit exceeded while fetching chunks: limit 196627 violated") { return nil } From fa5fc4b18ee00e4afcb2f14ddc1fe8d9ff8cc4e1 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 8 May 2023 00:10:03 -0700 Subject: [PATCH 6/7] fix test error message Signed-off-by: Ben Ye --- test/e2e/store_gateway_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 37870ab5a6..fc4db723a0 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -866,7 +866,8 @@ config: testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q1.Endpoint("http")), testQuery, time.Now(), opts); err != nil { - if strings.Contains(err.Error(), "expanded matching posting: get postings: bytes limit exceeded while fetching postings: limit 1 violated") { + e := err.Error() + if strings.Contains(e, "expanded matching posting: get postings") && strings.Contains(e, "exceeded bytes limit while fetching postings: limit 1 violated") { return nil } return err @@ -876,7 +877,8 @@ config: testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q2.Endpoint("http")), testQuery, time.Now(), opts); err != nil { - if strings.Contains(err.Error(), "preload series: exceeded bytes limit while fetching series: limit 100 violated") { + e := err.Error() + if strings.Contains(e, "preload series") && strings.Contains(e, "exceeded bytes limit while fetching series: limit 100 violated") { return nil } return err @@ -886,7 +888,8 @@ config: testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q3.Endpoint("http")), testQuery, time.Now(), opts); err != nil { - if strings.Contains(err.Error(), "load chunks: bytes limit exceeded while fetching chunks: limit 196627 violated") { + e := err.Error() + if strings.Contains(e, "load chunks") && strings.Contains(e, "bytes limit exceeded while fetching chunks: limit 196627 violated") { return nil } return err From e83ca1f7929457e185ba118342c3f05e0926ffaa Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 8 May 2023 00:26:41 -0700 Subject: [PATCH 7/7] fix test Signed-off-by: Ben Ye --- test/e2e/store_gateway_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index fc4db723a0..efcf177366 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -889,7 +889,7 @@ config: testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q3.Endpoint("http")), testQuery, time.Now(), opts); err != nil { e := err.Error() - if strings.Contains(e, "load chunks") && strings.Contains(e, "bytes limit exceeded while fetching chunks: limit 196627 violated") { + if strings.Contains(e, "load chunks") && strings.Contains(e, "exceeded bytes limit while fetching chunks: limit 196627 violated") { return nil } return err