Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set shuffle sharding ingester lookback automatically #2110

Merged
merged 3 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
* [CHANGE] Default values have changed for the following settings. This improves query performance for recent data (within 12h) by only reading from ingesters: #1909 #1921
- `-blocks-storage.bucket-store.ignore-blocks-within` now defaults to `10h` (previously `0`)
- `-querier.query-store-after` now defaults to `12h` (previously `0`)
- `-querier.shuffle-sharding-ingesters-lookback-period` now defaults to `13h` (previously `0`)

* [CHANGE] The following settings are now classified as advanced because the defaults should work for most users and tuning them requires in-depth knowledge of how the read path works: #1929
- `-querier.query-ingesters-within`
- `-querier.query-store-after`
* [CHANGE] Config flag category overrides can be set dynamically at runtime. #1934
* [CHANGE] Ingester: deprecated `-ingester.ring.join-after`. Mimir now behaves as this setting is always set to 0s. This configuration option will be removed in Mimir 2.4.0. #1965
* [CHANGE] Blocks uploaded by ingester no longer contain `__org_id__` label. Compactor now ignores this label and will compact blocks with and without this label together. `mimirconvert` tool will remove the label from blocks as "unknown" label. #1972
* [CHANGE] Querier: deprecated `-querier.shuffle-sharding-ingesters-lookback-period`, instead adding `-querier.shuffle-sharding-ingesters-enabled` to enable or disable shuffle sharding on the read path. The value of `-querier.query-ingesters-within` is now used internally for shuffle sharding lookback. #2110
* [ENHANCEMENT] Distributor: Added limit to prevent tenants from sending excessive number of requests: #1843
* The following CLI flags (and their respective YAML config options) have been added:
* `-distributor.request-rate-limit`
Expand Down
10 changes: 5 additions & 5 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1491,13 +1491,13 @@
},
{
"kind": "field",
"name": "shuffle_sharding_ingesters_lookback_period",
"name": "shuffle_sharding_ingesters_enabled",
"required": false,
"desc": "When this setting is \u003e 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured -querier.query-store-after and -querier.query-ingesters-within. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).",
"desc": "Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -querier.query-ingesters-within. If this setting is false or -querier.query-ingesters-within is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).",
"fieldValue": null,
"fieldDefaultValue": 46800000000000,
"fieldFlag": "querier.shuffle-sharding-ingesters-lookback-period",
"fieldType": "duration",
"fieldDefaultValue": true,
"fieldFlag": "querier.shuffle-sharding-ingesters-enabled",
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,8 @@ Usage of ./cmd/mimir/mimir:
The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'. (default 12h0m0s)
-querier.scheduler-address string
Address of the query-scheduler component, in host:port format. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.
-querier.shuffle-sharding-ingesters-lookback-period duration
When this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured -querier.query-store-after and -querier.query-ingesters-within. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled). (default 13h0m0s)
-querier.shuffle-sharding-ingesters-enabled
Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -querier.query-ingesters-within. If this setting is false or -querier.query-ingesters-within is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled). (default true)
-querier.store-gateway-client.tls-ca-path string
Path to the CA certificates file to validate server certificate against. If not set, the host's root CA certificates are used.
-querier.store-gateway-client.tls-cert-path string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,15 @@ To enable shuffle sharding for ingesters on the write path, configure the follow
Assuming that you have enabled shuffle sharding for the write path, to enable shuffle sharding for ingesters on the read path, configure the following flags (or their respective YAML configuration options) on the querier and ruler:

- `-distributor.ingestion-tenant-shard-size=<size>`
- `-querier.shuffle-sharding-ingesters-lookback-period=<period>`<br />
Queriers and rulers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which might have received series since 'now - lookback period'.
The configured lookback `<period>` should be:
- greater than or equal to `-querier.query-store-after` and `-querier.query-ingesters-within` and,

The following flags are set appropriately by default to enable shuffle sharding for ingesters on the read path. If you need to modify their defaults:

- `-querier.shuffle-sharding-ingesters-enabled=true`<br />
Shuffle sharding for ingesters on the read path can be explicitly enabled or disabled.
- `-querier.query-ingesters-within=<period>`<br />
Queriers and rulers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which might have received series since 'now - query ingesters within'. If this period is `0`, shuffle sharding for ingesters on the read path is disabled, which means all ingesters in the Mimir cluster are queried for any tenant.
The configured `<period>` should be:
- greater than `-querier.query-store-after` and,
- greater than the estimated minimum amount of time for the oldest samples stored in a block uploaded by ingester to be discovered and available for querying.
When running Grafana Mimir with the default configuration, the estimated minimum amount of time for the oldest sample in a uploaded block to be available for querying is `3h`.

