Skip to content

Commit

Permalink
Introduced min setting for -compactor.partial-block-deletion-delay (#…
Browse files Browse the repository at this point in the history
…2787)

* Introduced min setting for -compactor.partial-block-deletion-delay

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed typo

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added comment

Signed-off-by: Marco Pracucci <marco@pracucci.com>

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Aug 22, 2022
1 parent ec587dd commit b1121e9
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [CHANGE] Limits: change the default value of `max_global_series_per_metric` limit to `0` (disabled). Setting this limit by default does not provide much benefit because series are sharded by all labels. #2714
* [CHANGE] Ingester: experimental `-blocks-storage.tsdb.new-chunk-disk-mapper` has been removed, new chunk disk mapper is now always used, and is no longer marked experimental. Default value of `-blocks-storage.tsdb.head-chunks-write-queue-size` has changed to 1000000, this enables async chunk queue by default, which leads to improved latency on the write path when new chunks are created in ingesters. #2762
* [CHANGE] Ingester: removed deprecated `-blocks-storage.tsdb.isolation-enabled` option. TSDB-level isolation is now always disabled in Mimir. #2782
* [CHANGE] Compactor: `-compactor.partial-block-deletion-delay` must either be set to 0 (to disable partial blocks deletion) or a value higher than `4h`. #2787
* [FEATURE] Introduced an experimental anonymous usage statistics tracking (disabled by default), to help Mimir maintainers driving better decisions to support the opensource community. The tracking system anonymously collects non-sensitive and non-personal identifiable information about the running Mimir cluster, and is disabled by default. #2643 #2662 #2685 #2732 #2735
* [FEATURE] Introduced an experimental deployment mode called read-write and running a fully featured Mimir cluster with three components: write, read and backend. The read-write deployment mode is a trade-off between the monolithic mode (only one component, no isolation) and the microservices mode (many components, high isolation). #2754
* [FEATURE] Query-frontend: introduced experimental support to split instant queries by time. The instant query splitting can be enabled setting `-query-frontend.split-instant-queries-by-interval`. #2469 #2564 #2565 #2570 #2571 #2572 #2573 #2574 #2575 #2576 #2581 #2582 #2601 #2632 #2633 #2634 #2641 #2642 #2766
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2961,7 +2961,7 @@
"kind": "field",
"name": "compactor_partial_block_deletion_delay",
"required": false,
"desc": "If a partial block (unfinished block without meta.json file) hasn't been modified for this time, it will be marked for deletion. 0 to disable.",
"desc": "If a partial block (unfinished block without meta.json file) hasn't been modified for this time, it will be marked for deletion. The minimum accepted value is 4h0m0s: a lower value will be ignored and the feature disabled. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "compactor.partial-block-deletion-delay",
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ Usage of ./cmd/mimir/mimir:
-compactor.meta-sync-concurrency int
Number of Go routines to use when syncing block meta files from the long term storage. (default 20)
-compactor.partial-block-deletion-delay duration
If a partial block (unfinished block without meta.json file) hasn't been modified for this time, it will be marked for deletion. 0 to disable.
If a partial block (unfinished block without meta.json file) hasn't been modified for this time, it will be marked for deletion. The minimum accepted value is 4h0m0s: a lower value will be ignored and the feature disabled. 0 to disable.
-compactor.ring.consul.acl-token string
ACL Token used to interact with Consul.
-compactor.ring.consul.cas-retry-delay duration
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ Usage of ./cmd/mimir/mimir:
-compactor.data-dir string
Directory to temporarily store blocks during compaction. This directory is not required to be persisted between restarts. (default "./data-compactor/")
-compactor.partial-block-deletion-delay duration
If a partial block (unfinished block without meta.json file) hasn't been modified for this time, it will be marked for deletion. 0 to disable.
If a partial block (unfinished block without meta.json file) hasn't been modified for this time, it will be marked for deletion. The minimum accepted value is 4h0m0s: a lower value will be ignored and the feature disabled. 0 to disable.
-compactor.ring.consul.hostname string
Hostname and port of Consul. (default "localhost:8500")
-compactor.ring.etcd.endpoints string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2554,7 +2554,9 @@ The `limits` block configures default and per-tenant limits imposed by component
[compactor_tenant_shard_size: <int> | default = 0]

# If a partial block (unfinished block without meta.json file) hasn't been
# modified for this time, it will be marked for deletion. 0 to disable.
# modified for this time, it will be marked for deletion. The minimum accepted
# value is 4h0m0s: a lower value will be ignored and the feature disabled. 0 to
# disable.
# CLI flag: -compactor.partial-block-deletion-delay
[compactor_partial_block_deletion_delay: <duration> | default = 0s]

