Make chunk and series limit error messages returned by store-gateways consistent with those returned by queriers #6347

Merged: 3 commits, Oct 13, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
* [ENHANCEMENT] All: improved IPv6 support by using the proper host:port formatting. #6311
* [ENHANCEMENT] Querier: always return error encountered during chunks streaming, rather than `the stream has already been exhausted`. #6345
* [ENHANCEMENT] Query-frontend: add `instance_enable_ipv6` to support IPv6. #6111
* [ENHANCEMENT] Store-gateway: return same detailed error messages as queriers when chunks or series limits are reached. #6347
* [ENHANCEMENT] Querier: reduce memory consumed for queries that hit store-gateways. #6348
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
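With this change, a limit hit in the store-gateway produces the same detailed, querier-style message instead of a bare "exceeded series/chunks limit" prefix. For illustration, the strings asserted by the e2e test later in this diff are:

```
the query exceeded the maximum number of chunks (limit: 11 chunks) (err-mimir-max-chunks-per-query)
the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)
```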
9 changes: 4 additions & 5 deletions integration/store_gateway_limits_hit_test.go
@@ -18,7 +18,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/integration/e2emimir"
"github.com/grafana/mimir/pkg/storegateway"
"github.com/grafana/mimir/pkg/util/globalerror"
)

@@ -83,17 +82,17 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {
additionalQuerierFlags map[string]string
expectedErrorKey string
}{
"when store-gateway hits max_fetched_series_per_query, 'exceeded series limit' is returned": {
"when store-gateway hits max_fetched_series_per_query, 'err-mimir-max-series-per-query' is returned": {
additionalStoreGatewayFlags: map[string]string{"-querier.max-fetched-series-per-query": "1"},
expectedErrorKey: storegateway.ErrSeriesLimitMessage,
expectedErrorKey: string(globalerror.MaxSeriesPerQuery),
},
"when querier hits max_fetched_series_per_query, 'err-mimir-max-series-per-query' is returned": {
additionalQuerierFlags: map[string]string{"-querier.max-fetched-series-per-query": "1"},
expectedErrorKey: string(globalerror.MaxSeriesPerQuery),
},
"when store-gateway hits max_fetched_chunks_per_query, 'exceeded chunks limit' is returned": {
"when store-gateway hits max_fetched_chunks_per_query, 'err-mimir-max-chunks-per-query' is returned": {
additionalStoreGatewayFlags: map[string]string{"-querier.max-fetched-chunks-per-query": "1"},
expectedErrorKey: storegateway.ErrChunksLimitMessage,
expectedErrorKey: string(globalerror.MaxChunksPerQuery),
},
"when querier hits max_fetched_chunks_per_query, 'err-mimir-max-chunks-per-query' is returned": {
additionalQuerierFlags: map[string]string{"-querier.max-fetched-chunks-per-query": "1"},
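The integration test above now keys on the globalerror ID rather than a store-gateway-specific message, so the same expectation covers a limit hit in either component. A standalone sketch of that kind of check, assuming a locally running Mimir; the endpoint path, port, and tenant header are illustrative, not values taken from the test:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

// queryHitsSeriesLimit issues an instant query and reports whether the response
// carries the err-mimir-max-series-per-query ID. With this PR the same ID appears
// regardless of whether the querier or the store-gateway enforced the limit.
func queryHitsSeriesLimit(baseURL, tenant, query string) (bool, error) {
	u := baseURL + "/prometheus/api/v1/query?" + url.Values{"query": {query}}.Encode()
	req, err := http.NewRequest(http.MethodGet, u, nil)
	if err != nil {
		return false, err
	}
	req.Header.Set("X-Scope-OrgID", tenant)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}
	return strings.Contains(string(body), "err-mimir-max-series-per-query"), nil
}

func main() {
	hit, err := queryHitsSeriesLimit("http://localhost:8080", "e2e-user", `{__name__=~"series_.+"}`)
	fmt.Println(hit, err)
}
```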
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket.go
@@ -1418,7 +1418,7 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers []
if err != nil {
return nil, errors.Wrap(err, "fetch series")
}
seriesSetsIterator = newLimitingSeriesChunkRefsSetIterator(seriesSetsIterator, NewLimiter(0, nil), seriesLimiter)
seriesSetsIterator = newLimitingSeriesChunkRefsSetIterator(seriesSetsIterator, NewLimiter(0, nil, ""), seriesLimiter)
seriesSet := newSeriesChunkRefsSeriesSet(seriesSetsIterator)
// Extract label names from all series. Many label names will be the same, so we need to deduplicate them.
labelNames := map[string]struct{}{}
7 changes: 3 additions & 4 deletions pkg/storegateway/bucket_e2e_test.go
@@ -11,7 +11,6 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"time"

@@ -623,12 +622,12 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
},
"should fail if the max chunks limit is exceeded - 422": {
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
expectedErr: "the query exceeded the maximum number of chunks (limit: 11 chunks) (err-mimir-max-chunks-per-query)",
expectedCode: http.StatusUnprocessableEntity,
},
"should fail if the max series limit is exceeded - 422": {
maxChunksLimit: expectedChunks,
expectedErr: "exceeded series limit",
expectedErr: "the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)",
maxSeriesLimit: 1,
expectedCode: http.StatusUnprocessableEntity,
},
@@ -665,7 +664,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
assert.NoError(t, err)
} else {
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), testData.expectedErr))
assert.Contains(t, err.Error(), testData.expectedErr)
status, ok := status.FromError(err)
assert.Equal(t, true, ok)
assert.Equal(t, testData.expectedCode, status.Code())
6 changes: 3 additions & 3 deletions pkg/storegateway/bucket_test.go
@@ -268,7 +268,7 @@ func TestBlockLabelNames(t *testing.T) {
slices.Sort(jFooLabelNames)
slices.Sort(jNotFooLabelNames)

sl := NewLimiter(math.MaxUint64, promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "test"}))
sl := NewLimiter(math.MaxUint64, promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "test"}), "exceeded unlimited limit of %v")
newTestBucketBlock := prepareTestBlock(test.NewTB(t), appendTestSeries(series))

t.Run("happy case with no matchers", func(t *testing.T) {
@@ -2322,7 +2322,7 @@ func TestBucketStore_Series_Limits(t *testing.T) {
"should fail if the number of unique series queried is greater than the configured series limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
seriesLimit: 1,
expectedErr: ErrSeriesLimitMessage,
expectedErr: "the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)",
},
"should pass if the number of unique series queried is equal or less than the configured series limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
@@ -2332,7 +2332,7 @@
"should fail if the number of chunks queried is greater than the configured chunks limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
chunksLimit: 3,
expectedErr: ErrChunksLimitMessage,
expectedErr: "the query exceeded the maximum number of chunks (limit: 3 chunks) (err-mimir-max-chunks-per-query)",
},
"should pass if the number of chunks queried is equal or less than the configured chunks limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
5 changes: 3 additions & 2 deletions pkg/storegateway/gateway_test.go
@@ -52,6 +52,7 @@ import (
mimir_testutil "github.com/grafana/mimir/pkg/storage/tsdb/testutil"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/test"
"github.com/grafana/mimir/pkg/util/validation"
)
@@ -1424,7 +1425,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
},
"should return error if the actual number of queried chunks is > limit": {
limit: chunksQueried - 1,
expectedErr: status.Error(http.StatusUnprocessableEntity, fmt.Sprintf("exceeded chunks limit: rpc error: code = Code(422) desc = limit %d exceeded", chunksQueried-1)),
expectedErr: status.Error(http.StatusUnprocessableEntity, "rpc error: code = Code(422) desc = "+fmt.Sprintf(limiter.MaxChunksPerQueryLimitMsgFormat, chunksQueried-1)),
},
}

@@ -1488,7 +1489,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
assert.True(t, ok)
s2, ok := status.FromError(errors.Cause(testData.expectedErr))
assert.True(t, ok)
assert.True(t, strings.Contains(s1.Message(), s2.Message()))
assert.Contains(t, s1.Message(), s2.Message())
assert.Equal(t, s1.Code(), s2.Code())
} else {
require.NoError(t, err)
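For reference, a minimal sketch of how the expected message above is composed; the full text of the shared constant is not visible in this diff, so the printed output is indicative only:

```go
package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/util/limiter"
)

func main() {
	// Indicative only: per the e2e expectations earlier in this diff, the resulting
	// message names the limit and the err-mimir-max-chunks-per-query ID, e.g. it
	// contains "(limit: 3 chunks) (err-mimir-max-chunks-per-query)".
	fmt.Println(fmt.Sprintf(limiter.MaxChunksPerQueryLimitMsgFormat, 3))
}
```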
17 changes: 10 additions & 7 deletions pkg/storegateway/limiter.go
@@ -12,6 +12,8 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/util/limiter"
)

type ChunksLimiter interface {
@@ -37,17 +39,18 @@ type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter

// Limiter is a simple mechanism for checking if something has passed a certain threshold.
type Limiter struct {
limit uint64
reserved atomic.Uint64
limit uint64
reserved atomic.Uint64
errorMessageFormat string

// Counter metric which we will increase if limit is exceeded.
failedCounter prometheus.Counter
failedOnce sync.Once
}

// NewLimiter returns a new limiter with a specified limit. 0 disables the limit.
func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter {
return &Limiter{limit: limit, failedCounter: ctr}
func NewLimiter(limit uint64, ctr prometheus.Counter, errorMessageFormat string) *Limiter {
return &Limiter{limit: limit, failedCounter: ctr, errorMessageFormat: errorMessageFormat}
}

// Reserve implements ChunksLimiter.
@@ -59,21 +62,21 @@ func (l *Limiter) Reserve(num uint64) error {
// We need to protect from the counter being incremented twice due to concurrency
// while calling Reserve().
l.failedOnce.Do(l.failedCounter.Inc)
return httpgrpc.Errorf(http.StatusUnprocessableEntity, "limit %v exceeded", l.limit)
return httpgrpc.Errorf(http.StatusUnprocessableEntity, l.errorMessageFormat, l.limit)
}
return nil
}

// NewChunksLimiterFactory makes a new ChunksLimiterFactory with a dynamic limit.
func NewChunksLimiterFactory(limitsExtractor func() uint64) ChunksLimiterFactory {
return func(failedCounter prometheus.Counter) ChunksLimiter {
return NewLimiter(limitsExtractor(), failedCounter)
return NewLimiter(limitsExtractor(), failedCounter, limiter.MaxChunksPerQueryLimitMsgFormat)
}
}

// NewSeriesLimiterFactory makes a new NewSeriesLimiterFactory with a dynamic limit.
func NewSeriesLimiterFactory(limitsExtractor func() uint64) SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) SeriesLimiter {
return NewLimiter(limitsExtractor(), failedCounter)
return NewLimiter(limitsExtractor(), failedCounter, limiter.MaxSeriesHitMsgFormat)
}
}
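A dependency-free sketch of the limiter change above, for readers skimming the diff: the functional difference is that the error text is injected at construction time instead of the hard-coded `limit %v exceeded`. The real Limiter increments a Prometheus counter and returns an httpgrpc 422 error; a plain callback and `fmt.Errorf` stand in for those here, and the format string in `main` is illustrative rather than the canonical constant from `pkg/util/limiter`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type limiter struct {
	limit              uint64
	reserved           atomic.Uint64
	errorMessageFormat string

	failedOnce      sync.Once
	onLimitExceeded func() // stands in for failedCounter.Inc
}

func newLimiter(limit uint64, onLimitExceeded func(), errorMessageFormat string) *limiter {
	return &limiter{limit: limit, onLimitExceeded: onLimitExceeded, errorMessageFormat: errorMessageFormat}
}

func (l *limiter) Reserve(num uint64) error {
	// A limit of 0 disables the check, which is why bucket.go can pass
	// NewLimiter(0, nil, "") with an empty message format.
	if l.limit == 0 {
		return nil
	}
	if reserved := l.reserved.Add(num); reserved > l.limit {
		// Protect against incrementing the failure counter twice under concurrency.
		l.failedOnce.Do(l.onLimitExceeded)
		return fmt.Errorf(l.errorMessageFormat, l.limit)
	}
	return nil
}

func main() {
	l := newLimiter(3, func() { fmt.Println("failure counter incremented") },
		"the query exceeded the maximum number of chunks (limit: %d chunks) (err-mimir-max-chunks-per-query)")

	fmt.Println(l.Reserve(2)) // <nil>
	fmt.Println(l.Reserve(2)) // detailed, querier-style limit error
}
```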
18 changes: 9 additions & 9 deletions pkg/storegateway/limiter_test.go
@@ -18,7 +18,7 @@

func TestLimiter(t *testing.T) {
c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
l := NewLimiter(10, c)
l := NewLimiter(10, c, "limit of %v exceeded")

assert.NoError(t, l.Reserve(5))
assert.Equal(t, float64(0), prom_testutil.ToFloat64(c))
@@ -27,12 +27,12 @@ func TestLimiter(t *testing.T) {
assert.Equal(t, float64(0), prom_testutil.ToFloat64(c))

err := l.Reserve(1)
assert.Error(t, err)
assert.ErrorContains(t, err, "limit of 10 exceeded")
assert.Equal(t, float64(1), prom_testutil.ToFloat64(c))
checkErrorStatusCode(t, err)

err = l.Reserve(2)
assert.Error(t, err)
assert.ErrorContains(t, err, "limit of 10 exceeded")
assert.Equal(t, float64(1), prom_testutil.ToFloat64(c))
checkErrorStatusCode(t, err)
}
@@ -45,14 +45,14 @@ func checkErrorStatusCode(t *testing.T, err error) {

// newStaticChunksLimiterFactory makes a new ChunksLimiterFactory with a static limit.
func newStaticChunksLimiterFactory(limit uint64) ChunksLimiterFactory {
return func(failedCounter prometheus.Counter) ChunksLimiter {
return NewLimiter(limit, failedCounter)
}
return NewChunksLimiterFactory(func() uint64 {
return limit
})
}

// newStaticSeriesLimiterFactory makes a new ChunksLimiterFactory with a static limit.
func newStaticSeriesLimiterFactory(limit uint64) SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) SeriesLimiter {
return NewLimiter(limit, failedCounter)
}
return NewSeriesLimiterFactory(func() uint64 {
return limit
})
}
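A usage sketch of the factory path exercised above (not part of the PR): a limiter built through `NewChunksLimiterFactory` now fails with the shared querier-style message, so a test can assert on the global error ID, matching the e2e expectations earlier in this diff.

```go
package storegateway

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
)

// TestChunksLimiterFactoryMessageSketch is illustrative only: it shows that the
// factory-built limiter reports the shared error ID once the limit is exceeded.
func TestChunksLimiterFactoryMessageSketch(t *testing.T) {
	l := NewChunksLimiterFactory(func() uint64 { return 5 })(
		prometheus.NewCounter(prometheus.CounterOpts{Name: "chunks_limit_failures_total"}))

	assert.NoError(t, l.Reserve(5))
	assert.ErrorContains(t, l.Reserve(1), "err-mimir-max-chunks-per-query")
}
```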
9 changes: 2 additions & 7 deletions pkg/storegateway/series_refs.go
@@ -51,11 +51,6 @@ var (
})
)

const (
ErrSeriesLimitMessage = "exceeded series limit"
ErrChunksLimitMessage = "exceeded chunks limit"
)

// seriesChunkRefsSetIterator is the interface implemented by an iterator returning a sequence of seriesChunkRefsSet.
type seriesChunkRefsSetIterator interface {
Next() bool
@@ -649,7 +644,7 @@ func (l *limitingSeriesChunkRefsSetIterator) Next() bool {
l.currentBatch = l.from.At()
err := l.seriesLimiter.Reserve(uint64(l.currentBatch.len()))
if err != nil {
l.err = errors.Wrap(err, ErrSeriesLimitMessage)
l.err = err
return false
}

@@ -660,7 +655,7 @@

err = l.chunksLimiter.Reserve(uint64(totalChunks))
if err != nil {
l.err = errors.Wrap(err, ErrChunksLimitMessage)
l.err = err
return false
}
return true
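An illustration of why the `errors.Wrap` calls above were removed; the message texts are the illustrative ones used elsewhere in this PR, not the exact constants:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// Before: terse limiter error wrapped with a store-gateway-specific prefix.
	old := errors.Wrap(fmt.Errorf("limit 1 exceeded"), "exceeded series limit")
	fmt.Println(old) // exceeded series limit: limit 1 exceeded

	// After: the limiter's own message already names the limit and the error ID,
	// so it is surfaced untouched.
	fmt.Println(fmt.Errorf("the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)"))
}
```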
7 changes: 4 additions & 3 deletions pkg/storegateway/series_refs_test.go
@@ -1048,8 +1048,8 @@ func TestLimitingSeriesChunkRefsSetIterator(t *testing.T) {
t.Run(testName, func(t *testing.T) {
iterator := newLimitingSeriesChunkRefsSetIterator(
newSliceSeriesChunkRefsSetIterator(testCase.upstreamErr, testCase.sets...),
&staticLimiter{limit: testCase.chunksLimit},
&staticLimiter{limit: testCase.seriesLimit},
&staticLimiter{limit: testCase.chunksLimit, msg: "exceeded chunks limit"},
&staticLimiter{limit: testCase.seriesLimit, msg: "exceeded series limit"},
)

sets := readAllSeriesChunkRefsSet(iterator)
@@ -2362,12 +2362,13 @@ func (s *sliceSeriesChunkRefsSetIterator) Err() error {

type staticLimiter struct {
limit int
msg string
current atomic.Uint64
}

func (l *staticLimiter) Reserve(num uint64) error {
if l.current.Add(num) > uint64(l.limit) {
return errors.New("test limit exceeded")
return errors.New(l.msg)
}
return nil
}