diff --git a/CHANGELOG.md b/CHANGELOG.md index 275bbe1f6bf..a28a1c00f04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ * [ENHANCEMENT] All: improved IPv6 support by using the proper host:port formatting. #6311 * [ENHANCEMENT] Querier: always return error encountered during chunks streaming, rather than `the stream has already been exhausted`. #6345 * [ENHANCEMENT] Query-frontend: add `instance_enable_ipv6` to support IPv6. #6111 +* [ENHANCEMENT] Store-gateway: return same detailed error messages as queriers when chunks or series limits are reached. #6347 * [ENHANCEMENT] Querier: reduce memory consumed for queries that hit store-gateways. #6348 * [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068 * [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145 diff --git a/integration/store_gateway_limits_hit_test.go b/integration/store_gateway_limits_hit_test.go index 545c930f6e6..6bc97f87991 100644 --- a/integration/store_gateway_limits_hit_test.go +++ b/integration/store_gateway_limits_hit_test.go @@ -18,7 +18,6 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/mimir/integration/e2emimir" - "github.com/grafana/mimir/pkg/storegateway" "github.com/grafana/mimir/pkg/util/globalerror" ) @@ -83,17 +82,17 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) { additionalQuerierFlags map[string]string expectedErrorKey string }{ - "when store-gateway hits max_fetched_series_per_query, 'exceeded series limit' is returned": { + "when store-gateway hits max_fetched_series_per_query, 'err-mimir-max-series-per-query' is returned": { additionalStoreGatewayFlags: map[string]string{"-querier.max-fetched-series-per-query": "1"}, - expectedErrorKey: storegateway.ErrSeriesLimitMessage, + expectedErrorKey: string(globalerror.MaxSeriesPerQuery), }, "when querier hits max_fetched_series_per_query, 'err-mimir-max-series-per-query' is returned": { additionalQuerierFlags: map[string]string{"-querier.max-fetched-series-per-query": "1"}, expectedErrorKey: string(globalerror.MaxSeriesPerQuery), }, - "when store-gateway hits max_fetched_chunks_per_query, 'exceeded chunks limit' is returned": { + "when store-gateway hits max_fetched_chunks_per_query, 'err-mimir-max-chunks-per-query' is returned": { additionalStoreGatewayFlags: map[string]string{"-querier.max-fetched-chunks-per-query": "1"}, - expectedErrorKey: storegateway.ErrChunksLimitMessage, + expectedErrorKey: string(globalerror.MaxChunksPerQuery), }, "when querier hits max_fetched_chunks_per_query, 'err-mimir-max-chunks-per-query' is returned": { additionalQuerierFlags: map[string]string{"-querier.max-fetched-chunks-per-query": "1"}, diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 5f94b5df7ee..33e7f62ed0c 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -1418,7 +1418,7 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers [] if err != nil { return nil, errors.Wrap(err, "fetch series") } - seriesSetsIterator = newLimitingSeriesChunkRefsSetIterator(seriesSetsIterator, NewLimiter(0, nil), seriesLimiter) + seriesSetsIterator = newLimitingSeriesChunkRefsSetIterator(seriesSetsIterator, NewLimiter(0, nil, ""), seriesLimiter) seriesSet := newSeriesChunkRefsSeriesSet(seriesSetsIterator) // Extract label names from all series. Many label names will be the same, so we need to deduplicate them. labelNames := map[string]struct{}{} diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 07a38f0a1ce..f6a413b63e6 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -11,7 +11,6 @@ import ( "net/http" "os" "path/filepath" - "strings" "testing" "time" @@ -623,12 +622,12 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { }, "should fail if the max chunks limit is exceeded - 422": { maxChunksLimit: expectedChunks - 1, - expectedErr: "exceeded chunks limit", + expectedErr: "the query exceeded the maximum number of chunks (limit: 11 chunks) (err-mimir-max-chunks-per-query)", expectedCode: http.StatusUnprocessableEntity, }, "should fail if the max series limit is exceeded - 422": { maxChunksLimit: expectedChunks, - expectedErr: "exceeded series limit", + expectedErr: "the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)", maxSeriesLimit: 1, expectedCode: http.StatusUnprocessableEntity, }, @@ -665,7 +664,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { assert.NoError(t, err) } else { assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), testData.expectedErr)) + assert.Contains(t, err.Error(), testData.expectedErr) status, ok := status.FromError(err) assert.Equal(t, true, ok) assert.Equal(t, testData.expectedCode, status.Code()) diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index e7be861ef12..b086405c9e8 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -268,7 +268,7 @@ func TestBlockLabelNames(t *testing.T) { slices.Sort(jFooLabelNames) slices.Sort(jNotFooLabelNames) - sl := NewLimiter(math.MaxUint64, promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "test"})) + sl := NewLimiter(math.MaxUint64, promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "test"}), "exceeded unlimited limit of %v") newTestBucketBlock := prepareTestBlock(test.NewTB(t), appendTestSeries(series)) t.Run("happy case with no matchers", func(t *testing.T) { @@ -2322,7 +2322,7 @@ func TestBucketStore_Series_Limits(t *testing.T) { "should fail if the number of unique series queried is greater than the configured series limit": { reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}}, seriesLimit: 1, - expectedErr: ErrSeriesLimitMessage, + expectedErr: "the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)", }, "should pass if the number of unique series queried is equal or less than the configured series limit": { reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}}, @@ -2332,7 +2332,7 @@ func TestBucketStore_Series_Limits(t *testing.T) { "should fail if the number of chunks queried is greater than the configured chunks limit": { reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}}, chunksLimit: 3, - expectedErr: ErrChunksLimitMessage, + expectedErr: "the query exceeded the maximum number of chunks (limit: 3 chunks) (err-mimir-max-chunks-per-query)", }, "should pass if the number of chunks queried is equal or less than the configured chunks limit": { reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}}, diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index b333e2fb9f9..1678a4a25e7 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -52,6 +52,7 @@ import ( mimir_testutil "github.com/grafana/mimir/pkg/storage/tsdb/testutil" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/limiter" "github.com/grafana/mimir/pkg/util/test" "github.com/grafana/mimir/pkg/util/validation" ) @@ -1424,7 +1425,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi }, "should return error if the actual number of queried chunks is > limit": { limit: chunksQueried - 1, - expectedErr: status.Error(http.StatusUnprocessableEntity, fmt.Sprintf("exceeded chunks limit: rpc error: code = Code(422) desc = limit %d exceeded", chunksQueried-1)), + expectedErr: status.Error(http.StatusUnprocessableEntity, "rpc error: code = Code(422) desc = "+fmt.Sprintf(limiter.MaxChunksPerQueryLimitMsgFormat, chunksQueried-1)), }, } @@ -1488,7 +1489,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi assert.True(t, ok) s2, ok := status.FromError(errors.Cause(testData.expectedErr)) assert.True(t, ok) - assert.True(t, strings.Contains(s1.Message(), s2.Message())) + assert.Contains(t, s1.Message(), s2.Message()) assert.Equal(t, s1.Code(), s2.Code()) } else { require.NoError(t, err) diff --git a/pkg/storegateway/limiter.go b/pkg/storegateway/limiter.go index 116ac9bfd60..2b5da3fcb18 100644 --- a/pkg/storegateway/limiter.go +++ b/pkg/storegateway/limiter.go @@ -12,6 +12,8 @@ import ( "github.com/grafana/dskit/httpgrpc" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" + + "github.com/grafana/mimir/pkg/util/limiter" ) type ChunksLimiter interface { @@ -37,8 +39,9 @@ type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter // Limiter is a simple mechanism for checking if something has passed a certain threshold. type Limiter struct { - limit uint64 - reserved atomic.Uint64 + limit uint64 + reserved atomic.Uint64 + errorMessageFormat string // Counter metric which we will increase if limit is exceeded. failedCounter prometheus.Counter @@ -46,8 +49,8 @@ type Limiter struct { } // NewLimiter returns a new limiter with a specified limit. 0 disables the limit. -func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter { - return &Limiter{limit: limit, failedCounter: ctr} +func NewLimiter(limit uint64, ctr prometheus.Counter, errorMessageFormat string) *Limiter { + return &Limiter{limit: limit, failedCounter: ctr, errorMessageFormat: errorMessageFormat} } // Reserve implements ChunksLimiter. @@ -59,7 +62,7 @@ func (l *Limiter) Reserve(num uint64) error { // We need to protect from the counter being incremented twice due to concurrency // while calling Reserve(). l.failedOnce.Do(l.failedCounter.Inc) - return httpgrpc.Errorf(http.StatusUnprocessableEntity, "limit %v exceeded", l.limit) + return httpgrpc.Errorf(http.StatusUnprocessableEntity, l.errorMessageFormat, l.limit) } return nil } @@ -67,13 +70,13 @@ func (l *Limiter) Reserve(num uint64) error { // NewChunksLimiterFactory makes a new ChunksLimiterFactory with a dynamic limit. func NewChunksLimiterFactory(limitsExtractor func() uint64) ChunksLimiterFactory { return func(failedCounter prometheus.Counter) ChunksLimiter { - return NewLimiter(limitsExtractor(), failedCounter) + return NewLimiter(limitsExtractor(), failedCounter, limiter.MaxChunksPerQueryLimitMsgFormat) } } // NewSeriesLimiterFactory makes a new NewSeriesLimiterFactory with a dynamic limit. func NewSeriesLimiterFactory(limitsExtractor func() uint64) SeriesLimiterFactory { return func(failedCounter prometheus.Counter) SeriesLimiter { - return NewLimiter(limitsExtractor(), failedCounter) + return NewLimiter(limitsExtractor(), failedCounter, limiter.MaxSeriesHitMsgFormat) } } diff --git a/pkg/storegateway/limiter_test.go b/pkg/storegateway/limiter_test.go index 12e999cf486..348a965ba02 100644 --- a/pkg/storegateway/limiter_test.go +++ b/pkg/storegateway/limiter_test.go @@ -18,7 +18,7 @@ import ( func TestLimiter(t *testing.T) { c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - l := NewLimiter(10, c) + l := NewLimiter(10, c, "limit of %v exceeded") assert.NoError(t, l.Reserve(5)) assert.Equal(t, float64(0), prom_testutil.ToFloat64(c)) @@ -27,12 +27,12 @@ func TestLimiter(t *testing.T) { assert.Equal(t, float64(0), prom_testutil.ToFloat64(c)) err := l.Reserve(1) - assert.Error(t, err) + assert.ErrorContains(t, err, "limit of 10 exceeded") assert.Equal(t, float64(1), prom_testutil.ToFloat64(c)) checkErrorStatusCode(t, err) err = l.Reserve(2) - assert.Error(t, err) + assert.ErrorContains(t, err, "limit of 10 exceeded") assert.Equal(t, float64(1), prom_testutil.ToFloat64(c)) checkErrorStatusCode(t, err) } @@ -45,14 +45,14 @@ func checkErrorStatusCode(t *testing.T, err error) { // newStaticChunksLimiterFactory makes a new ChunksLimiterFactory with a static limit. func newStaticChunksLimiterFactory(limit uint64) ChunksLimiterFactory { - return func(failedCounter prometheus.Counter) ChunksLimiter { - return NewLimiter(limit, failedCounter) - } + return NewChunksLimiterFactory(func() uint64 { + return limit + }) } // newStaticSeriesLimiterFactory makes a new ChunksLimiterFactory with a static limit. func newStaticSeriesLimiterFactory(limit uint64) SeriesLimiterFactory { - return func(failedCounter prometheus.Counter) SeriesLimiter { - return NewLimiter(limit, failedCounter) - } + return NewSeriesLimiterFactory(func() uint64 { + return limit + }) } diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index 4711b203445..128bd1ccb70 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go @@ -51,11 +51,6 @@ var ( }) ) -const ( - ErrSeriesLimitMessage = "exceeded series limit" - ErrChunksLimitMessage = "exceeded chunks limit" -) - // seriesChunkRefsSetIterator is the interface implemented by an iterator returning a sequence of seriesChunkRefsSet. type seriesChunkRefsSetIterator interface { Next() bool @@ -649,7 +644,7 @@ func (l *limitingSeriesChunkRefsSetIterator) Next() bool { l.currentBatch = l.from.At() err := l.seriesLimiter.Reserve(uint64(l.currentBatch.len())) if err != nil { - l.err = errors.Wrap(err, ErrSeriesLimitMessage) + l.err = err return false } @@ -660,7 +655,7 @@ func (l *limitingSeriesChunkRefsSetIterator) Next() bool { err = l.chunksLimiter.Reserve(uint64(totalChunks)) if err != nil { - l.err = errors.Wrap(err, ErrChunksLimitMessage) + l.err = err return false } return true diff --git a/pkg/storegateway/series_refs_test.go b/pkg/storegateway/series_refs_test.go index fa751e17f87..6be6934f018 100644 --- a/pkg/storegateway/series_refs_test.go +++ b/pkg/storegateway/series_refs_test.go @@ -1048,8 +1048,8 @@ func TestLimitingSeriesChunkRefsSetIterator(t *testing.T) { t.Run(testName, func(t *testing.T) { iterator := newLimitingSeriesChunkRefsSetIterator( newSliceSeriesChunkRefsSetIterator(testCase.upstreamErr, testCase.sets...), - &staticLimiter{limit: testCase.chunksLimit}, - &staticLimiter{limit: testCase.seriesLimit}, + &staticLimiter{limit: testCase.chunksLimit, msg: "exceeded chunks limit"}, + &staticLimiter{limit: testCase.seriesLimit, msg: "exceeded series limit"}, ) sets := readAllSeriesChunkRefsSet(iterator) @@ -2362,12 +2362,13 @@ func (s *sliceSeriesChunkRefsSetIterator) Err() error { type staticLimiter struct { limit int + msg string current atomic.Uint64 } func (l *staticLimiter) Reserve(num uint64) error { if l.current.Add(num) > uint64(l.limit) { - return errors.New("test limit exceeded") + return errors.New(l.msg) } return nil }