Skip to content

Commit

Permalink
compact: do not cleanup blocks on boot
Browse files Browse the repository at this point in the history
Do not cleanup blocks on boot because in some very bad cases there could
be thousands of blocks ready-to-be deleted and doing that makes Thanos
Compact exceed `initialDelaySeconds` on k8s.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Dec 2, 2020
1 parent a57bbfb commit eb3ed62
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 24 deletions.
30 changes: 10 additions & 20 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func runCompact(
Name: "thanos_compact_iterations_total",
Help: "Total number of iterations that were executed successfully.",
})
cleanups := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_block_cleanup_loops_total",
Help: "Total number of concurrent cleanup loops of partially uploaded blocks and marked blocks that were executed successfully.",
})
partialUploadDeleteAttempts := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
Expand Down Expand Up @@ -339,29 +343,20 @@ func runCompact(
cleanMtx.Lock()
defer cleanMtx.Unlock()

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
if err := sy.SyncMetas(ctx); err != nil {
cancel()
return errors.Wrap(err, "syncing metas")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "cleaning marked blocks")
}
cleanups.Inc()

if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "failed to sync metas", "err", err)
}
return nil
}

// Do it once at the beginning to ensure that it runs at least once before
// the main loop.
if err := sy.SyncMetas(ctx); err != nil {
cancel()
return errors.Wrap(err, "syncing metas")
}
if err := cleanPartialMarked(); err != nil {
cancel()
return errors.Wrap(err, "cleaning partial and marked blocks")
}

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction")
Expand Down Expand Up @@ -481,11 +476,6 @@ func runCompact(
// since one iteration potentially could take a long time.
if conf.cleanupBlocksInterval > 0 {
g.Add(func() error {
// Wait the whole period at the beginning because we've executed this on boot.
select {
case <-time.After(conf.cleanupBlocksInterval):
case <-ctx.Done():
}
return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), cleanPartialMarked)
}, func(error) {
cancel()
Expand Down
10 changes: 6 additions & 4 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,14 @@ func TestCompactWithStoreGateway(t *testing.T) {
c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

// Expect compactor halted.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_halted"))

testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+5)), "thanos_blocks_meta_synced"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

// The compact directory is still there.
dataDir := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt")
empty, err := isEmptyDir(dataDir)
Expand All @@ -559,8 +560,9 @@ func TestCompactWithStoreGateway(t *testing.T) {
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))

// However, the blocks have been cleaned because that happens concurrently.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_block_cleanup_loops_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_blocks_cleaned_total"))

// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
Expand Down

0 comments on commit eb3ed62

Please sign in to comment.