Expand All @@ -115,23 +120,24 @@ Keeping ingesters shuffle sharding enabled only on the write path does not lead

If you’re running a Grafana Mimir cluster with shuffle sharding disabled, and you want to enable it for the ingesters, use the following rollout strategy to avoid missing querying for any series currently in the ingesters:

1. Explicitly disable ingesters shuffle-sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=false` since this is enabled by default.
1. Enable ingesters shuffle sharding on the write path.
56quarters marked this conversation as resolved.
Show resolved Hide resolved
1. Wait for at least the amount of time specified via `-querier.shuffle-sharding-ingesters-lookback-period`.
1. Enable ingesters shuffle-sharding on the read path.
1. Wait for at least the amount of time specified via `-querier.query-ingesters-within`.
1. Enable ingesters shuffle-sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=true`.

#### Limitation: Decreasing the tenant shard size

The current shuffle sharding implementation in Grafana Mimir has a limitation that prevents you from safely decreasing the tenant shard size when you enable ingesters’ shuffle sharding on the read path.

If a tenant’s shard decreases in size, there is currently no way for the queriers and rulers to know how large the tenant shard was previously, and as a result, they potentially miss an ingester with data for that tenant.
The lookback mechanism, which is used to select the ingesters that might have received series since 'now - lookback period', doesn't work correctly if the tenant shard size is decreased.
The query-ingesters-within period, which is used to select the ingesters that might have received series since 'now - query ingesters within', doesn't work correctly for finding tenant shards if the tenant shard size is decreased.

Although decreasing the tenant shard size is not supported, consider the following workaround:

1. Disable shuffle sharding on the read path.
1. Disable shuffle sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=false`.
1. Decrease the configured tenant shard size.
1. Wait for at least the amount of time specified via `-querier.shuffle-sharding-ingesters-lookback-period`.
1. Re-enable shuffle sharding on the read path.
1. Wait for at least the amount of time specified via `-querier.query-ingesters-within`.
1. Re-enable shuffle sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=true`.

### Query-frontend and query-scheduler shuffle sharding

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,14 +836,13 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]

# (advanced) When this setting is > 0, queriers fetch in-memory series from the
# minimum set of required ingesters, selecting only ingesters which may have
# received series since 'now - lookback period'. The lookback period should be
# greater or equal than the configured -querier.query-store-after and
# -querier.query-ingesters-within. If this setting is 0, queriers always query
# all ingesters (ingesters shuffle sharding on read path is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 13h]
# (advanced) Fetch in-memory series from the minimum set of required ingesters,
# selecting only ingesters which may have received series since
# -querier.query-ingesters-within. If this setting is false or
# -querier.query-ingesters-within is '0', queriers always query all ingesters
# (ingesters shuffle sharding on read path is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-enabled
[shuffle_sharding_ingesters_enabled: <boolean> | default = true]

# The maximum number of concurrent queries. This config option should be set on
# query-frontend too when query sharding is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Complete the following steps to scale down ingesters deployed in a single zone.

```
-querier.query-store-after=0s
-querier.shuffle-sharding-ingesters-lookback-period=87600h
-querier.shuffle-sharding-ingesters-enabled=false
```

b. Configure the compactors to frequently update the bucket index:
Expand Down Expand Up @@ -106,10 +106,10 @@ Complete the following steps to scale down ingesters deployed in a single zone.

c. Send a `SIGINT` or `SIGTERM` signal to the process of the ingester to terminate.

d. Wait 10 minutes before proceeding with the next ingester. The temporarily configuration applied guarantees newly uploaded blocks are available for querying within 10 minutes.
d. Wait 10 minutes before proceeding with the next ingester. The temporarily applied configuration guarantees newly uploaded blocks are available for querying within 10 minutes.

1. Wait until the originally configured `-querier.query-store-after` period of time has elapsed since when all ingesters have been shutdown.
1. Revert the temporarily configuration changes done at the beginning of the scale down procedure.
1. Revert the temporary configuration changes done at the beginning of the scale down procedure.

#### Scaling down ingesters deployed in multiple zones

Expand Down
17 changes: 13 additions & 4 deletions integration/ingester_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

