From 8aada65f43b44454f2f29dac3a89012f6a8f2080 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Tue, 28 Feb 2023 18:22:47 +0100 Subject: [PATCH 1/3] Store: fix error handling on limits Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> --- pkg/store/bucket.go | 5 ++-- pkg/store/bucket_e2e_test.go | 44 +++--------------------------------- 2 files changed, 6 insertions(+), 43 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index cc0ca30e38..fc6129f815 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -9,6 +9,7 @@ import ( "context" "encoding/binary" "fmt" + "github.com/weaveworks/common/httpgrpc" "hash" "io" "math" @@ -932,7 +933,7 @@ func (b *blockSeriesClient) ExpandPostings( } if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { - return errors.Wrap(err, "exceeded series limit") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) } b.postings = ps @@ -1031,7 +1032,7 @@ func (b *blockSeriesClient) nextBatch() error { // Ensure sample limit through chunksLimiter if we return chunks. if err := b.chunksLimiter.Reserve(uint64(len(b.chkMetas))); err != nil { - return errors.Wrap(err, "exceeded chunks limit") + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded chunks limit: %s", err) } b.entries = append(b.entries, s) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index ccea0ff92a..55b4320fc8 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -22,7 +22,6 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/storage" - "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc/codes" "github.com/thanos-io/objstore" @@ -53,20 +52,6 @@ type swappableCache struct { ptr storecache.IndexCache } -type customLimiter struct { - limiter *Limiter - code codes.Code -} - -func (c *customLimiter) Reserve(num uint64) error { - err := c.limiter.Reserve(num) - if err != nil { - return httpgrpc.Errorf(int(c.code), err.Error()) - } - - return nil -} - func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) { c.ptr = ptr2 } @@ -135,24 +120,6 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o return } -func newCustomChunksLimiterFactory(limit uint64, code codes.Code) ChunksLimiterFactory { - return func(failedCounter prometheus.Counter) ChunksLimiter { - return &customLimiter{ - limiter: NewLimiter(limit, failedCounter), - code: code, - } - } -} - -func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterFactory { - return func(failedCounter prometheus.Counter) SeriesLimiter { - return &customLimiter{ - limiter: NewLimiter(limit, failedCounter), - code: code, - } - } -} - func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), @@ -645,16 +612,11 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { expectedErr: "exceeded chunks limit", code: codes.ResourceExhausted, }, - "should fail if the max chunks limit is exceeded - 422": { - maxChunksLimit: expectedChunks - 1, - expectedErr: "exceeded chunks limit", - code: 422, - }, - "should fail if the max series limit is exceeded - 422": { + "should fail if the max series limit is exceeded - ResourceExhausted": { maxChunksLimit: expectedChunks, expectedErr: "exceeded series limit", maxSeriesLimit: 1, - code: 422, + code: codes.ResourceExhausted, }, } @@ -666,7 +628,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) testutil.Ok(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ From 0c4eff65bdd20b09c584e288f0ee9574ad96ff3d Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Tue, 28 Feb 2023 18:28:00 +0100 Subject: [PATCH 2/3] Update changelog Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cf095f77b..200fefe250 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +- [#6171](https://github.com/thanos-io/thanos/pull/6171) Store: fix error handling on limits. + ### Changed ### Removed From 60cf9a024769b40931d0154ca97647b3147630d8 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Tue, 28 Feb 2023 18:35:30 +0100 Subject: [PATCH 3/3] Run goimports Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> --- pkg/store/bucket.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index fc6129f815..dfe99e2ee3 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -9,7 +9,6 @@ import ( "context" "encoding/binary" "fmt" - "github.com/weaveworks/common/httpgrpc" "hash" "io" "math" @@ -21,6 +20,8 @@ import ( "sync" "time" + "github.com/weaveworks/common/httpgrpc" + "github.com/cespare/xxhash" "github.com/alecthomas/units"