diff --git a/CHANGELOG.md b/CHANGELOG.md index 752e3ed541a..601f953ecdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,7 +58,7 @@ * [ENHANCEMENT] Admin: Admin API now has some styling. #1482 #1549 * [ENHANCEMENT] Alertmanager: added `insight=true` field to alertmanager dispatch logs. #1379 * [ENHANCEMENT] Store-gateway: Add the experimental ability to run index header operations in a dedicated thread pool. This feature can be configured using `-blocks-storage.bucket-store.index-header-thread-pool-size` and is disabled by default. #1660 -* [ENHANCEMENT] Store-gateway: don't drop all blocks if instance finds itself as unhealthy in the ring. #1806 +* [ENHANCEMENT] Store-gateway: don't drop all blocks if instance finds itself as unhealthy or missing in the ring. #1806 #1823 * [ENHANCEMENT] Querier: wait until inflight queries are completed when shutting down queriers. #1756 #1767 * [BUGFIX] Query-frontend: do not shard queries with a subquery unless the subquery is inside a shardable aggregation function call. #1542 * [BUGFIX] Query-frontend: added `component=query-frontend` label to results cache memcached metrics to fix a panic when Mimir is running in single binary mode and results cache is enabled. 
#1704 diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index b71ab8a5447..6a087bc4a4f 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -222,8 +222,8 @@ func newNoShardingStrategy() *noShardingStrategy { return &noShardingStrategy{} } -func (s *noShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { - return userIDs +func (s *noShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error) { + return userIDs, nil } func (s *noShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ *extprom.TxGaugeVec) error { diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 74b603df1ba..07b7ba05a03 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -56,6 +56,7 @@ type BucketStores struct { bucketStoreMetrics *BucketStoreMetrics metaFetcherMetrics *MetadataFetcherMetrics shardingStrategy ShardingStrategy + syncBackoffConfig backoff.Config // Index cache shared across all tenants. indexCache indexcache.IndexCache @@ -116,6 +117,11 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), threadPool: mimir_indexheader.NewThreadPool(cfg.BucketStore.IndexHeaderThreadPoolSize, reg), seriesHashCache: hashcache.NewSeriesHashCache(cfg.BucketStore.SeriesHashCacheMaxBytes), + syncBackoffConfig: backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 3, + }, } // Register metrics. 
@@ -198,11 +204,7 @@ func (u *BucketStores) SyncBlocks(ctx context.Context) error { } func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *BucketStore) error) error { - retries := backoff.New(ctx, backoff.Config{ - MinBackoff: 1 * time.Second, - MaxBackoff: 10 * time.Second, - MaxRetries: 3, - }) + retries := backoff.New(ctx, u.syncBackoffConfig) var lastErr error for retries.Ongoing() { @@ -247,8 +249,13 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte return err } - includeUserIDs := make(map[string]struct{}) - for _, userID := range u.shardingStrategy.FilterUsers(ctx, userIDs) { + ownedUserIDs, err := u.shardingStrategy.FilterUsers(ctx, userIDs) + if err != nil { + return errors.Wrap(err, "unable to check tenants owned by this store-gateway instance") + } + + includeUserIDs := make(map[string]struct{}, len(ownedUserIDs)) + for _, userID := range ownedUserIDs { includeUserIDs[userID] = struct{}{} } diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index c2c27037999..eb526c14408 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -290,7 +290,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { "when sharding is enabled only stores for filtered users should be created": { shardingStrategy: func() ShardingStrategy { s := &mockShardingStrategy{} - s.On("FilterUsers", mock.Anything, allUsers).Return([]string{"user-1", "user-2"}) + s.On("FilterUsers", mock.Anything, allUsers).Return([]string{"user-1", "user-2"}, nil) return s }(), expectedStores: 2, @@ -713,8 +713,8 @@ type userShardingStrategy struct { users []string } -func (u *userShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) []string { - return u.users +func (u *userShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) ([]string, error) { + return u.users, nil } func (u *userShardingStrategy) 
FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*thanos_metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error { diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 3092bad6664..c78d8e9aaf5 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/ring" @@ -31,6 +32,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -751,6 +753,187 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) { } } +func TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing(t *testing.T) { + test.VerifyNoLeak(t) + + const ( + instanceID = "instance-1" + instanceAddr = "127.0.0.1" + userID = "user-1" + metricName = "series_1" + ) + + ctx := context.Background() + gatewayCfg := mockGatewayConfig() + gatewayCfg.ShardingRing.InstanceID = instanceID + gatewayCfg.ShardingRing.InstanceAddr = instanceAddr + gatewayCfg.ShardingRing.RingCheckPeriod = time.Hour // Do not trigger the sync each time the ring changes (we want to control it in this test). + + storageCfg := mockStorageConfig(t) + storageCfg.BucketStore.SyncInterval = time.Hour // Do not trigger the periodic sync (we want to control it in this test). + + reg := prometheus.NewPedanticRegistry() + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + storageDir := t.TempDir() + + // Generate a real TSDB block in the storage. 
+ bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15) + + g, err := newStoreGateway(gatewayCfg, storageCfg, bucket, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) + require.NoError(t, err) + + // No sync retries to speed up tests. + g.stores.syncBackoffConfig = backoff.Config{MaxRetries: 1} + + // Start the store-gateway. + require.NoError(t, services.StartAndAwaitRunning(ctx, g)) + t.Cleanup(func() { assert.NoError(t, services.StopAndAwaitTerminated(ctx, g)) }) + + t.Run("store-gateway is healthy in the ring", func(t *testing.T) { + g.syncStores(ctx, syncReasonPeriodic) + + // Run query and ensure the block is queried. + req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64} + srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + require.NoError(t, g.Series(req, srv)) + assert.Len(t, srv.Hints.QueriedBlocks, 1) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks. + # TYPE cortex_bucket_store_blocks_loaded gauge + cortex_bucket_store_blocks_loaded{component="store-gateway"} 1 + + # HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts. + # TYPE cortex_bucket_store_block_loads_total counter + cortex_bucket_store_block_loads_total{component="store-gateway"} 1 + + # HELP cortex_bucket_store_block_load_failures_total Total number of failed remote block loading attempts. 
+ # TYPE cortex_bucket_store_block_load_failures_total counter + cortex_bucket_store_block_load_failures_total{component="store-gateway"} 0 + `), + "cortex_bucket_store_blocks_loaded", + "cortex_bucket_store_block_loads_total", + "cortex_bucket_store_block_load_failures_total", + )) + }) + + t.Run("store-gateway is unhealthy in the ring", func(t *testing.T) { + // Change heartbeat timestamp in the ring to make it unhealthy. + require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) { + ringDesc := ring.GetOrCreateRingDesc(in) + instance := ringDesc.Ingesters[instanceID] + instance.Timestamp = time.Now().Add(-time.Hour).Unix() + ringDesc.Ingesters[instanceID] = instance + return ringDesc, true, nil + })) + + // Wait until the ring client has received the update. + // We expect the set of healthy instances to be empty. + dstest.Poll(t, 5*time.Second, true, func() interface{} { + actual, err := g.ring.GetAllHealthy(BlocksOwnerSync) + return err == nil && len(actual.Instances) == 0 + }) + + g.syncStores(ctx, syncReasonPeriodic) + + // Run query and ensure the block is queried. + req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64} + srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + require.NoError(t, g.Series(req, srv)) + assert.Len(t, srv.Hints.QueriedBlocks, 1) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks. + # TYPE cortex_bucket_store_blocks_loaded gauge + cortex_bucket_store_blocks_loaded{component="store-gateway"} 1 + + # HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts. 
+ # TYPE cortex_bucket_store_block_loads_total counter + cortex_bucket_store_block_loads_total{component="store-gateway"} 1 + `), + "cortex_bucket_store_blocks_loaded", + "cortex_bucket_store_block_loads_total", + )) + }) + + t.Run("store-gateway is missing in the ring (e.g. removed from another instance because of the auto-forget feature)", func(t *testing.T) { + // Remove the instance from the ring. + require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) { + ringDesc := ring.GetOrCreateRingDesc(in) + delete(ringDesc.Ingesters, instanceID) + return ringDesc, true, nil + })) + + // Wait until the ring client has received the update. + // We expect the ring to be empty. + dstest.Poll(t, 5*time.Second, ring.ErrEmptyRing, func() interface{} { + _, err := g.ring.GetAllHealthy(BlocksOwnerSync) + return err + }) + + g.syncStores(ctx, syncReasonPeriodic) + + // Run query and ensure the block is queried. + req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64} + srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + require.NoError(t, g.Series(req, srv)) + assert.Len(t, srv.Hints.QueriedBlocks, 1) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks. + # TYPE cortex_bucket_store_blocks_loaded gauge + cortex_bucket_store_blocks_loaded{component="store-gateway"} 1 + + # HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts. + # TYPE cortex_bucket_store_block_loads_total counter + cortex_bucket_store_block_loads_total{component="store-gateway"} 1 + `), + "cortex_bucket_store_blocks_loaded", + "cortex_bucket_store_block_loads_total", + )) + }) + + t.Run("store-gateway is re-registered to the ring and it's healthy", func(t *testing.T) { + // Re-register the instance to the ring. 
+ require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) { + ringDesc := ring.GetOrCreateRingDesc(in) + ringDesc.AddIngester(instanceID, instanceAddr, "", ring.Tokens{1, 2, 3}, ring.ACTIVE, time.Now()) + return ringDesc, true, nil + })) + + // Wait until the ring client has received the update. + dstest.Poll(t, 5*time.Second, true, func() interface{} { + actual, err := g.ring.GetAllHealthy(BlocksOwnerSync) + return err == nil && actual.Includes(instanceAddr) + }) + + g.syncStores(ctx, syncReasonPeriodic) + + // Run query and ensure the block is queried. + req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64} + srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + require.NoError(t, g.Series(req, srv)) + assert.Len(t, srv.Hints.QueriedBlocks, 1) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks. + # TYPE cortex_bucket_store_blocks_loaded gauge + cortex_bucket_store_blocks_loaded{component="store-gateway"} 1 + + # HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts. 
+ # TYPE cortex_bucket_store_block_loads_total counter + cortex_bucket_store_block_loads_total{component="store-gateway"} 1 + `), + "cortex_bucket_store_blocks_loaded", + "cortex_bucket_store_block_loads_total", + )) + }) +} + func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) { test.VerifyNoLeak(t) @@ -1359,9 +1542,9 @@ type mockShardingStrategy struct { mock.Mock } -func (m *mockShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) []string { +func (m *mockShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) ([]string, error) { args := m.Called(ctx, userIDs) - return args.Get(0).([]string) + return args.Get(0).([]string), args.Error(1) } func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error { diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index ac0fa925c10..2fd9488118a 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/ring" "github.com/oklog/ulid" + "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" @@ -23,10 +24,14 @@ const ( shardExcludedMeta = "shard-excluded" ) +var ( + errStoreGatewayUnhealthy = errors.New("store-gateway is unhealthy in the ring") +) + type ShardingStrategy interface { // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs // that should be synced by the store-gateway. - FilterUsers(ctx context.Context, userIDs []string) []string + FilterUsers(ctx context.Context, userIDs []string) ([]string, error) // FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway. 
// The provided loaded map contains blocks which have been previously returned by this function and @@ -62,7 +67,16 @@ func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, l } // FilterUsers implements ShardingStrategy. -func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { +func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error) { + // As a protection, ensure the store-gateway instance is healthy in the ring. It could also be missing + // in the ring if it was failing to heartbeat the ring and it got removed by another healthy store-gateway + // instance, because of the auto-forget feature. + if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil { + return nil, err + } else if !set.Includes(s.instanceAddr) { + return nil, errStoreGatewayUnhealthy + } + var filteredIDs []string for _, userID := range userIDs { @@ -74,7 +88,7 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin } } - return filteredIDs + return filteredIDs, nil } // FilterBlocks implements ShardingStrategy. @@ -82,7 +96,6 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, // As a protection, ensure the store-gateway instance is healthy in the ring. If it's unhealthy because it's failing // to heartbeat or get updates from the ring, or even removed from the ring because of the auto-forget feature, then // keep the previously loaded blocks. 
- // TODO test if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil || !set.Includes(s.instanceAddr) { for blockID := range metas { if _, ok := loaded[blockID]; ok { diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index dd06322f8fd..1755b8df72d 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -46,6 +46,7 @@ func TestShuffleShardingStrategy(t *testing.T) { instanceID string instanceAddr string users []string + err error } type blocksExpectation struct { @@ -70,7 +71,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", err: errStoreGatewayUnhealthy}, }, expectedBlocks: []blocksExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, @@ -85,7 +86,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", err: errStoreGatewayUnhealthy}, }, expectedBlocks: []blocksExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, @@ -100,7 +101,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", err: errStoreGatewayUnhealthy}, }, expectedBlocks: []blocksExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", 
blocks: []ulid.ULID{block1, block2, block3, block4}}, @@ -207,7 +208,7 @@ func TestShuffleShardingStrategy(t *testing.T) { expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", err: errStoreGatewayUnhealthy}, }, expectedBlocks: []blocksExpectation{ // No shard has the blocks of the unhealthy instance. @@ -233,7 +234,7 @@ func TestShuffleShardingStrategy(t *testing.T) { expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", err: errStoreGatewayUnhealthy}, }, expectedBlocks: []blocksExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3 /* replicated: */, block2, block4}}, @@ -258,7 +259,7 @@ func TestShuffleShardingStrategy(t *testing.T) { expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", err: errStoreGatewayUnhealthy}, }, expectedBlocks: []blocksExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, @@ -286,7 +287,7 @@ func TestShuffleShardingStrategy(t *testing.T) { expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: 
nil}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", err: errStoreGatewayUnhealthy}, }, expectedBlocks: []blocksExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, @@ -385,7 +386,9 @@ func TestShuffleShardingStrategy(t *testing.T) { // Assert on filter users. for _, expected := range testData.expectedUsers { filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger()) - assert.Equal(t, expected.users, filter.FilterUsers(ctx, []string{userID})) + actualUsers, err := filter.FilterUsers(ctx, []string{userID}) + assert.Equal(t, expected.err, err) + assert.Equal(t, expected.users, actualUsers) } // Assert on filter blocks.