func TestIngesterSharding(t *testing.T) {
const numSeriesToPush = 1000
const queryIngestersWithinSecs = 5

tests := map[string]struct {
tenantShardSize int
Expand All @@ -48,11 +49,13 @@ func TestIngesterSharding(t *testing.T) {

flags := BlocksStorageFlags()
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)

// Enable shuffle sharding on read path but not lookback, otherwise all ingesters would be
// queried being just registered.
// We're verifying that shuffle sharding on the read path works so we need to set `query-ingesters-within`
// to a small enough value that they'll have been part of the ring for long enough by the time we attempt
// to query back the values we wrote to them. If they _haven't_ been part of the ring for long enough, the
// query would be sent to all ingesters and our test wouldn't really be testing anything.
flags["-querier.query-store-after"] = "0"
flags["-querier.shuffle-sharding-ingesters-lookback-period"] = "1ns"
flags["-querier.query-ingesters-within"] = fmt.Sprintf("%ds", queryIngestersWithinSecs)
flags["-ingester.ring.heartbeat-period"] = "1s"

// Start dependencies.
consul := e2edb.NewConsul()
Expand All @@ -77,6 +80,11 @@ func TestIngesterSharding(t *testing.T) {
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Yes, we're sleeping in this test. We need to make sure that the ingesters have been part
// of the ring long enough before writing metrics to them to ensure that only the shuffle
// sharded ingesters will be queried for them when we go to verify the series written.
time.Sleep((queryIngestersWithinSecs + 1) * time.Second)

// Push series.
now := time.Now()
expectedVectors := map[string]model.Vector{}
Expand Down Expand Up @@ -109,6 +117,7 @@ func TestIngesterSharding(t *testing.T) {
}
}

// Verify that the expected number of ingesters had series (write path).
require.Equal(t, testData.expectedIngestersWithSeries, numIngestersWithSeries)
require.Equal(t, numSeriesToPush, totalIngestedSeries)

Expand Down
6 changes: 2 additions & 4 deletions operations/mimir/shuffle-sharding.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,8 @@
}
) + (
if !($._config.shuffle_sharding.ingester_write_path_enabled && !$._config.shuffle_sharding.ingester_read_path_enabled) then {} else {
// The shuffle-sharding flags in the ruler applies both to read and write path, so we don’t have a way
// to keep it enabled on the write path and disable it only on the read path. However, we can obtain the
// same effect setting the lookback period to a very high value.
'querier.shuffle-sharding-ingesters-lookback-period': '87600h', // 3650 days.
// If shuffle sharding is enabled for the write path but isn't enabled for the read path, Mimir will query all ingesters
'querier.shuffle-sharding-ingesters-enabled': 'false',
}
) + (
if !$._config.shuffle_sharding.store_gateway_enabled then {} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type Config struct {
// this (and should never use it) but this feature is used by other projects built on top of it
SkipLabelNameValidation bool `yaml:"-"`

// This config is dynamically injected because defined in the querier config.
// This config is dynamically injected because it is defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// Limits for distributor
Expand Down
13 changes: 6 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2007,7 +2007,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Create distributor
ds, ingesters, _ := prepare(t, prepConfig{
ds, _, _ := prepare(t, prepConfig{
numIngesters: numIngesters,
happyIngesters: numIngesters,
numDistributors: 1,
Expand All @@ -2022,16 +2022,15 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
_, err := ds[0].Push(ctx, req)
require.NoError(t, err)

// Check how many ingesters are queried as part of the shuffle sharding subring.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to add unit tests explicitly for GetIngestersForMetadata and GetIngestersForQuery but this test is already doing a similar verification. Swapped it to use this method instead of counting mock calls because we don't have to test both expectedIngesters and expectedIngesters - 1

replicationSet, err := ds[0].GetIngestersForMetadata(ctx)
require.NoError(t, err)
assert.Equal(t, testData.expectedIngesters, len(replicationSet.Instances))

// Assert on metric metadata
metadata, err := ds[0].MetricsMetadata(ctx)
require.NoError(t, err)
assert.Equal(t, 10, len(metadata))

// Check how many ingesters have been queried.
// Due to the quorum the distributor could cancel the last request towards ingesters
// if all other ones are successful, so we're good either has been queried X or X-1
// ingesters.
assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsMetadata"))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
c.API.RegisterFlags(f)
c.registerServerFlagsWithChangedDefaultValues(f)
c.Distributor.RegisterFlags(f, logger)
c.Querier.RegisterFlags(f)
c.Querier.RegisterFlags(f, logger)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f, logger)
c.Flusher.RegisterFlags(f)
Expand Down
9 changes: 8 additions & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,14 @@ func (t *Mimir) initOverridesExporter() (services.Service, error) {

func (t *Mimir) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod

// Only enable shuffle sharding on the read path when `query-ingesters-within`
// is non-zero since otherwise we can't determine if an ingester should be part
// of a tenant's shuffle sharding subring (we compare its registration time with
// the lookback period).
if t.Cfg.Querier.ShuffleShardingIngestersEnabled && t.Cfg.Querier.QueryIngestersWithin > 0 {
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.QueryIngestersWithin
}

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
Expand Down
Loading