Skip to content

Commit

Permalink
Store: fix error handling on limits (#6171)
Browse files Browse the repository at this point in the history
* Store: fix error handling on limits

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Update changelog

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Run goimports

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

---------

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>
  • Loading branch information
douglascamata committed Feb 28, 2023
1 parent 6fb4f72 commit 62423a1
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 43 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed

- [#6171](https://github.com/thanos-io/thanos/pull/6171) Store: fix error handling on limits.

### Changed

### Removed
Expand Down
6 changes: 4 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync"
"time"

"github.com/weaveworks/common/httpgrpc"

"github.com/cespare/xxhash"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -932,7 +934,7 @@ func (b *blockSeriesClient) ExpandPostings(
}

if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil {
return errors.Wrap(err, "exceeded series limit")
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}

b.postings = ps
Expand Down Expand Up @@ -1031,7 +1033,7 @@ func (b *blockSeriesClient) nextBatch() error {

// Ensure sample limit through chunksLimiter if we return chunks.
if err := b.chunksLimiter.Reserve(uint64(len(b.chkMetas))); err != nil {
return errors.Wrap(err, "exceeded chunks limit")
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded chunks limit: %s", err)
}

b.entries = append(b.entries, s)
Expand Down
44 changes: 3 additions & 41 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/codes"

"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -53,20 +52,6 @@ type swappableCache struct {
ptr storecache.IndexCache
}

type customLimiter struct {
limiter *Limiter
code codes.Code
}

func (c *customLimiter) Reserve(num uint64) error {
err := c.limiter.Reserve(num)
if err != nil {
return httpgrpc.Errorf(int(c.code), err.Error())
}

return nil
}

func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
c.ptr = ptr2
}
Expand Down Expand Up @@ -135,24 +120,6 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
return
}

func newCustomChunksLimiterFactory(limit uint64, code codes.Code) ChunksLimiterFactory {
return func(failedCounter prometheus.Counter) ChunksLimiter {
return &customLimiter{
limiter: NewLimiter(limit, failedCounter),
code: code,
}
}
}

func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) SeriesLimiter {
return &customLimiter{
limiter: NewLimiter(limit, failedCounter),
code: code,
}
}
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
Expand Down Expand Up @@ -645,16 +612,11 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
expectedErr: "exceeded chunks limit",
code: codes.ResourceExhausted,
},
"should fail if the max chunks limit is exceeded - 422": {
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
code: 422,
},
"should fail if the max series limit is exceeded - 422": {
"should fail if the max series limit is exceeded - ResourceExhausted": {
maxChunksLimit: expectedChunks,
expectedErr: "exceeded series limit",
maxSeriesLimit: 1,
code: 422,
code: codes.ResourceExhausted,
},
}

Expand All @@ -666,7 +628,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Expand Down

0 comments on commit 62423a1

Please sign in to comment.