store-gateway: add timeout to query gate wait #7777

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@
* [ENHANCEMENT] Rules: Add metric `cortex_prometheus_rule_group_last_restore_duration_seconds` which measures how long it takes to restore rule groups using the `ALERTS_FOR_STATE` series #7974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #8012
* [ENHANCEMENT] Querying: Remove OpEmptyMatch from regex concatenations. #8012
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -6502,6 +6502,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "max_concurrent_queue_timeout",
"required": false,
"desc": "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "blocks-storage.bucket-store.max-concurrent-queue-timeout",
"fieldType": "duration",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "tenant_sync_concurrency",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -537,6 +537,8 @@ Usage of ./cmd/mimir/mimir:
If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading.
-blocks-storage.bucket-store.max-concurrent int
Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants. (default 100)
-blocks-storage.bucket-store.max-concurrent-queue-timeout duration
Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.
-blocks-storage.bucket-store.meta-sync-concurrency int
Number of Go routines to use when syncing block meta files from object storage per tenant. (default 20)
-blocks-storage.bucket-store.metadata-cache.backend string
@@ -3526,6 +3526,13 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.max-concurrent
[max_concurrent: <int> | default = 100]

# (advanced) Timeout for the queue of queries waiting for execution. If the
# queue is full and the timeout is reached, the query will be retried on
# another store-gateway. 0 means no timeout and all queries will wait
# indefinitely for their turn.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-queue-timeout
[max_concurrent_queue_timeout: <duration> | default = 0s]

# (advanced) Maximum number of concurrent tenants synching blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 1]
73 changes: 73 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
@@ -43,6 +43,7 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

@@ -54,6 +55,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/test"
)
@@ -625,6 +627,49 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
`
},
},
"a single store-gateway instance returns no response implies querying another instance for the same blocks (consistency check passed)": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
storeSetResponses: []interface{}{
// First attempt returns a client whose response does not include all expected blocks.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1", mockedSeriesResponses: nil,
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{mockHintsResponse(block1)},
}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
},
"two store-gateway instances returning no response causes consistency check to fail": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
storeSetResponses: []interface{}{
// First attempt returns a client whose response does not include all expected blocks.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1", mockedSeriesResponses: nil,
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "2.2.2.2", mockedSeriesResponses: nil,
}: {block1},
},
// Third attempt returns an error because there are no other store-gateways left.
errors.New("no store-gateway remaining after exclude"),
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedErr: newStoreConsistencyCheckFailedError([]ulid.ULID{block1}),
},
"a single store-gateway instance has some missing blocks (consistency check failed)": {
finderResult: bucketindex.Blocks{
{ID: block1},
@@ -3243,6 +3288,34 @@ func TestShouldStopQueryFunc(t *testing.T) {
err: errors.New("test"),
expected: false,
},
"should not stop query on store-gateway instance limit": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Aborted, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}),
expected: false,
},
"should not stop query on store-gateway instance limit; shouldn't look at the gRPC code, only Mimir error cause": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}),
expected: false,
},
"should not stop query on any other mimirpb error": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY}),
expected: false,
},
"should not stop query on any unknown error detail": {
err: func() error {
st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"})
require.NoError(t, createErr)
return st.Err()
}(),
expected: false,
},
"should not stop query on multiple error details": {
err: func() error {
st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT})
require.NoError(t, createErr)
return st.Err()
}(),
expected: false,
},
Comment on lines +3291 to +3318 (Contributor Author):
I didn't change the implementation because that was the default behaviour anyway, but I decided to add tests since the current behaviour is implicit rather than explicit.

}

for testName, testData := range tests {
26 changes: 14 additions & 12 deletions pkg/storage/tsdb/config.go
@@ -386,18 +386,19 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool {

// BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway.
type BucketStoreConfig struct {
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"`
MaxConcurrent int `yaml:"max_concurrent" category:"advanced"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"`
IndexCache IndexCacheConfig `yaml:"index_cache"`
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"`
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"`
MaxConcurrent int `yaml:"max_concurrent" category:"advanced"`
MaxConcurrentQueueTimeout time.Duration `yaml:"max_concurrent_queue_timeout" category:"advanced"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"`
IndexCache IndexCacheConfig `yaml:"index_cache"`
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"`

// Series hash cache.
SeriesHashCacheMaxBytes uint64 `yaml:"series_hash_cache_max_size_bytes" category:"advanced"`
@@ -448,6 +449,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).")
f.Uint64Var(&cfg.SeriesHashCacheMaxBytes, "blocks-storage.bucket-store.series-hash-cache-max-size-bytes", uint64(1*units.Gibibyte), "Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled.")
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
f.DurationVar(&cfg.MaxConcurrentQueueTimeout, "blocks-storage.bucket-store.max-concurrent-queue-timeout", 0, "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.")
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 1, "Maximum number of concurrent tenants synching blocks.")
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 4, "Maximum number of concurrent blocks synching per tenant.")
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")
39 changes: 27 additions & 12 deletions pkg/storegateway/bucket.go
@@ -55,6 +55,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
@@ -546,18 +547,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
// We don't do the streaming call if we are not requesting the chunks.
req.StreamingChunksBatchSize = 0
}
defer func() {
if err == nil {
return
}
code := codes.Internal
if st, ok := grpcutil.ErrorToStatus(err); ok {
code = st.Code()
} else if errors.Is(err, context.Canceled) {
code = codes.Canceled
}
err = status.Error(code, err.Error())
}()
defer func() { err = mapSeriesError(err) }()

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
@@ -709,6 +699,31 @@
return nil
}

func mapSeriesError(err error) error {
if err == nil {
return err
}

var stGwErr storeGatewayError
switch {
case errors.As(err, &stGwErr):
switch cause := stGwErr.errorCause(); cause {
case mimirpb.INSTANCE_LIMIT:
return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Unavailable, &mimirpb.ErrorDetails{Cause: cause})
default:
return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Internal, &mimirpb.ErrorDetails{Cause: cause})
}
default:
code := codes.Internal
if st, ok := grpcutil.ErrorToStatus(err); ok {
code = st.Code()
} else if errors.Is(err, context.Canceled) {
code = codes.Canceled
}
return status.Error(code, err.Error())
}
}

func (s *BucketStore) recordRequestAmbientTime(stats *safeQueryStats, requestStart time.Time) {
stats.update(func(stats *queryStats) {
stats.streamingSeriesAmbientTime += time.Since(requestStart)
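The `mapSeriesError` change above attaches a `mimirpb.ErrorDetails` to the gRPC status so the querier can decide whether to retry based on the Mimir error cause rather than the status code. A minimal, hypothetical sketch of that client-side extraction — `errorCause` is an illustrative helper, not code from this PR, and it assumes errors built by `globalerror.NewErrorWithGRPCStatus` expose their status and details through the standard `status.FromError`:

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/grafana/mimir/pkg/mimirpb"
	"github.com/grafana/mimir/pkg/util/globalerror"
)

// errorCause extracts the Mimir error cause attached as a gRPC status detail,
// if any. As the TestShouldStopQueryFunc cases show, only this detail matters;
// the gRPC status code itself is not consulted.
func errorCause(err error) (mimirpb.ErrorCause, bool) {
	st, ok := status.FromError(err)
	if !ok || st == nil {
		return 0, false
	}
	for _, detail := range st.Details() {
		if d, ok := detail.(*mimirpb.ErrorDetails); ok {
			return d.GetCause(), true
		}
	}
	return 0, false
}

func main() {
	// Build an error the same way mapSeriesError does for instance-limit errors.
	err := globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"),
		codes.Unavailable, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT})

	if cause, ok := errorCause(err); ok && cause == mimirpb.INSTANCE_LIMIT {
		fmt.Println("do not stop the query; retry the blocks on another store-gateway")
	}
}
```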
12 changes: 8 additions & 4 deletions pkg/storegateway/bucket_store_server_test.go
@@ -75,10 +75,7 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest
)

// Create a gRPC connection to the server.
conn, err = grpc.Dial(s.serverListener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)),
)
conn, err = s.dialConn()
if err != nil {
return
}
@@ -258,6 +255,13 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest
return
}

func (s *storeTestServer) dialConn() (*grpc.ClientConn, error) {
return grpc.Dial(s.serverListener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)),
)
}

// Close releases all resources.
func (s *storeTestServer) Close() {
s.server.GracefulStop()
36 changes: 36 additions & 0 deletions pkg/storegateway/bucket_stores.go
@@ -26,6 +26,7 @@ import (
"github.com/thanos-io/objstore"
"google.golang.org/grpc/metadata"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
@@ -106,6 +107,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
queryGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "query"}, gateReg)
queryGate := gate.NewBlocking(cfg.BucketStore.MaxConcurrent)
queryGate = gate.NewInstrumented(queryGateReg, cfg.BucketStore.MaxConcurrent, queryGate)
queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout}

// The number of concurrent index header loads from storegateway are limited.
lazyLoadingGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "index_header"}, gateReg)
@@ -420,6 +422,40 @@ func (u *BucketStores) syncDirForUser(userID string) string {
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
}

// timeoutGate returns errGateTimeout when the timeout is reached while still waiting for the delegate gate.
// timeoutGate would fit better in dskit. However, at the time of writing dskit targets Go 1.20,
// and Go 1.20 doesn't have context.WithTimeoutCause yet,
// so we implement timeoutGate here instead of implementing context.WithTimeoutCause ourselves in dskit.
// It also lets us keep the span logger in timeoutGate as opposed to in the bucket store.
type timeoutGate struct {
Comment (Collaborator): [nit] Looks like something we could have done in dskit. Non-blocking.

Reply (Contributor Author): I also realized this after submitting the PR, but dskit doesn't have context.WithTimeoutCause and implementing it isn't as trivial as I expected. Let's keep it here since the scope is very small; someone can move it to dskit once it's updated. I left a comment on timeoutGate in 7bf6b24.

delegate gate.Gate
timeout time.Duration
}

var errGateTimeout = staticError{cause: mimirpb.INSTANCE_LIMIT, msg: "timeout waiting for concurrency gate"}

func (t timeoutGate) Start(ctx context.Context) error {
if t.timeout == 0 {
return t.delegate.Start(ctx)
}

// Inject our own error so that we can differentiate between a timeout caused by this gate
// and a timeout from the original request's context.
ctx, cancel := context.WithTimeoutCause(ctx, t.timeout, errGateTimeout)
defer cancel()

err := t.delegate.Start(ctx)
if errors.Is(context.Cause(ctx), errGateTimeout) {
_ = spanlogger.FromContext(ctx, log.NewNopLogger()).Error(err)
err = errGateTimeout
}
return err
}

func (t timeoutGate) Done() {
t.delegate.Done()
}

func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
// Check if the store already exists.
bs := u.getStore(userID)
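timeoutGate hinges on Go 1.21's `context.WithTimeoutCause` / `context.Cause` pair: the injected cause is reported only when this gate's own timeout fired, not when the parent context was cancelled or hit its deadline first. A self-contained sketch of the pattern under that assumption — `waitAtGate` and the saturated-gate stub are illustrative names, not code from this PR:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errGateTimeout = errors.New("timeout waiting for concurrency gate")

// waitAtGate mirrors timeoutGate.Start: it bounds how long acquire may block
// and uses context.Cause to tell this gate's timeout apart from the parent's.
func waitAtGate(ctx context.Context, acquire func(context.Context) error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeoutCause(ctx, timeout, errGateTimeout)
	defer cancel()

	err := acquire(ctx)
	if errors.Is(context.Cause(ctx), errGateTimeout) {
		// Our injected cause fired, so this is a gate-wait timeout,
		// not a cancellation of the original request.
		return errGateTimeout
	}
	return err
}

func main() {
	// Simulate a saturated gate: acquisition blocks until the context ends.
	blocked := func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	}
	fmt.Println(waitAtGate(context.Background(), blocked, 50*time.Millisecond))
	// Output: timeout waiting for concurrency gate
}
```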