Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not drop blocks in the store-gateway if missing in the ring #1823

Merged
merged 1 commit into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* [ENHANCEMENT] Admin: Admin API now has some styling. #1482 #1549
* [ENHANCEMENT] Alertmanager: added `insight=true` field to alertmanager dispatch logs. #1379
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run index header operations in a dedicated thread pool. This feature can be configured using `-blocks-storage.bucket-store.index-header-thread-pool-size` and is disabled by default. #1660
* [ENHANCEMENT] Store-gateway: don't drop all blocks if instance finds itself as unhealthy in the ring. #1806
* [ENHANCEMENT] Store-gateway: don't drop all blocks if instance finds itself as unhealthy or missing in the ring. #1806 #1823
* [ENHANCEMENT] Querier: wait until inflight queries are completed when shutting down queriers. #1756 #1767
* [BUGFIX] Query-frontend: do not shard queries with a subquery unless the subquery is inside a shardable aggregation function call. #1542
* [BUGFIX] Query-frontend: added `component=query-frontend` label to results cache memcached metrics to fix a panic when Mimir is running in single binary mode and results cache is enabled. #1704
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket_index_metadata_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ func newNoShardingStrategy() *noShardingStrategy {
return &noShardingStrategy{}
}

func (s *noShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
return userIDs
func (s *noShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error) {
return userIDs, nil
}

func (s *noShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ *extprom.TxGaugeVec) error {
Expand Down
21 changes: 14 additions & 7 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type BucketStores struct {
bucketStoreMetrics *BucketStoreMetrics
metaFetcherMetrics *MetadataFetcherMetrics
shardingStrategy ShardingStrategy
syncBackoffConfig backoff.Config

// Index cache shared across all tenants.
indexCache indexcache.IndexCache
Expand Down Expand Up @@ -116,6 +117,11 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
threadPool: mimir_indexheader.NewThreadPool(cfg.BucketStore.IndexHeaderThreadPoolSize, reg),
seriesHashCache: hashcache.NewSeriesHashCache(cfg.BucketStore.SeriesHashCacheMaxBytes),
syncBackoffConfig: backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
MaxRetries: 3,
},
}

// Register metrics.
Expand Down Expand Up @@ -198,11 +204,7 @@ func (u *BucketStores) SyncBlocks(ctx context.Context) error {
}

func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *BucketStore) error) error {
retries := backoff.New(ctx, backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
MaxRetries: 3,
})
retries := backoff.New(ctx, u.syncBackoffConfig)

var lastErr error
for retries.Ongoing() {
Expand Down Expand Up @@ -247,8 +249,13 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
return err
}

