Allow tenants to be included or excluded from store-gateways #7653

Merged (2 commits), Mar 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### Grafana Mimir

* [CHANGE] Querier: the CLI flag `-querier.minimize-ingester-requests` has been moved from "experimental" to "advanced". #7638
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
22 changes: 22 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -9975,6 +9975,28 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "enabled_tenants",
"required": false,
"desc": "Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "store-gateway.enabled-tenants",
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "disabled_tenants",
"required": false,
"desc": "Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "store-gateway.disabled-tenants",
"fieldType": "string",
"fieldCategory": "advanced"
}
],
"fieldValue": null,
4 changes: 4 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2659,6 +2659,10 @@ Usage of ./cmd/mimir/mimir:
Minimum TLS version to use. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. If blank, the Go TLS minimum version is used.
-shutdown-delay duration
How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.
-store-gateway.disabled-tenants comma-separated-list-of-strings
Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.
-store-gateway.enabled-tenants comma-separated-list-of-strings
Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
-store-gateway.sharding-ring.auto-forget-enabled
When enabled, a store-gateway is automatically removed from the ring after failing to heartbeat the ring for a period longer than 10 times the configured -store-gateway.sharding-ring.heartbeat-timeout. (default true)
-store-gateway.sharding-ring.consul.acl-token string
13 changes: 13 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -4262,6 +4262,19 @@ sharding_ring:
# Unregister from the ring upon clean shutdown.
# CLI flag: -store-gateway.sharding-ring.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]

# (advanced) Comma separated list of tenants that can be loaded by the
# store-gateway. If specified, only blocks for these tenants will be loaded by
# the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
# CLI flag: -store-gateway.enabled-tenants
[enabled_tenants: <string> | default = ""]

# (advanced) Comma separated list of tenants that cannot be loaded by the
# store-gateway. If specified, and the store-gateway would normally load a given
# tenant for (via -store-gateway.enabled-tenants or sharding), it will be
# ignored instead.
# CLI flag: -store-gateway.disabled-tenants
[disabled_tenants: <string> | default = ""]
```

### memcached
4 changes: 2 additions & 2 deletions pkg/compactor/compactor.go
@@ -405,10 +405,10 @@ func newMultitenantCompactor(
c.bucketCompactorMetrics = NewBucketCompactorMetrics(c.blocksMarkedForDeletion, registerer)

if len(compactorCfg.EnabledTenants) > 0 {
- level.Info(c.logger).Log("msg", "compactor using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
+ level.Info(c.logger).Log("msg", "compactor using enabled users", "enabled", compactorCfg.EnabledTenants)
}
if len(compactorCfg.DisabledTenants) > 0 {
- level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
+ level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", compactorCfg.DisabledTenants)
}

c.jobsOrder = GetJobsOrderFunction(compactorCfg.CompactionJobsOrder)
17 changes: 16 additions & 1 deletion pkg/storegateway/gateway.go
@@ -13,6 +13,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
@@ -53,11 +54,17 @@ var (
// Config holds the store gateway config.
type Config struct {
ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`
}

// RegisterFlags registers the Config flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.ShardingRing.RegisterFlags(f, logger)

f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.")
}

// Validate the Config.
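
Both new settings are registered with `f.Var`, which accepts any type implementing the standard library's `flag.Value` interface. The following is a minimal sketch of how a comma-separated list flag can work. `csvList` and `parseTenantFlags` are hypothetical names invented for illustration and only approximate what `flagext.StringSliceCSV` provides; the real dskit type is not shown in this diff and may differ in details.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// csvList is a hypothetical stand-in for flagext.StringSliceCSV.
type csvList []string

// String implements flag.Value and renders the list as a comma-joined string.
func (l *csvList) String() string {
	if l == nil {
		return ""
	}
	return strings.Join(*l, ",")
}

// Set implements flag.Value. An empty value yields an empty list rather than
// a list holding one empty tenant ID.
func (l *csvList) Set(s string) error {
	if s == "" {
		*l = nil
		return nil
	}
	*l = strings.Split(s, ",")
	return nil
}

// parseTenantFlags registers the two flag names from this PR on a private
// FlagSet and parses args into them.
func parseTenantFlags(args []string) (enabled, disabled csvList, err error) {
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	fs.Var(&enabled, "store-gateway.enabled-tenants", "comma-separated tenants to load")
	fs.Var(&disabled, "store-gateway.disabled-tenants", "comma-separated tenants to skip")
	err = fs.Parse(args)
	return enabled, disabled, err
}

func main() {
	enabled, disabled, err := parseTenantFlags([]string{
		"-store-gateway.enabled-tenants=team-a,team-b",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(enabled), len(disabled))
}
```

With only the enabled flag passed, `enabled` holds two entries and `disabled` stays empty, which matches the `len(...) > 0` guards used before logging each list.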
@@ -166,7 +173,15 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi
return nil, errors.Wrap(err, "create ring client")
}

- shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger)
+ if len(gatewayCfg.EnabledTenants) > 0 {
+ level.Info(logger).Log("msg", "store-gateway using enabled users", "enabled", gatewayCfg.EnabledTenants)
+ }
+ if len(gatewayCfg.DisabledTenants) > 0 {
+ level.Info(logger).Log("msg", "store-gateway using disabled users", "disabled", gatewayCfg.DisabledTenants)
+ }
+
+ allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants)
+ shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, allowedTenants, limits, logger)

g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
if err != nil {
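
The flag descriptions imply a precedence: a non-empty enabled list restricts loading to those tenants, and the disabled list then removes tenants even if they were enabled. A minimal sketch of that rule follows. `allowList`, `newAllowList`, and `isAllowed` are hypothetical names; the real `util.AllowedTenants` is not part of this diff, so its internals are assumed rather than quoted. Treating a nil receiver as "allow everything" is likewise an assumption of this sketch, though it would explain why test cases that leave `allowedTenants` unset keep working.

```go
package main

import "fmt"

// allowList is a hypothetical stand-in for util.AllowedTenants.
type allowList struct {
	enabled  map[string]struct{} // empty means every tenant is enabled
	disabled map[string]struct{} // empty means no tenant is disabled
}

func newAllowList(enabled, disabled []string) *allowList {
	a := &allowList{
		enabled:  make(map[string]struct{}, len(enabled)),
		disabled: make(map[string]struct{}, len(disabled)),
	}
	for _, t := range enabled {
		a.enabled[t] = struct{}{}
	}
	for _, t := range disabled {
		a.disabled[t] = struct{}{}
	}
	return a
}

// isAllowed applies the enabled list first, then the disabled list.
// A nil receiver allows every tenant (assumption of this sketch).
func (a *allowList) isAllowed(tenant string) bool {
	if a == nil {
		return true
	}
	if len(a.enabled) > 0 {
		if _, ok := a.enabled[tenant]; !ok {
			return false
		}
	}
	_, denied := a.disabled[tenant]
	return !denied
}

func main() {
	a := newAllowList([]string{"team-a", "team-b"}, []string{"team-b"})
	for _, t := range []string{"team-a", "team-b", "team-c"} {
		fmt.Printf("%s allowed: %v\n", t, a.isAllowed(t))
	}

	var unset *allowList // calling a method on a nil pointer is legal in Go
	fmt.Println("nil list allows team-c:", unset.isAllowed("team-c"))
}
```

Here `team-a` is allowed, `team-b` is rejected because the disabled list wins, and `team-c` is rejected because it is missing from a non-empty enabled list.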
47 changes: 35 additions & 12 deletions pkg/storegateway/sharding_strategy.go
@@ -16,6 +16,7 @@ import (

mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/util"
)

const (
@@ -46,21 +47,23 @@ type ShardingLimits interface {
// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways,
// where each tenant blocks are sharded across a subset of store-gateway instances.
type ShuffleShardingStrategy struct {
- r *ring.Ring
- instanceID string
- instanceAddr string
- limits ShardingLimits
- logger log.Logger
+ r *ring.Ring
+ instanceID string
+ instanceAddr string
+ allowedTenants *util.AllowedTenants
+ limits ShardingLimits
+ logger log.Logger
}

// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
- func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy {
+ func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, allowedTenants *util.AllowedTenants, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy {
return &ShuffleShardingStrategy{
- r: r,
- instanceID: instanceID,
- instanceAddr: instanceAddr,
- limits: limits,
- logger: logger,
+ r: r,
+ instanceID: instanceID,
+ instanceAddr: instanceAddr,
+ allowedTenants: allowedTenants,
+ limits: limits,
+ logger: logger,
}
}

@@ -78,9 +81,14 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin
var filteredIDs []string

for _, userID := range userIDs {
- subRing := GetShuffleShardingSubring(s.r, userID, s.limits)
+ // If this user has been explicitly disabled or _other_ users have been explicitly
+ // enabled and this user isn't, don't include this user in the filtered list of users.
+ if !s.allowedTenants.IsAllowed(userID) {
+ continue
+ }
+
// Include the user only if it belongs to this store-gateway shard.
+ subRing := GetShuffleShardingSubring(s.r, userID, s.limits)
if subRing.HasInstance(s.instanceID) {
filteredIDs = append(filteredIDs, userID)
}
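
The reordering in `FilterUsers` means the allow-list check runs before the subring is computed, so tenants excluded by configuration never trigger a ring lookup. A minimal sketch of that ordering is below, with both checks passed in as plain predicates. `filterUsers` and the `ownedByInstance` callback are hypothetical stand-ins for the method and for `subRing.HasInstance`, not Mimir APIs.

```go
package main

import "fmt"

// filterUsers keeps only tenants that pass the allow-list check and are
// owned by this instance. The allow-list check runs first so that excluded
// tenants skip the ownership lookup entirely.
func filterUsers(userIDs []string, isAllowed, ownedByInstance func(string) bool) []string {
	var filtered []string // stays nil when nothing passes
	for _, id := range userIDs {
		if !isAllowed(id) {
			continue
		}
		if ownedByInstance(id) {
			filtered = append(filtered, id)
		}
	}
	return filtered
}

func main() {
	disabled := map[string]bool{"team-b": true}
	ringLookups := 0

	isAllowed := func(id string) bool { return !disabled[id] }
	owned := func(id string) bool {
		ringLookups++
		return id != "team-c" // pretend team-c is sharded to another instance
	}

	got := filterUsers([]string{"team-a", "team-b", "team-c"}, isAllowed, owned)
	fmt.Println(got, ringLookups) // prints "[team-a] 2"
}
```

Because the result slice is declared without an initial value, it remains nil when every tenant is filtered out, which is the shape the new test cases expect with `users: []string(nil)`.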
@@ -110,6 +118,21 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string,
return nil
}

// If this user has been explicitly disabled or _other_ users have been explicitly
// enabled and this user isn't, don't sync any blocks for this particular user. This
// should not happen in practice since the user would have been filtered before this
// method is called but, we check just in case.
if !s.allowedTenants.IsAllowed(userID) {
level.Warn(s.logger).Log("msg", "user is disabled by configuration but block sync is still being attempted", "user", userID)

for blockID := range metas {
synced.WithLabelValues(shardExcludedMeta).Inc()
delete(metas, blockID)
}

return nil
}

r := GetShuffleShardingSubring(s.r, userID, s.limits)
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
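
The defensive branch added to `FilterBlocks` empties the `metas` map while ranging over it and counts each dropped block. Go permits deleting map entries during iteration, so no intermediate key slice is needed. A minimal sketch of the pattern follows, with a plain integer standing in for the `synced` gauge. `dropAll` is a hypothetical helper and the string keys stand in for block ULIDs.

```go
package main

import "fmt"

// dropAll removes every entry from metas in place and returns how many
// entries were removed. Deleting during range is safe in Go: an entry that
// is removed before being reached is simply not produced by the iteration.
func dropAll(metas map[string]struct{}) int {
	excluded := 0
	for blockID := range metas {
		excluded++ // stands in for synced.WithLabelValues(shardExcludedMeta).Inc()
		delete(metas, blockID)
	}
	return excluded
}

func main() {
	metas := map[string]struct{}{
		"block-1": {},
		"block-2": {},
		"block-3": {},
	}
	excluded := dropAll(metas)
	fmt.Println(excluded, len(metas)) // prints "3 0"
}
```

Mutating the caller's map rather than returning a new one mirrors how the surrounding method reports its result: the caller inspects whatever is left in `metas` afterwards.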

40 changes: 38 additions & 2 deletions pkg/storegateway/sharding_strategy_test.go
@@ -22,6 +22,7 @@ import (

mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/extprom"
)

@@ -59,6 +60,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
replicationFactor int
limits ShardingLimits
setupRing func(*ring.Desc)
allowedTenants *util.AllowedTenants
prevLoadedBlocks map[string]map[ulid.ULID]struct{}
expectedUsers []usersExpectation
expectedBlocks []blocksExpectation
@@ -349,6 +351,40 @@ func TestShuffleShardingStrategy(t *testing.T) {
{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2, block4}},
},
},
"RF = 2, SS = 2, user explicitly disabled": {
replicationFactor: 2,
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2},
allowedTenants: util.NewAllowedTenants(nil, []string{userID}),
setupRing: func(r *ring.Desc) {
r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt)
r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt)
},
expectedUsers: []usersExpectation{
{instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string(nil)},
{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string(nil)},
},
expectedBlocks: []blocksExpectation{
{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{}},
{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}},
},
},
"RF = 2, SS = 2, different user explicitly enabled": {
replicationFactor: 2,
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2},
allowedTenants: util.NewAllowedTenants([]string{"different-user"}, nil),
setupRing: func(r *ring.Desc) {
r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt)
r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt)
},
expectedUsers: []usersExpectation{
{instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string(nil)},
{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string(nil)},
},
expectedBlocks: []blocksExpectation{
{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{}},
{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}},
},
},
}

for testName, testData := range tests {
@@ -385,15 +421,15 @@ func TestShuffleShardingStrategy(t *testing.T) {

// Assert on filter users.
for _, expected := range testData.expectedUsers {
- filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger())
+ filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.allowedTenants, testData.limits, log.NewNopLogger())
actualUsers, err := filter.FilterUsers(ctx, []string{userID})
assert.Equal(t, expected.err, err)
assert.Equal(t, expected.users, actualUsers)
}

// Assert on filter blocks.
for _, expected := range testData.expectedBlocks {
- filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger())
+ filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.allowedTenants, testData.limits, log.NewNopLogger())
synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})
synced.WithLabelValues(shardExcludedMeta).Set(0)
