diff --git a/CHANGELOG.md b/CHANGELOG.md index c84cab9815..8f01a3c35b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,11 @@ ## master / unreleased -* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686 * [CHANGE] Enable Thanos series limiter in store-gateway. #4702 * [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683 +* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. +* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing out-of-order chunks in the index for no compaction instead of halting the compaction. ## 1.12.0 in progress diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index d52f88bb4e..3d3bb0159f 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -147,6 +147,11 @@ compactor: # CLI flag: -compactor.tenant-cleanup-delay [tenant_cleanup_delay: <duration> | default = 6h] + # When enabled, mark blocks containing out-of-order chunks in the index for + # no compaction instead of halting the compaction. + # CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled + [skip_blocks_with_out_of_order_chunks_enabled: <boolean> | default = false] + # When enabled, at compactor startup the bucket will be scanned and all found # deletion marks inside the block location will be copied to the markers # global location too. This option can (and should) be safely disabled as soon diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 7407c7c922..e6f3c5ea7a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5294,6 +5294,11 @@ The `compactor_config` configures the compactor for the blocks storage. # CLI flag: -compactor.tenant-cleanup-delay [tenant_cleanup_delay: <duration> | default = 6h] +# When enabled, mark blocks containing out-of-order chunks in the index for no +# compaction instead of halting the compaction. +# CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled +[skip_blocks_with_out_of_order_chunks_enabled: <boolean> | default = false] + # When enabled, at compactor startup the bucket will be scanned and all found # deletion marks inside the block location will be copied to the markers global # location too.
This option can (and should) be safely disabled as soon as the diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 2661123fa1..345d0abb00 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -51,7 +51,7 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} errInvalidShardingStrategy = errors.New("invalid sharding strategy") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -60,11 +60,11 @@ var ( reg, blocksMarkedForDeletion, garbageCollectedBlocks, - prometheus.NewCounter(prometheus.CounterOpts{}), + blocksMarkedForNoCompaction, metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, @@ -72,30 +72,36 @@ var ( true, // Enable vertical compaction reg, blocksMarkedForDeletion, - prometheus.NewCounter(prometheus.CounterOpts{}), + blocksMarkedForNoCompaction, garbageCollectedBlocks, metadata.NoneFunc, cfg) } - DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { + DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { return nil, nil, err } - planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) - return compactor, planner, nil + plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner { + return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) + } + + return compactor, plannerFactory, nil } - ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { + ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { return nil, nil, err } - planner := NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds()) - return compactor, planner, nil + plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner { 
+ + return NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks) + } + return compactor, plannerFactory, nil } ) @@ -107,6 +113,7 @@ type BlocksGrouperFactory func( logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, garbageCollectedBlocks prometheus.Counter, ) compact.Grouper @@ -116,22 +123,29 @@ type BlocksCompactorFactory func( cfg Config, logger log.Logger, reg prometheus.Registerer, -) (compact.Compactor, compact.Planner, error) +) (compact.Compactor, PlannerFactory, error) + +type PlannerFactory func( + logger log.Logger, + cfg Config, + noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, +) compact.Planner // Config holds the Compactor config. type Config struct { - BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConsistencyDelay time.Duration `yaml:"consistency_delay"` - DataDir string `yaml:"data_dir"` - CompactionInterval time.Duration `yaml:"compaction_interval"` - CompactionRetries int `yaml:"compaction_retries"` - CompactionConcurrency int `yaml:"compaction_concurrency"` - CleanupInterval time.Duration `yaml:"cleanup_interval"` - CleanupConcurrency int `yaml:"cleanup_concurrency"` - DeletionDelay time.Duration `yaml:"deletion_delay"` - TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"` + BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConsistencyDelay time.Duration `yaml:"consistency_delay"` + DataDir string `yaml:"data_dir"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + CompactionRetries int `yaml:"compaction_retries"` + CompactionConcurrency int `yaml:"compaction_concurrency"` + CleanupInterval time.Duration `yaml:"cleanup_interval"` + CleanupConcurrency int `yaml:"cleanup_concurrency"` + DeletionDelay time.Duration `yaml:"deletion_delay"` + TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"` + SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"` // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` @@ -180,6 +194,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.") f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", false, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. 
This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.") + f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing out-of-order chunks in the index for no compaction instead of halting the compaction.") f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.") f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") @@ -231,9 +246,10 @@ type Compactor struct { // Blocks cleaner is responsible to hard delete blocks marked for deletion. blocksCleaner *BlocksCleaner - // Underlying compactor and planner used to compact TSDB blocks. + // Underlying compactor used to compact TSDB blocks. blocksCompactor compact.Compactor - blocksPlanner compact.Planner + + blocksPlannerFactory PlannerFactory // Client used to run operations on the bucket storing blocks. bucketClient objstore.Bucket @@ -255,6 +271,7 @@ type Compactor struct { compactionRunFailedTenants prometheus.Gauge compactionRunInterval prometheus.Gauge blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompaction prometheus.Counter garbageCollectedBlocks prometheus.Counter // TSDB syncer metrics @@ -357,6 +374,10 @@ func newCompactor( Help: blocksMarkedForDeletionHelp, ConstLabels: prometheus.Labels{"reason": "compaction"}, }), + blocksMarkedForNoCompaction: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_blocks_marked_for_no_compaction_total", + Help: "Total number of blocks marked for no compact during a compaction run.", + }), garbageCollectedBlocks: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_garbage_collected_blocks_total", Help: "Total number of blocks marked for deletion by compactor.", @@ -389,7 +410,7 @@ func (c *Compactor) starting(ctx context.Context) error { } // Create blocks compactor dependencies. - c.blocksCompactor, c.blocksPlanner, err = c.blocksCompactorFactory(ctx, c.compactorCfg, c.logger, c.registerer) + c.blocksCompactor, c.blocksPlannerFactory, err = c.blocksCompactorFactory(ctx, c.compactorCfg, c.logger, c.registerer) if err != nil { return errors.Wrap(err, "failed to initialize compactor dependencies") } @@ -657,6 +678,10 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { 0, c.compactorCfg.MetaSyncConcurrency) + // Filters out blocks with a no-compaction marker; blocks can be marked for no compaction for reasons + // like out-of-order chunks or an index file that is too big.
+ noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency) + fetcher, err := block.NewMetaFetcher( ulogger, c.compactorCfg.MetaSyncConcurrency, @@ -671,6 +696,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), ignoreDeletionMarkFilter, deduplicateBlocksFilter, + noCompactMarkerFilter, }, nil, ) @@ -696,13 +722,13 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks), - c.blocksPlanner, + c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks), + c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter), c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), bucket, c.compactorCfg.CompactionConcurrency, - false, + c.compactorCfg.SkipBlocksWithOutOfOrderChunksEnabled, ) if err != nil { return errors.Wrap(err, "failed to create bucket compactor") diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 2f8c6c7964..0ef3d23daa 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -30,12 +30,15 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/testutil" "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_storage_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -231,6 +234,10 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { cortex_compactor_blocks_marked_for_deletion_total{reason="compaction"} 0 cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0 + # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. + # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter + cortex_compactor_blocks_marked_for_no_compaction_total 0 + # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. cortex_compactor_block_cleanup_started_total 1 @@ -262,6 +269,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { "cortex_compactor_block_cleanup_failures_total", "cortex_compactor_blocks_cleaned_total", "cortex_compactor_blocks_marked_for_deletion_total", + "cortex_compactor_blocks_marked_for_no_compaction_total", "cortex_compactor_block_cleanup_started_total", "cortex_compactor_block_cleanup_completed_total", "cortex_compactor_block_cleanup_failed_total", @@ -381,6 +389,10 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. 
cortex_compactor_block_cleanup_started_total 1 + # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. + # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter + cortex_compactor_blocks_marked_for_no_compaction_total 0 + # TYPE cortex_compactor_block_cleanup_completed_total counter # HELP cortex_compactor_block_cleanup_completed_total Total number of blocks cleanup runs successfully completed. cortex_compactor_block_cleanup_completed_total 0 @@ -408,6 +420,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket "cortex_compactor_block_cleanup_failures_total", "cortex_compactor_blocks_cleaned_total", "cortex_compactor_blocks_marked_for_deletion_total", + "cortex_compactor_blocks_marked_for_no_compaction_total", "cortex_compactor_block_cleanup_started_total", "cortex_compactor_block_cleanup_completed_total", "cortex_compactor_block_cleanup_failed_total", @@ -424,8 +437,10 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockIter(userID+"/markers/", nil, nil) bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) + bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil) bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) @@ -474,12 +489,16 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockIter("user-2/markers/", nil, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json", mockBlockMetaJSON("01FN3V83ABR9992RF8WRJZ76ZQ"), nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/no-compact-mark.json", "", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockIter("user-1/markers/", nil, nil) @@ -507,6 +526,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t 
*testing.T) { // Ensure a plan has been executed for the blocks of each user. tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2) + assert.Len(t, tsdbPlanner.getNoCompactBlocks(), 0) + assert.ElementsMatch(t, []string{ `level=info component=cleaner msg="started blocks cleanup and maintenance"`, `level=info component=cleaner org_id=user-1 msg="started blocks cleanup and maintenance"`, @@ -536,6 +557,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { "cortex_compactor_runs_started_total", "cortex_compactor_runs_completed_total", "cortex_compactor_runs_failed_total", "cortex_compactor_blocks_cleaned_total", "cortex_compactor_block_cleanup_failures_total", "cortex_compactor_blocks_marked_for_deletion_total", "cortex_compactor_block_cleanup_started_total", "cortex_compactor_block_cleanup_completed_total", "cortex_compactor_block_cleanup_failed_total", + "cortex_compactor_blocks_marked_for_no_compaction_total", } assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` # TYPE cortex_compactor_runs_started_total counter @@ -574,6 +596,10 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { # TYPE cortex_compactor_block_cleanup_failed_total counter # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. cortex_compactor_block_cleanup_failed_total 0 + + # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. + # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter + cortex_compactor_blocks_marked_for_no_compaction_total 0 `), testedMetrics...)) } @@ -654,6 +680,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { "cortex_compactor_runs_started_total", "cortex_compactor_runs_completed_total", "cortex_compactor_runs_failed_total", "cortex_compactor_blocks_cleaned_total", "cortex_compactor_block_cleanup_failures_total", "cortex_compactor_blocks_marked_for_deletion_total", "cortex_compactor_block_cleanup_started_total", "cortex_compactor_block_cleanup_completed_total", "cortex_compactor_block_cleanup_failed_total", + "cortex_compactor_blocks_marked_for_no_compaction_total", } assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` # TYPE cortex_compactor_runs_started_total counter @@ -692,6 +719,69 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { # TYPE cortex_compactor_block_cleanup_failed_total counter # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. cortex_compactor_block_cleanup_failed_total 0 + + # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. + # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter + cortex_compactor_blocks_marked_for_no_compaction_total 0 + `), testedMetrics...)) +} + +func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { + t.Parallel() + + // Mock the bucket to contain two users, each one with two blocks.
+ bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(path.Join("user-2", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX"}, nil) + bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ"}, nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", mockNoCompactBlockJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", mockNoCompactBlockJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) + + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json", mockBlockMetaJSON("01FN3V83ABR9992RF8WRJZ76ZQ"), nil) + bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/no-compact-mark.json", "", nil) + + bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) + bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) + + c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient) + + tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) + + // The planner is still called even for the user whose blocks are all marked for skip compaction. + tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2) + + assert.ElementsMatch(t, []string{"01DTVP434PA9VFXSW2JKB3392D", "01FN6CDF3PNEWWRY5MPGJPE3EX"}, tsdbPlanner.getNoCompactBlocks()) + + testedMetrics := []string{"cortex_compactor_blocks_marked_for_no_compaction_total"} + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run.
+ # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter + cortex_compactor_blocks_marked_for_no_compaction_total 0 `), testedMetrics...)) } @@ -759,6 +849,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) "cortex_compactor_blocks_cleaned_total", "cortex_compactor_block_cleanup_failures_total", "cortex_compactor_blocks_marked_for_deletion_total", "cortex_compactor_block_cleanup_started_total", "cortex_compactor_block_cleanup_completed_total", "cortex_compactor_block_cleanup_failed_total", "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_index_last_successful_update_timestamp_seconds", + "cortex_compactor_blocks_marked_for_no_compaction_total", } assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` # TYPE cortex_compactor_runs_started_total counter @@ -797,9 +888,65 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) # TYPE cortex_compactor_block_cleanup_failed_total counter # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. cortex_compactor_block_cleanup_failed_total 0 + + # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. + # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter + cortex_compactor_blocks_marked_for_no_compaction_total 0 `), testedMetrics...)) } +func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) { + bucketClient, tmpDir := cortex_storage_testutil.PrepareFilesystemBucket(t) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) + + b1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, map[string]string{"__name__": "Teste"}) + b2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, map[string]string{"__name__": "Teste"}) + + err := testutil.PutOutOfOrderIndex(path.Join(tmpDir, "user-1", b1.String()), 10, 20) + require.NoError(t, err) + + cfg := prepareConfig() + cfg.SkipBlocksWithOutOfOrderChunksEnabled = true + c, tsdbCompac, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient) + + tsdbCompac.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil) + + tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: b1, + MinTime: 10, + MaxTime: 20, + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: b2, + MinTime: 20, + MaxTime: 30, + }, + }, + }, nil) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck + + // Wait until a run has completed. + cortex_testutil.Poll(t, 5*time.Second, true, func() interface{} { + if _, err := os.Stat(path.Join(tmpDir, "user-1", b1.String(), "no-compact-mark.json")); err == nil { + return true + } + return false + }) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. 
+ # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter + cortex_compactor_blocks_marked_for_no_compaction_total 1 + `), "cortex_compactor_blocks_marked_for_no_compaction_total")) +} + func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunning(t *testing.T) { t.Parallel() @@ -814,12 +961,16 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockIter("user-2/markers/", nil, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json", mockBlockMetaJSON("01FN3V83ABR9992RF8WRJZ76ZQ"), nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/no-compact-mark.json", "", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) @@ -900,6 +1051,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) } @@ -1145,7 +1297,9 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (* }) tsdbCompactor := &tsdbCompactorMock{} - tsdbPlanner := &tsdbPlannerMock{} + tsdbPlanner := &tsdbPlannerMock{ + noCompactMarkFilters: []*compact.GatherNoCompactionMarkFilter{}, + } logs := &concurrency.SyncBuffer{} logger := log.NewLogfmtLogger(logs) registry := prometheus.NewRegistry() @@ -1159,8 +1313,13 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (* return bucketClient, nil } - blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { - return tsdbCompactor, tsdbPlanner, nil + blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { + return tsdbCompactor, + func(_ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner { + tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, 
noCompactMarkFilter) + return tsdbPlanner + }, + nil } c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, DefaultBlocksGrouperFactory, blocksCompactorFactory) @@ -1190,6 +1349,7 @@ func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Blo type tsdbPlannerMock struct { mock.Mock + noCompactMarkFilters []*compact.GatherNoCompactionMarkFilter } func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { @@ -1197,6 +1357,19 @@ func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.M return args.Get(0).([]*metadata.Meta), args.Error(1) } +func (m *tsdbPlannerMock) getNoCompactBlocks() []string { + + result := []string{} + + for _, noCompactMarkFilter := range m.noCompactMarkFilters { + for _, mark := range noCompactMarkFilter.NoCompactMarkedBlocks() { + result = append(result, mark.ID.String()) + } + } + + return result +} + func mockBlockMetaJSON(id string) string { meta := tsdb.BlockMeta{ Version: 1, @@ -1217,6 +1390,23 @@ func mockBlockMetaJSON(id string) string { return string(content) } +func mockNoCompactBlockJSON(id string) string { + meta := metadata.NoCompactMark{ + Version: 1, + ID: ulid.MustParse(id), + NoCompactTime: time.Now().Unix(), + Details: "yolo", + Reason: "testing", + } + + content, err := json.Marshal(meta) + if err != nil { + panic("failed to marshal mocked block no compact mark") + } + + return string(content) +} + func mockDeletionMarkJSON(id string, deletionTime time.Time) string { meta := metadata.DeletionMark{ Version: metadata.DeletionMarkVersion1, @@ -1226,7 +1416,7 @@ func mockDeletionMarkJSON(id string, deletionTime time.Time) string { content, err := json.Marshal(meta) if err != nil { - panic("failed to marshal mocked block meta") + panic("failed to marshal mocked block deletion mark") } return string(content) diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go index f5c97c1266..d9e99ff0d3 100644 --- a/pkg/compactor/shuffle_sharding_planner.go +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -5,18 +5,21 @@ import ( "fmt" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/thanos-io/thanos/pkg/block/metadata" ) type ShuffleShardingPlanner struct { - logger log.Logger - ranges []int64 + logger log.Logger + ranges []int64 + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark } -func NewShuffleShardingPlanner(logger log.Logger, ranges []int64) *ShuffleShardingPlanner { +func NewShuffleShardingPlanner(logger log.Logger, ranges []int64, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark) *ShuffleShardingPlanner { return &ShuffleShardingPlanner{ - logger: logger, - ranges: ranges, + logger: logger, + ranges: ranges, + noCompBlocksFunc: noCompBlocksFunc, } } @@ -28,12 +31,20 @@ func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metad largestRange := p.ranges[len(p.ranges)-1] rangeStart := getRangeStart(metasByMinTime[0], largestRange) rangeEnd := rangeStart + largestRange + noCompactMarked := p.noCompBlocksFunc() + resultMetas := make([]*metadata.Meta, 0, len(metasByMinTime)) for _, b := range metasByMinTime { + if _, excluded := noCompactMarked[b.ULID]; excluded { + continue + } + if b.MinTime < rangeStart || b.MaxTime > rangeEnd { return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", b.ULID.String(), b.MinTime, b.MaxTime, rangeStart, rangeEnd) } + + resultMetas = 
append(resultMetas, b) } - return metasByMinTime, nil + return resultMetas, nil } diff --git a/pkg/compactor/shuffle_sharding_planner_test.go b/pkg/compactor/shuffle_sharding_planner_test.go index fdf63c9207..9a719a5abe 100644 --- a/pkg/compactor/shuffle_sharding_planner_test.go +++ b/pkg/compactor/shuffle_sharding_planner_test.go @@ -18,10 +18,11 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { block2ulid := ulid.MustNew(2, nil) tests := map[string]struct { - ranges []int64 - blocks []*metadata.Meta - expected []*metadata.Meta - expectedErr error + ranges []int64 + noCompactBlocks map[ulid.ULID]*metadata.NoCompactMark + blocks []*metadata.Meta + expected []*metadata.Meta + expectedErr error }{ "test basic plan": { ranges: []int64{2 * time.Hour.Milliseconds()}, @@ -118,12 +119,45 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { }, expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), }, + "test should skip blocks marked for no compact": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + noCompactBlocks: map[ulid.ULID]*metadata.NoCompactMark{block1ulid: {}}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + expected: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + }, } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { p := NewShuffleShardingPlanner(nil, - testData.ranges) + testData.ranges, + func() map[ulid.ULID]*metadata.NoCompactMark { + return testData.noCompactBlocks + }, + ) actual, err := p.Plan(context.Background(), testData.blocks) if testData.expectedErr != nil {