includeUserIDs := make(map[string]struct{})
for _, userID := range u.shardingStrategy.FilterUsers(ctx, userIDs) {
ownedUserIDs, err := u.shardingStrategy.FilterUsers(ctx, userIDs)
if err != nil {
return errors.Wrap(err, "unable to check tenants owned by this store-gateway instance")
}

includeUserIDs := make(map[string]struct{}, len(ownedUserIDs))
for _, userID := range ownedUserIDs {
includeUserIDs[userID] = struct{}{}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
"when sharding is enabled only stores for filtered users should be created": {
shardingStrategy: func() ShardingStrategy {
s := &mockShardingStrategy{}
s.On("FilterUsers", mock.Anything, allUsers).Return([]string{"user-1", "user-2"})
s.On("FilterUsers", mock.Anything, allUsers).Return([]string{"user-1", "user-2"}, nil)
return s
}(),
expectedStores: 2,
Expand Down Expand Up @@ -713,8 +713,8 @@ type userShardingStrategy struct {
users []string
}

func (u *userShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) []string {
return u.users
func (u *userShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) ([]string, error) {
return u.users, nil
}

func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*thanos_metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error {
Expand Down
187 changes: 185 additions & 2 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand Down Expand Up @@ -751,6 +753,187 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) {
}
}

func TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing(t *testing.T) {
test.VerifyNoLeak(t)

const (
instanceID = "instance-1"
instanceAddr = "127.0.0.1"
userID = "user-1"
metricName = "series_1"
)

ctx := context.Background()
gatewayCfg := mockGatewayConfig()
gatewayCfg.ShardingRing.InstanceID = instanceID
gatewayCfg.ShardingRing.InstanceAddr = instanceAddr
gatewayCfg.ShardingRing.RingCheckPeriod = time.Hour // Do not trigger the sync each time the ring changes (we want to control it in this test).

storageCfg := mockStorageConfig(t)
storageCfg.BucketStore.SyncInterval = time.Hour // Do not trigger the periodic sync (we want to control it in this test).

reg := prometheus.NewPedanticRegistry()
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

storageDir := t.TempDir()

// Generate a real TSDB block in the storage.
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)

g, err := newStoreGateway(gatewayCfg, storageCfg, bucket, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)

// No sync retries to speed up tests.
g.stores.syncBackoffConfig = backoff.Config{MaxRetries: 1}

// Start the store-gateway.
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
t.Cleanup(func() { assert.NoError(t, services.StopAndAwaitTerminated(ctx, g)) })

t.Run("store-gateway is healthy in the ring", func(t *testing.T) {
g.syncStores(ctx, syncReasonPeriodic)

// Run query and ensure the block is queried.
req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64}
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
require.NoError(t, g.Series(req, srv))
assert.Len(t, srv.Hints.QueriedBlocks, 1)

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
# TYPE cortex_bucket_store_blocks_loaded gauge
cortex_bucket_store_blocks_loaded{component="store-gateway"} 1

# HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts.
# TYPE cortex_bucket_store_block_loads_total counter
cortex_bucket_store_block_loads_total{component="store-gateway"} 1

# HELP cortex_bucket_store_block_load_failures_total Total number of failed remote block loading attempts.
# TYPE cortex_bucket_store_block_load_failures_total counter
cortex_bucket_store_block_load_failures_total{component="store-gateway"} 0
`),
"cortex_bucket_store_blocks_loaded",
"cortex_bucket_store_block_loads_total",
"cortex_bucket_store_block_load_failures_total",
))
})

t.Run("store-gateway is unhealthy in the ring", func(t *testing.T) {
// Change heartbeat timestamp in the ring to make it unhealthy.
require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)
instance := ringDesc.Ingesters[instanceID]
instance.Timestamp = time.Now().Add(-time.Hour).Unix()
ringDesc.Ingesters[instanceID] = instance
return ringDesc, true, nil
}))

// Wait until the ring client has received the update.
// We expect the set of healthy instances to be empty.
dstest.Poll(t, 5*time.Second, true, func() interface{} {
actual, err := g.ring.GetAllHealthy(BlocksOwnerSync)
return err == nil && len(actual.Instances) == 0
})

g.syncStores(ctx, syncReasonPeriodic)

// Run query and ensure the block is queried.
req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64}
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
require.NoError(t, g.Series(req, srv))
assert.Len(t, srv.Hints.QueriedBlocks, 1)
Comment on lines +843 to +847
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though it is tested when asking store-gateway, the querier wouldn't query this store-gateway as it's unhealthy, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. The rationale of this fix is not unload all blocks if the store-gateway is unable to heartbeat the ring (or receive updates from the ring backend for a while) but keep whatever it previously had.


assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
# TYPE cortex_bucket_store_blocks_loaded gauge
cortex_bucket_store_blocks_loaded{component="store-gateway"} 1

# HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts.
# TYPE cortex_bucket_store_block_loads_total counter
cortex_bucket_store_block_loads_total{component="store-gateway"} 1
`),
"cortex_bucket_store_blocks_loaded",
"cortex_bucket_store_block_loads_total",
))
})

t.Run("store-gateway is missing in the ring (e.g. removed from another instance because of the auto-forget feature)", func(t *testing.T) {
// Remove the instance from the ring.
require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)
delete(ringDesc.Ingesters, instanceID)
return ringDesc, true, nil
}))

// Wait until the ring client has received the update.
// We expect the ring to be empty.
dstest.Poll(t, 5*time.Second, ring.ErrEmptyRing, func() interface{} {
_, err := g.ring.GetAllHealthy(BlocksOwnerSync)
return err
})

