Allow tenants to be included or excluded from store-gateways #7653

Merged: 2 commits, Mar 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### Grafana Mimir

* [CHANGE] Querier: the CLI flag `-querier.minimize-ingester-requests` has been moved from "experimental" to "advanced". #7638
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
22 changes: 22 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -9975,6 +9975,28 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "enabled_tenants",
"required": false,
"desc": "Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "store-gateway.enabled-tenants",
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "disabled_tenants",
"required": false,
"desc": "Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "store-gateway.disabled-tenants",
"fieldType": "string",
"fieldCategory": "advanced"
}
],
"fieldValue": null,
4 changes: 4 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2659,6 +2659,10 @@ Usage of ./cmd/mimir/mimir:
Minimum TLS version to use. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. If blank, the Go TLS minimum version is used.
-shutdown-delay duration
How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.
-store-gateway.disabled-tenants comma-separated-list-of-strings
Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.
-store-gateway.enabled-tenants comma-separated-list-of-strings
Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
-store-gateway.sharding-ring.auto-forget-enabled
When enabled, a store-gateway is automatically removed from the ring after failing to heartbeat the ring for a period longer than 10 times the configured -store-gateway.sharding-ring.heartbeat-timeout. (default true)
-store-gateway.sharding-ring.consul.acl-token string
13 changes: 13 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -4262,6 +4262,19 @@ sharding_ring:
# Unregister from the ring upon clean shutdown.
# CLI flag: -store-gateway.sharding-ring.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]

# (advanced) Comma separated list of tenants that can be loaded by the
# store-gateway. If specified, only blocks for these tenants will be loaded by
# the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
# CLI flag: -store-gateway.enabled-tenants
[enabled_tenants: <string> | default = ""]

# (advanced) Comma separated list of tenants that cannot be loaded by the
# store-gateway. If specified, and the store-gateway would normally load a
# given tenant (via -store-gateway.enabled-tenants or sharding), it will be
# ignored instead.
# CLI flag: -store-gateway.disabled-tenants
[disabled_tenants: <string> | default = ""]
```
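The new settings can be combined in YAML. A sketch of a store-gateway pinned to an explicit tenant allow-list (the `store_gateway` top-level key follows Mimir's usual config layout, and the tenant IDs are illustrative):

```yaml
store_gateway:
  # Only load blocks for these tenants; all other tenants are skipped.
  enabled_tenants: "tenant-a,tenant-b"
  # Applied on top of enabled_tenants and sharding: a tenant listed here
  # is never loaded, even if it also appears in enabled_tenants.
  disabled_tenants: ""
```

Because `disabled_tenants` is applied after `enabled_tenants` and sharding, a tenant that appears in both lists is not loaded.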

### memcached
4 changes: 2 additions & 2 deletions pkg/compactor/compactor.go
@@ -405,10 +405,10 @@ func newMultitenantCompactor(
c.bucketCompactorMetrics = NewBucketCompactorMetrics(c.blocksMarkedForDeletion, registerer)

if len(compactorCfg.EnabledTenants) > 0 {
- level.Info(c.logger).Log("msg", "compactor using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
+ level.Info(c.logger).Log("msg", "compactor using enabled users", "enabled", compactorCfg.EnabledTenants)
}
if len(compactorCfg.DisabledTenants) > 0 {
- level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
+ level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", compactorCfg.DisabledTenants)
}

c.jobsOrder = GetJobsOrderFunction(compactorCfg.CompactionJobsOrder)
28 changes: 21 additions & 7 deletions pkg/storegateway/bucket_stores.go
@@ -32,6 +32,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/indexcache"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
@@ -76,6 +77,9 @@ type BucketStores struct {
storesMu sync.RWMutex
stores map[string]*BucketStore

// Tenants that are specifically enabled or disabled via configuration
allowedTenants *util.AllowedTenants

// Metrics.
syncTimes prometheus.Histogram
syncLastSuccess prometheus.Gauge
@@ -85,7 +89,7 @@
}

// NewBucketStores makes a new BucketStores.
- func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
+ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, allowedTenants *util.AllowedTenants, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
chunksCacheClient, err := cache.CreateClient("chunks-cache", cfg.BucketStore.ChunksCache.BackendConfig, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return nil, errors.Wrapf(err, "chunks-cache")
@@ -118,6 +122,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
limits: limits,
bucket: cachingBucket,
shardingStrategy: shardingStrategy,
allowedTenants: allowedTenants,
stores: map[string]*BucketStore{},
bucketStoreMetrics: NewBucketStoreMetrics(reg),
metaFetcherMetrics: NewMetadataFetcherMetrics(),
@@ -229,9 +234,6 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
errs := tsdb_errors.NewMulti()
errsMx := sync.Mutex{}

- // Scan users in the bucket. In case of error, it may return a subset of users. If we sync a subset of users
- // during a periodic sync, we may end up unloading blocks for users that still belong to this store-gateway
- // so we do prefer to not run the sync at all.
userIDs, err := u.scanUsers(ctx)
if err != nil {
return err
@@ -360,10 +362,22 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues
return store.LabelValues(ctx, req)
}

- // scanUsers in the bucket and return the list of found users. If an error occurs while
- // iterating the bucket, it may return both an error and a subset of the users in the bucket.
+ // scanUsers in the bucket and return the list of found users, respecting any specifically
+ // enabled or disabled users.
func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
- return tsdb.ListUsers(ctx, u.bucket)
users, err := tsdb.ListUsers(ctx, u.bucket)
if err != nil {
return nil, err
}

filtered := make([]string, 0, len(users))
for _, user := range users {
if u.allowedTenants.IsAllowed(user) {
filtered = append(filtered, user)
}
}

return filtered, nil
}

func (u *BucketStores) getStore(userID string) *BucketStore {
37 changes: 28 additions & 9 deletions pkg/storegateway/bucket_stores_test.go
@@ -70,8 +70,9 @@ func TestBucketStores_InitialSync(t *testing.T) {
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

var allowedTenants *util.AllowedTenants
reg := prometheus.NewPedanticRegistry()
- stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
+ stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, allowedTenants, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)

// Query series before the initial sync.
@@ -152,8 +153,9 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
// Wrap the bucket to fail the 1st Get() request.
bucket = &failFirstGetBucket{Bucket: bucket}

var allowedTenants *util.AllowedTenants
reg := prometheus.NewPedanticRegistry()
- stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
+ stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, allowedTenants, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)

// Initial sync should succeed even if a transient error occurs.
@@ -213,8 +215,9 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

var allowedTenants *util.AllowedTenants
reg := prometheus.NewPedanticRegistry()
- stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
+ stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, allowedTenants, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)

// Run an initial sync to discover 1 block.
@@ -279,6 +282,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {

tests := map[string]struct {
shardingStrategy ShardingStrategy
allowedTenants *util.AllowedTenants
expectedStores int32
}{
"when sharding is disabled all users should be synced": {
@@ -293,6 +297,17 @@
}(),
expectedStores: 2,
},
"when user is disabled, their stores should not be created": {
shardingStrategy: newNoShardingStrategy(),
allowedTenants: util.NewAllowedTenants(nil, []string{"user-2"}),
expectedStores: 2,
},
"when single user is enabled, only their stores should be created": {
shardingStrategy: newNoShardingStrategy(),
allowedTenants: util.NewAllowedTenants([]string{"user-3"}, nil),
expectedStores: 1,
},
}

for testName, testData := range tests {
@@ -303,7 +318,7 @@
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", allUsers, nil)

- stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), log.NewNopLogger(), nil)
+ stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, testData.allowedTenants, defaultLimitsOverrides(t), log.NewNopLogger(), nil)
require.NoError(t, err)

// Sync user stores and count the number of times the callback is called.
@@ -315,7 +330,7 @@

assert.NoError(t, err)
bucketClient.AssertNumberOfCalls(t, "Iter", 1)
- assert.Equal(t, storesCount.Load(), testData.expectedStores)
+ assert.Equal(t, testData.expectedStores, storesCount.Load())
})
}
}
@@ -372,8 +387,9 @@ func TestBucketStores_ChunksAndSeriesLimiterFactoriesInitializedByEnforcedLimits
overrides, err := validation.NewOverrides(defaultLimits, validation.NewMockTenantLimits(testData.tenantLimits))
require.NoError(t, err)

var allowedTenants *util.AllowedTenants
reg := prometheus.NewPedanticRegistry()
- stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, overrides, log.NewNopLogger(), reg)
+ stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, allowedTenants, overrides, log.NewNopLogger(), reg)
require.NoError(t, err)

store, err := stores.getOrCreateStore(userID)
@@ -424,8 +440,9 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

var allowedTenants *util.AllowedTenants
reg := prometheus.NewPedanticRegistry()
- stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
+ stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, allowedTenants, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)

createBucketIndex(t, bucket, userID)
@@ -528,8 +545,9 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) {

createBucketIndex(t, bkt, userID)

var allowedTenants *util.AllowedTenants
reg := prometheus.NewPedanticRegistry()
- stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bkt, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
+ stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bkt, allowedTenants, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, stores.InitialSync(ctx))

@@ -693,8 +711,9 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {

sharding := userShardingStrategy{}

var allowedTenants *util.AllowedTenants
reg := prometheus.NewPedanticRegistry()
- stores, err := NewBucketStores(cfg, &sharding, bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
+ stores, err := NewBucketStores(cfg, &sharding, bucket, allowedTenants, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)

// Perform sync.
17 changes: 16 additions & 1 deletion pkg/storegateway/gateway.go
@@ -13,6 +13,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
@@ -53,11 +54,17 @@
// Config holds the store gateway config.
type Config struct {
ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`
}

// RegisterFlags registers the Config flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.ShardingRing.RegisterFlags(f, logger)

f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.")
}

// Validate the Config.
@@ -168,7 +175,15 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi

shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger)

- g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants)
if len(gatewayCfg.EnabledTenants) > 0 {
level.Info(logger).Log("msg", "store-gateway using enabled users", "enabled", gatewayCfg.EnabledTenants)
}
if len(gatewayCfg.DisabledTenants) > 0 {
level.Info(logger).Log("msg", "store-gateway using disabled users", "disabled", gatewayCfg.DisabledTenants)
}

+ g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, allowedTenants, limits, logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
if err != nil {
return nil, errors.Wrap(err, "create bucket stores")
}