Expand Down
6 changes: 5 additions & 1 deletion pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
"github.com/grafana/mimir/pkg/util"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/validation"
)

const (
Expand Down Expand Up @@ -357,10 +358,13 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) (returnErr
// error if the cleanup of partial blocks fail.
if len(partials) > 0 {
var partialDeletionCutoffTime time.Time // zero value, disabled.
if delay := c.cfgProvider.CompactorPartialBlockDeletionDelay(userID); delay > 0 {
if delay, valid := c.cfgProvider.CompactorPartialBlockDeletionDelay(userID); delay > 0 {
// enable cleanup of partial blocks without deletion marker
partialDeletionCutoffTime = time.Now().Add(-delay)
} else if !valid {
level.Warn(userLogger).Log("msg", "partial blocks deletion has been disabled for tenant because the delay has been set lower than the minimum value allowed", "minimum", validation.MinCompactorPartialBlockDeletionDelay)
}

c.cleanUserPartialBlocks(ctx, partials, idx, partialDeletionCutoffTime, userBucket, userLogger)
}

Expand Down
88 changes: 75 additions & 13 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -849,6 +850,65 @@ func TestBlocksCleaner_ShouldNotRemovePartialBlocksInsideDelayPeriod(t *testing.
))
}

func TestBlocksCleaner_ShouldNotRemovePartialBlocksIfConfiguredDelayIsInvalid(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)

bucketClient, _ := mimir_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

ts := func(hours int) int64 {
return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
}

// Create a partial block.
block1 := createTSDBBlock(t, bucketClient, "user-1", ts(-10), ts(-8), 2, nil)
err := bucketClient.Delete(ctx, path.Join("user-1", block1.String(), metadata.MetaFilename))
require.NoError(t, err)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
}

// Configure an invalid delay.
cfgProvider := newMockConfigProvider()
cfgProvider.userPartialBlockDelay["user-1"] = 0
cfgProvider.userPartialBlockDelayInvalid["user-1"] = true

// Pre-condition check: block should be partial and not being marked for deletion.
checkBlock(t, "user-1", bucketClient, block1, false, false)

// Run the cleanup.
cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, logger, reg)
require.NoError(t, cleaner.cleanUser(ctx, "user-1"))

// Ensure the block has NOT been marked for deletion.
checkBlock(t, "user-1", bucketClient, block1, false, false)
assert.Contains(t, logs.String(), "partial blocks deletion has been disabled for tenant because the delay has been set lower than the minimum value allowed")

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.
# TYPE cortex_bucket_blocks_count gauge
cortex_bucket_blocks_count{user="user-1"} 0
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="partial"} 0
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_compactor_blocks_marked_for_deletion_total",
))
}

func TestFindMostRecentModifiedTimeForBlock(t *testing.T) {
b, dir := mimir_testutil.PrepareFilesystemBucket(t)

Expand Down Expand Up @@ -890,21 +950,23 @@ func (m *mockBucketFailure) Delete(ctx context.Context, name string) error {
}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
splitAndMergeShards map[string]int
instancesShardSize map[string]int
splitGroups map[string]int
blockUploadEnabled map[string]bool
userPartialBlockDelay map[string]time.Duration
userRetentionPeriods map[string]time.Duration
splitAndMergeShards map[string]int
instancesShardSize map[string]int
splitGroups map[string]int
blockUploadEnabled map[string]bool
userPartialBlockDelay map[string]time.Duration
userPartialBlockDelayInvalid map[string]bool
}

func newMockConfigProvider() *mockConfigProvider {
return &mockConfigProvider{
userRetentionPeriods: make(map[string]time.Duration),
splitAndMergeShards: make(map[string]int),
splitGroups: make(map[string]int),
blockUploadEnabled: make(map[string]bool),
userPartialBlockDelay: make(map[string]time.Duration),
userRetentionPeriods: make(map[string]time.Duration),
splitAndMergeShards: make(map[string]int),
splitGroups: make(map[string]int),
blockUploadEnabled: make(map[string]bool),
userPartialBlockDelay: make(map[string]time.Duration),
userPartialBlockDelayInvalid: make(map[string]bool),
}
}

Expand Down Expand Up @@ -940,8 +1002,8 @@ func (m *mockConfigProvider) CompactorBlockUploadEnabled(tenantID string) bool {
return m.blockUploadEnabled[tenantID]
}