g.syncStores(ctx, syncReasonPeriodic)

// Run query and ensure the block is queried.
req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64}
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
require.NoError(t, g.Series(req, srv))
assert.Len(t, srv.Hints.QueriedBlocks, 1)

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
# TYPE cortex_bucket_store_blocks_loaded gauge
cortex_bucket_store_blocks_loaded{component="store-gateway"} 1

# HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts.
# TYPE cortex_bucket_store_block_loads_total counter
cortex_bucket_store_block_loads_total{component="store-gateway"} 1
`),
"cortex_bucket_store_blocks_loaded",
"cortex_bucket_store_block_loads_total",
))
})

t.Run("store-gateway is re-registered to the ring and it's healthy", func(t *testing.T) {
// Re-register the instance to the ring.
require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)
ringDesc.AddIngester(instanceID, instanceAddr, "", ring.Tokens{1, 2, 3}, ring.ACTIVE, time.Now())
return ringDesc, true, nil
}))

// Wait until the ring client has received the update.
dstest.Poll(t, 5*time.Second, true, func() interface{} {
actual, err := g.ring.GetAllHealthy(BlocksOwnerSync)
return err == nil && actual.Includes(instanceAddr)
})

g.syncStores(ctx, syncReasonPeriodic)

// Run query and ensure the block is queried.
req := &storepb.SeriesRequest{MinTime: math.MinInt64, MaxTime: math.MaxInt64}
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
require.NoError(t, g.Series(req, srv))
assert.Len(t, srv.Hints.QueriedBlocks, 1)

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
# TYPE cortex_bucket_store_blocks_loaded gauge
cortex_bucket_store_blocks_loaded{component="store-gateway"} 1

# HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts.
# TYPE cortex_bucket_store_block_loads_total counter
cortex_bucket_store_block_loads_total{component="store-gateway"} 1
`),
"cortex_bucket_store_blocks_loaded",
"cortex_bucket_store_block_loads_total",
))
})
}

func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
test.VerifyNoLeak(t)

Expand Down Expand Up @@ -1359,9 +1542,9 @@ type mockShardingStrategy struct {
mock.Mock
}

func (m *mockShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) []string {
func (m *mockShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) ([]string, error) {
args := m.Called(ctx, userIDs)
return args.Get(0).([]string)
return args.Get(0).([]string), args.Error(1)
}

func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error {
Expand Down
21 changes: 17 additions & 4 deletions pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
Expand All @@ -23,10 +24,14 @@ const (
shardExcludedMeta = "shard-excluded"
)

var (
errStoreGatewayUnhealthy = errors.New("store-gateway is unhealthy in the ring")
)

type ShardingStrategy interface {
// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
// that should be synced by the store-gateway.
FilterUsers(ctx context.Context, userIDs []string) []string
FilterUsers(ctx context.Context, userIDs []string) ([]string, error)

// FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway.
// The provided loaded map contains blocks which have been previously returned by this function and
Expand Down Expand Up @@ -62,7 +67,16 @@ func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, l
}

// FilterUsers implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error) {
// As a protection, ensure the store-gateway instance is healthy in the ring. It could also be missing
// in the ring if it was failing to heartbeat the ring and it got remove from another healthy store-gateway
// instance, because of the auto-forget feature.
if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil {
return nil, err
} else if !set.Includes(s.instanceAddr) {
return nil, errStoreGatewayUnhealthy
}

var filteredIDs []string

for _, userID := range userIDs {
Expand All @@ -74,15 +88,14 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin
}
}

return filteredIDs
return filteredIDs, nil
}

// FilterBlocks implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error {
// As a protection, ensure the store-gateway instance is healthy in the ring. If it's unhealthy because it's failing
// to heartbeat or get updates from the ring, or even removed from the ring because of the auto-forget feature, then
// keep the previously loaded blocks.
// TODO test
if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil || !set.Includes(s.instanceAddr) {
for blockID := range metas {
if _, ok := loaded[blockID]; ok {
Expand Down
Loading