store-gateway: add timeout to query gate wait (#7777)
* store-gateway: add timeout to query gate wait

This PR adds a timeout to waiting at the query gate. When the wait times out, the store-gateway returns an empty response. The querier treats the empty response as if the blocks weren't discovered by that store-gateway and retries them on another store-gateway.
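
The mechanism behind the timeout (shown in full in pkg/storegateway/bucket_stores.go further down in this diff) is a small wrapper around the existing query gate: the wait is bounded with context.WithTimeoutCause and a sentinel error, so a timed-out gate wait can be told apart from the caller's own deadline expiring. A condensed sketch, assuming dskit's gate package that the store-gateway already uses (import path assumed):

package example

import (
	"context"
	"errors"
	"time"

	"github.com/grafana/dskit/gate"
)

// errGateWaitTimeout is attached as the context cause, so "waiting at the gate
// timed out" can be distinguished from "the request's own deadline expired".
var errGateWaitTimeout = errors.New("timeout waiting for concurrency gate")

// startWithTimeout bounds how long a query may wait for a slot at the gate.
func startWithTimeout(ctx context.Context, g gate.Gate, timeout time.Duration) error {
	if timeout == 0 {
		return g.Start(ctx) // 0 keeps the old behaviour: wait indefinitely.
	}
	ctx, cancel := context.WithTimeoutCause(ctx, timeout, errGateWaitTimeout)
	defer cancel()

	err := g.Start(ctx)
	if errors.Is(context.Cause(ctx), errGateWaitTimeout) {
		// The gate wait timed out: report that specifically, so the querier
		// can retry the blocks on another store-gateway.
		return errGateWaitTimeout
	}
	return err
}

The querier-side retry behaviour is exercised by the new cases in pkg/querier/blocks_store_queryable_test.go below.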

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update pkg/storage/tsdb/config.go

Co-authored-by: Charles Korn <charleskorn@users.noreply.github.com>

* Update CHANGELOG.md

Co-authored-by: Charles Korn <charleskorn@users.noreply.github.com>

* Remove TODO

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add a spanLog to timeoutGate

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add comment on why timeoutGate is in Mimir

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update docs

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Propagate errors from store-gateway to querier when hitting concurrency limit

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Fix formatting

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update autogenerated config

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add license header

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add tests for timeoutGate

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Report timeoutGate timeouts as rejected_deadline_exceeded not rejected_other

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Properly detect custom errors in timeoutGate

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Charles Korn <charleskorn@users.noreply.github.com>
dimitarvdimitrov and charleskorn committed May 3, 2024
1 parent 6d1a45c commit 61a1c8b
Showing 11 changed files with 287 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@
* [ENHANCEMENT] Rules: Add metric `cortex_prometheus_rule_group_last_restore_duration_seconds` which measures how long it takes to restore rule groups using the `ALERTS_FOR_STATE` series #7974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #8012
* [ENHANCEMENT] Querying: Remove OpEmptyMatch from regex concatenations. #8012
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -6502,6 +6502,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "max_concurrent_queue_timeout",
"required": false,
"desc": "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "blocks-storage.bucket-store.max-concurrent-queue-timeout",
"fieldType": "duration",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "tenant_sync_concurrency",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -537,6 +537,8 @@ Usage of ./cmd/mimir/mimir:
If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading.
-blocks-storage.bucket-store.max-concurrent int
Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants. (default 100)
-blocks-storage.bucket-store.max-concurrent-queue-timeout duration
Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.
-blocks-storage.bucket-store.meta-sync-concurrency int
Number of Go routines to use when syncing block meta files from object storage per tenant. (default 20)
-blocks-storage.bucket-store.metadata-cache.backend string
@@ -3526,6 +3526,13 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.max-concurrent
[max_concurrent: <int> | default = 100]
# (advanced) Timeout for the queue of queries waiting for execution. If the
# queue is full and the timeout is reached, the query will be retried on
# another store-gateway. 0 means no timeout and all queries will wait
# indefinitely for their turn.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-queue-timeout
[max_concurrent_queue_timeout: <duration> | default = 0s]
# (advanced) Maximum number of concurrent tenants synching blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 1]
73 changes: 73 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
@@ -43,6 +43,7 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

@@ -54,6 +55,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/test"
)
@@ -625,6 +627,49 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
`
},
},
"a single store-gateway instance returns no response implies querying another instance for the same blocks (consistency check passed)": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
storeSetResponses: []interface{}{
// First attempt returns a client whose response does not include all expected blocks.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1", mockedSeriesResponses: nil,
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{mockHintsResponse(block1)},
}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
},
"two store-gateway instances returning no response causes consistency check to fail": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
storeSetResponses: []interface{}{
// First attempt returns a client whose response does not include all expected blocks.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1", mockedSeriesResponses: nil,
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "2.2.2.2", mockedSeriesResponses: nil,
}: {block1},
},
// Third attempt returns an error because there are no other store-gateways left.
errors.New("no store-gateway remaining after exclude"),
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedErr: newStoreConsistencyCheckFailedError([]ulid.ULID{block1}),
},
"a single store-gateway instance has some missing blocks (consistency check failed)": {
finderResult: bucketindex.Blocks{
{ID: block1},
@@ -3243,6 +3288,34 @@ func TestShouldStopQueryFunc(t *testing.T) {
err: errors.New("test"),
expected: false,
},
"should not stop query on store-gateway instance limit": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Aborted, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}),
expected: false,
},
"should not stop query on store-gateway instance limit; shouldn't look at the gRPC code, only Mimir error cause": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}),
expected: false,
},
"should not stop query on any other mimirpb error": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY}),
expected: false,
},
"should not stop query on any unknown error detail": {
err: func() error {
st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"})
require.NoError(t, createErr)
return st.Err()
}(),
expected: false,
},
"should not stop query on multiple error details": {
err: func() error {
st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT})
require.NoError(t, createErr)
return st.Err()
}(),
expected: false,
},
}

for testName, testData := range tests {
26 changes: 14 additions & 12 deletions pkg/storage/tsdb/config.go
@@ -386,18 +386,19 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool {

// BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway.
type BucketStoreConfig struct {
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"`
MaxConcurrent int `yaml:"max_concurrent" category:"advanced"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"`
IndexCache IndexCacheConfig `yaml:"index_cache"`
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"`
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"`
MaxConcurrent int `yaml:"max_concurrent" category:"advanced"`
MaxConcurrentQueueTimeout time.Duration `yaml:"max_concurrent_queue_timeout" category:"advanced"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"`
IndexCache IndexCacheConfig `yaml:"index_cache"`
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"`

// Series hash cache.
SeriesHashCacheMaxBytes uint64 `yaml:"series_hash_cache_max_size_bytes" category:"advanced"`
@@ -448,6 +449,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).")
f.Uint64Var(&cfg.SeriesHashCacheMaxBytes, "blocks-storage.bucket-store.series-hash-cache-max-size-bytes", uint64(1*units.Gibibyte), "Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled.")
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
f.DurationVar(&cfg.MaxConcurrentQueueTimeout, "blocks-storage.bucket-store.max-concurrent-queue-timeout", 0, "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.")
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 1, "Maximum number of concurrent tenants synching blocks.")
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 4, "Maximum number of concurrent blocks synching per tenant.")
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")
39 changes: 27 additions & 12 deletions pkg/storegateway/bucket.go
@@ -55,6 +55,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
@@ -546,18 +547,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
// We don't do the streaming call if we are not requesting the chunks.
req.StreamingChunksBatchSize = 0
}
defer func() {
if err == nil {
return
}
code := codes.Internal
if st, ok := grpcutil.ErrorToStatus(err); ok {
code = st.Code()
} else if errors.Is(err, context.Canceled) {
code = codes.Canceled
}
err = status.Error(code, err.Error())
}()
defer func() { err = mapSeriesError(err) }()

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
@@ -709,6 +699,31 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
return nil
}

func mapSeriesError(err error) error {
if err == nil {
return err
}

var stGwErr storeGatewayError
switch {
case errors.As(err, &stGwErr):
switch cause := stGwErr.errorCause(); cause {
case mimirpb.INSTANCE_LIMIT:
return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Unavailable, &mimirpb.ErrorDetails{Cause: cause})
default:
return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Internal, &mimirpb.ErrorDetails{Cause: cause})
}
default:
code := codes.Internal
if st, ok := grpcutil.ErrorToStatus(err); ok {
code = st.Code()
} else if errors.Is(err, context.Canceled) {
code = codes.Canceled
}
return status.Error(code, err.Error())
}
}

func (s *BucketStore) recordRequestAmbientTime(stats *safeQueryStats, requestStart time.Time) {
stats.update(func(stats *queryStats) {
stats.streamingSeriesAmbientTime += time.Since(requestStart)
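
To make the new error mapping concrete, here is a hedged sketch of a test (not one from this PR): it assumes it lives in package storegateway next to the code above, that grpcutil comes from dskit, and it uses the errGateTimeout sentinel added in pkg/storegateway/bucket_stores.go later in this diff.

package storegateway

import (
	"context"
	"testing"

	"github.com/grafana/dskit/grpcutil"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
)

func TestMapSeriesErrorSketch(t *testing.T) {
	// The gate-wait timeout carries the INSTANCE_LIMIT cause, so it should be
	// reported as codes.Unavailable and the querier retries the blocks elsewhere.
	st, ok := grpcutil.ErrorToStatus(mapSeriesError(errGateTimeout))
	require.True(t, ok)
	require.Equal(t, codes.Unavailable, st.Code())

	// A plain context cancellation keeps codes.Canceled.
	st, ok = grpcutil.ErrorToStatus(mapSeriesError(context.Canceled))
	require.True(t, ok)
	require.Equal(t, codes.Canceled, st.Code())
}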
12 changes: 8 additions & 4 deletions pkg/storegateway/bucket_store_server_test.go
@@ -75,10 +75,7 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest
)

// Create a gRPC connection to the server.
conn, err = grpc.Dial(s.serverListener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)),
)
conn, err = s.dialConn()
if err != nil {
return
}
@@ -258,6 +255,13 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest
return
}

func (s *storeTestServer) dialConn() (*grpc.ClientConn, error) {
return grpc.Dial(s.serverListener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)),
)
}

// Close releases all resources.
func (s *storeTestServer) Close() {
s.server.GracefulStop()
36 changes: 36 additions & 0 deletions pkg/storegateway/bucket_stores.go
@@ -26,6 +26,7 @@ import (
"github.com/thanos-io/objstore"
"google.golang.org/grpc/metadata"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
@@ -106,6 +107,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
queryGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "query"}, gateReg)
queryGate := gate.NewBlocking(cfg.BucketStore.MaxConcurrent)
queryGate = gate.NewInstrumented(queryGateReg, cfg.BucketStore.MaxConcurrent, queryGate)
queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout}

// The number of concurrent index header loads from storegateway are limited.
lazyLoadingGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "index_header"}, gateReg)
@@ -420,6 +422,40 @@ func (u *BucketStores) syncDirForUser(userID string) string {
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
}

// timeoutGate returns errGateTimeout when the timeout is reached while still waiting for the delegate gate.
// timeoutGate belongs better in dskit. However, at the time of writing dskit supports go 1.20.
// go 1.20 doesn't have context.WithTimeoutCause yet,
// so we choose to implement timeoutGate here instead of implementing context.WithTimeoutCause ourselves in dskit.
// It also allows to keep the span logger in timeoutGate as opposed to in the bucket store.
type timeoutGate struct {
delegate gate.Gate
timeout time.Duration
}

var errGateTimeout = staticError{cause: mimirpb.INSTANCE_LIMIT, msg: "timeout waiting for concurrency gate"}

func (t timeoutGate) Start(ctx context.Context) error {
if t.timeout == 0 {
return t.delegate.Start(ctx)
}

// Inject our own error so that we can differentiate between a timeout caused by this gate
// or a timeout in the original request timeout.
ctx, cancel := context.WithTimeoutCause(ctx, t.timeout, errGateTimeout)
defer cancel()

err := t.delegate.Start(ctx)
if errors.Is(context.Cause(ctx), errGateTimeout) {
_ = spanlogger.FromContext(ctx, log.NewNopLogger()).Error(err)
err = errGateTimeout
}
return err
}

func (t timeoutGate) Done() {
t.delegate.Done()
}

func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
// Check if the store already exists.
bs := u.getStore(userID)
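
The commit message above mentions tests for timeoutGate; the corresponding test file is among the changed files not expanded in this view. A minimal sketch of such a test, assuming it sits in package storegateway and that dskit's gate package is imported as below (illustrative only, not the PR's actual test):

package storegateway

import (
	"context"
	"testing"
	"time"

	"github.com/grafana/dskit/gate"
	"github.com/stretchr/testify/require"
)

func TestTimeoutGateSketch(t *testing.T) {
	// Occupy the only slot of a blocking gate so that the next Start has to queue.
	inner := gate.NewBlocking(1)
	require.NoError(t, inner.Start(context.Background()))

	g := timeoutGate{delegate: inner, timeout: 50 * time.Millisecond}

	// With the slot taken, Start should give up after the configured timeout and
	// return the sentinel error rather than waiting on the caller's context.
	require.ErrorIs(t, g.Start(context.Background()), errGateTimeout)

	// Once the slot is released, Start succeeds again.
	inner.Done()
	require.NoError(t, g.Start(context.Background()))
	g.Done()
}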