func (m *mockConfigProvider) CompactorPartialBlockDeletionDelay(user string) time.Duration {
return m.userPartialBlockDelay[user]
func (m *mockConfigProvider) CompactorPartialBlockDeletionDelay(user string) (time.Duration, bool) {
return m.userPartialBlockDelay[user], !m.userPartialBlockDelayInvalid[user]
}

func (m *mockConfigProvider) S3SSEType(user string) string {
Expand Down
6 changes: 4 additions & 2 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ type ConfigProvider interface {
// CompactorTenantShardSize returns number of compactors that this user can use. 0 = all compactors.
CompactorTenantShardSize(userID string) int

// CompactorPartialBlockDeletionDelay returns the partial block delay time period for a given user.
CompactorPartialBlockDeletionDelay(userID string) time.Duration
// CompactorPartialBlockDeletionDelay returns the partial block delay time period for a given user,
// and whether the configured value was valid. If the value wasn't valid, the returned delay is the default one
// and the caller is responsible to warn the Mimir operator about it.
CompactorPartialBlockDeletionDelay(userID string) (delay time.Duration, valid bool)

// CompactorBlockUploadEnabled returns whether block upload is enabled for a given tenant.
CompactorBlockUploadEnabled(tenantID string) bool
Expand Down
20 changes: 16 additions & 4 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const (
ingestionRateFlag = "distributor.ingestion-rate-limit"
ingestionBurstSizeFlag = "distributor.ingestion-burst-size"
HATrackerMaxClustersFlag = "distributor.ha-tracker.max-clusters"

// MinCompactorPartialBlockDeletionDelay is the minimum partial blocks deletion delay that can be configured in Mimir.
MinCompactorPartialBlockDeletionDelay = 4 * time.Hour
)

// LimitError are errors that do not comply with the limits specified.
Expand Down Expand Up @@ -214,7 +217,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.CompactorSplitAndMergeShards, "compactor.split-and-merge-shards", 0, "The number of shards to use when splitting blocks. 0 to disable splitting.")
f.IntVar(&l.CompactorSplitGroups, "compactor.split-groups", 1, "Number of groups that blocks for splitting should be grouped into. Each group of blocks is then split separately. Number of output split shards is controlled by -compactor.split-and-merge-shards.")
f.IntVar(&l.CompactorTenantShardSize, "compactor.compactor-tenant-shard-size", 0, "Max number of compactors that can compact blocks for single tenant. 0 to disable the limit and use all compactors.")
f.Var(&l.CompactorPartialBlockDeletionDelay, "compactor.partial-block-deletion-delay", fmt.Sprintf("If a partial block (unfinished block without %s file) hasn't been modified for this time, it will be marked for deletion. 0 to disable.", block.MetaFilename))
f.Var(&l.CompactorPartialBlockDeletionDelay, "compactor.partial-block-deletion-delay", fmt.Sprintf("If a partial block (unfinished block without %s file) hasn't been modified for this time, it will be marked for deletion. The minimum accepted value is %s: a lower value will be ignored and the feature disabled. 0 to disable.", block.MetaFilename, MinCompactorPartialBlockDeletionDelay.String()))
f.BoolVar(&l.CompactorBlockUploadEnabled, "compactor.block-upload-enabled", false, "Enable block upload API for the tenant.")

// Store-gateway.
Expand Down Expand Up @@ -552,9 +555,18 @@ func (o *Overrides) CompactorSplitGroups(userID string) int {
return o.getOverridesForUser(userID).CompactorSplitGroups
}

// CompactorPartialBlockDeletionDelay returns the partial block deletion delay time period for a given user.
func (o *Overrides) CompactorPartialBlockDeletionDelay(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).CompactorPartialBlockDeletionDelay)
// CompactorPartialBlockDeletionDelay returns the partial block deletion delay time period for a given user,
// and whether the configured value was valid. If the value wasn't valid, the returned delay is the default one
// and the caller is responsible to warn the Mimir operator about it.
func (o *Overrides) CompactorPartialBlockDeletionDelay(userID string) (delay time.Duration, valid bool) {
delay = time.Duration(o.getOverridesForUser(userID).CompactorPartialBlockDeletionDelay)

// Forcefully disable partial blocks deletion if the configured delay is too low.
if delay > 0 && delay < MinCompactorPartialBlockDeletionDelay {
return 0, false
}

return delay, true
}

// CompactorBlockUploadEnabled returns whether block upload is enabled for a certain tenant.
Expand Down

0 comments on commit b1121e9

Please sign in to comment.