Delete blocks relative to when they were uploaded
This commit changes the logic of when blocks are deleted on
ingesters. Before, deletion was relative to the newest block for the
tenant minus a certain retention period. Now it is relative to when the
block was uploaded minus a certain retention period.

We do this for two reasons:
- It allows us to lower the retention on ingesters, making them
  lighter.
- If the out-of-order window is bigger than the retention time, the
  previous logic would delete blocks right after upload, leaving a
  period during which data would not be queryable.
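
In code terms, the new rule compares each block's ship (upload) time against the configured retention. Below is a minimal sketch of that check; the helper name is illustrative, not the actual function touched by this commit:

package main

import (
	"fmt"
	"time"
)

// eligibleForDeletion is an illustrative helper: a shipped block becomes
// deletable once its upload time is older than now minus the retention.
func eligibleForDeletion(shippedTime time.Time, retention time.Duration) bool {
	return shippedTime.Before(time.Now().Add(-retention))
}

func main() {
	retention := time.Hour
	fmt.Println(eligibleForDeletion(time.Now().Add(-2*time.Hour), retention))    // true: shipped 2h ago
	fmt.Println(eligibleForDeletion(time.Now().Add(-30*time.Minute), retention)) // false: shipped 30m ago
}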

Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
jesusvazquez committed Dec 23, 2022
1 parent a0f5ad1 commit 650bfb6
Showing 8 changed files with 121 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
@@ -5740,7 +5740,7 @@
"kind": "field",
"name": "retention_period",
"required": false,
"desc": "TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks.",
"desc": "TSDB blocks retention in the ingester before a block is removed, relative to the time when the block was uploaded to storage. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks.",
"fieldValue": null,
"fieldDefaultValue": 86400000000000,
"fieldFlag": "blocks-storage.tsdb.retention-period",
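
For reference, the unchanged fieldDefaultValue of 86400000000000 is expressed in nanoseconds; a quick sketch confirming it matches the 24h default shown in the flag help:

package main

import (
	"fmt"
	"time"
)

func main() {
	// The config descriptor reports durations in nanoseconds.
	defaultRetention := time.Duration(86400000000000)
	fmt.Println(defaultRetention) // 24h0m0s, the -blocks-storage.tsdb.retention-period default
}
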
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -510,7 +510,7 @@ Usage of ./cmd/mimir/mimir:
-blocks-storage.tsdb.out-of-order-capacity-max int
[experimental] Maximum capacity for out of order chunks, in samples between 1 and 255. (default 32)
-blocks-storage.tsdb.retention-period duration
TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
TSDB blocks retention in the ingester before a block is removed, relative to the time when the block was uploaded to storage. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
-blocks-storage.tsdb.series-hash-cache-max-size-bytes uint
Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled. (default 1073741824)
-blocks-storage.tsdb.ship-concurrency int
2 changes: 1 addition & 1 deletion cmd/mimir/help.txt.tmpl
@@ -192,7 +192,7 @@ Usage of ./cmd/mimir/mimir:
-blocks-storage.tsdb.dir string
Directory to store TSDBs (including WAL) in the ingesters. This directory is required to be persisted between restarts. (default "./tsdb/")
-blocks-storage.tsdb.retention-period duration
TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
TSDB blocks retention in the ingester before a block is removed, relative to the time when the block was uploaded to storage. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
-common.storage.azure.account-key string
Azure storage account key
-common.storage.azure.account-name string
@@ -3041,8 +3041,8 @@ tsdb:
[dir: <string> | default = "./tsdb/"]
# TSDB blocks retention in the ingester before a block is removed, relative to
# the newest block written for the tenant. This should be larger than the
# -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and
# the time when the block was uploaded to storage. This should be larger than
# the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and
# large enough to give store-gateways and queriers enough time to discover
# newly uploaded blocks.
# CLI flag: -blocks-storage.tsdb.retention-period
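
The guidance in this description — retention larger than the block range period and than -querier.query-store-after — can be expressed as a small check. The helper below is hypothetical and only illustrates the documented relationship with example values:

package main

import (
	"fmt"
	"time"
)

// checkRetention is a hypothetical helper mirroring the documented guidance:
// the ingester block retention should exceed both the TSDB block range period
// and the -querier.query-store-after window.
func checkRetention(retention, blockRangePeriod, queryStoreAfter time.Duration) error {
	if retention <= blockRangePeriod {
		return fmt.Errorf("retention %s should be larger than the block range period %s", retention, blockRangePeriod)
	}
	if retention <= queryStoreAfter {
		return fmt.Errorf("retention %s should be larger than query-store-after %s", retention, queryStoreAfter)
	}
	return nil
}

func main() {
	// Example values only; they are not claimed to be Mimir defaults.
	fmt.Println(checkRetention(24*time.Hour, 2*time.Hour, 12*time.Hour)) // <nil>
}
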
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -1575,9 +1575,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),

instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.seriesCount,
blockMinRetention: i.cfg.BlocksStorageConfig.TSDB.Retention,
}

maxExemplars := i.limiter.convertGlobalToLocalLimit(userID, i.limits.MaxGlobalExemplarsPerUser(userID))
114 changes: 106 additions & 8 deletions pkg/ingester/ingester_test.go
@@ -31,7 +31,6 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
@@ -3263,7 +3262,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP

// Mock the shipper meta (no blocks).
db := i.getTSDB(userID)
require.NoError(t, writeThanosShipperMetaFile(log.NewNopLogger(), db.db.Dir(), &thanosShipperMeta{
require.NoError(t, writeShipperMetaFile(log.NewNopLogger(), db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
}))

@@ -4182,9 +4181,9 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Saying that we have shipped the second block, so only that should get deleted.
require.Nil(t, writeThanosShipperMetaFile(nil, db.db.Dir(), &thanosShipperMeta{
Version: shipperMetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
Shipped: []shippedBlock{{BlockID: oldBlocks[1].Meta().ULID, ShippedTime: time.Now()}},
}))
require.NoError(t, db.updateCachedShippedBlocks())

@@ -4210,9 +4209,22 @@
`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Shipping 2 more blocks, hence all the blocks from first round.
require.Nil(t, writeThanosShipperMetaFile(nil, db.db.Dir(), &thanosShipperMeta{
Version: shipperMetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
Shipped: []shippedBlock{
{
BlockID: oldBlocks[1].Meta().ULID,
ShippedTime: time.Now(),
},
{
BlockID: newBlocks[0].Meta().ULID,
ShippedTime: time.Now(),
},
{
BlockID: newBlocks[1].Meta().ULID,
ShippedTime: time.Now(),
},
},
}))
require.NoError(t, db.updateCachedShippedBlocks())

@@ -4241,6 +4253,92 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, newBlocks2[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))
}

func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) {
chunkRange := 2 * time.Hour
chunkRangeMilliSec := chunkRange.Milliseconds()
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{chunkRange}
cfg.BlocksStorageConfig.TSDB.Retention = 1 * time.Hour // This means only blocks that are shipped for more than an hour can be deleted

// Create ingester
reg := prometheus.NewPedanticRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), i)
})

// Wait until it's healthy
test.Poll(t, 1*time.Second, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_oldest_unshipped_block_timestamp_seconds Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.
# TYPE cortex_ingester_oldest_unshipped_block_timestamp_seconds gauge
cortex_ingester_oldest_unshipped_block_timestamp_seconds 0
`), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Push some data to create 3 blocks.
ctx := user.InjectOrgID(context.Background(), userID)
for j := int64(0); j < 5; j++ {
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, j*chunkRangeMilliSec)
_, err := i.Push(ctx, req)
require.NoError(t, err)
}

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())

oldBlocks := db.Blocks()
require.Equal(t, 3, len(oldBlocks))

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingester_oldest_unshipped_block_timestamp_seconds Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.
# TYPE cortex_ingester_oldest_unshipped_block_timestamp_seconds gauge
cortex_ingester_oldest_unshipped_block_timestamp_seconds %d
`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Lets say that the first block was shipped 2 hours ago and the second block only 30 minutes ago.
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
Shipped: []shippedBlock{
{
BlockID: oldBlocks[0].Meta().ULID,
ShippedTime: time.Now().Add(-2 * time.Hour),
},
{
BlockID: oldBlocks[1].Meta().ULID,
ShippedTime: time.Now().Add(-30 * time.Minute),
},
},
}))
require.NoError(t, db.updateCachedShippedBlocks())

// Add more samples that could trigger another compaction and hence reload of blocks.
for j := int64(5); j < 6; j++ {
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, j*chunkRangeMilliSec)
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())

// Only the last two old blocks plus the one containing the newly added samples should remain.
newBlocks := db.Blocks()
require.Equal(t, 3, len(newBlocks))
require.Equal(t, oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID) // Second block becomes first block.
require.Equal(t, oldBlocks[2].Meta().ULID, newBlocks[1].Meta().ULID) // Third block becomes second block.

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingester_oldest_unshipped_block_timestamp_seconds Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.
# TYPE cortex_ingester_oldest_unshipped_block_timestamp_seconds gauge
cortex_ingester_oldest_unshipped_block_timestamp_seconds %d
`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))
}

func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) {
i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), nil)
require.NoError(t, err)
13 changes: 8 additions & 5 deletions pkg/ingester/user_tsdb.go
@@ -85,8 +85,9 @@ type userTSDB struct {
ingestedRuleSamples *util_math.EwmaRate

// Cached shipped blocks.
shippedBlocksMtx sync.Mutex
shippedBlocks map[ulid.ULID]time.Time
shippedBlocksMtx sync.Mutex
shippedBlocks map[ulid.ULID]time.Time
blockMinRetention time.Duration
}

// Explicitly wrapping the tsdb.DB functions that we use.
@@ -245,9 +246,11 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
shippedBlocks := u.getCachedShippedBlocks()

result := map[ulid.ULID]struct{}{}
for shippedID := range shippedBlocks {
if _, ok := deletable[shippedID]; ok {
result[shippedID] = struct{}{}
for blockID, blockShippedTime := range shippedBlocks {
if blockShippedTime.Before(time.Now().Add(-u.blockMinRetention)) {
if _, ok := deletable[blockID]; ok {
result[blockID] = struct{}{}
}
}
}
return result
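
For context, the deletion check above consults shipped-block metadata keyed by block ULID and ship time. The shapes below are simplified sketches inferred from the test in this commit, not copies of the real definitions in pkg/ingester, and the mapping helper only stands in for what updateCachedShippedBlocks is assumed to do:

package main

import (
	"fmt"
	"time"

	"github.com/oklog/ulid"
)

// Simplified sketches of the shipper metadata shapes used in the test above;
// the real definitions live in pkg/ingester and may differ.
type shippedBlock struct {
	BlockID     ulid.ULID
	ShippedTime time.Time
}

type shipperMeta struct {
	Version int
	Shipped []shippedBlock
}

// cachedShippedBlocks illustrates the map the deletion logic consults:
// block ULID -> time the block was shipped (uploaded) to object storage.
func cachedShippedBlocks(meta shipperMeta) map[ulid.ULID]time.Time {
	out := make(map[ulid.ULID]time.Time, len(meta.Shipped))
	for _, b := range meta.Shipped {
		out[b.BlockID] = b.ShippedTime
	}
	return out
}

func main() {
	meta := shipperMeta{Version: 1}
	fmt.Println(len(cachedShippedBlocks(meta))) // 0: nothing shipped yet
}
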
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/config.go
@@ -189,7 +189,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {

f.StringVar(&cfg.Dir, "blocks-storage.tsdb.dir", "./tsdb/", "Directory to store TSDBs (including WAL) in the ingesters. This directory is required to be persisted between restarts.")
f.Var(&cfg.BlockRanges, "blocks-storage.tsdb.block-ranges-period", "TSDB blocks range period.")
f.DurationVar(&cfg.Retention, "blocks-storage.tsdb.retention-period", 24*time.Hour, "TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks.")
f.DurationVar(&cfg.Retention, "blocks-storage.tsdb.retention-period", 24*time.Hour, "TSDB blocks retention in the ingester before a block is removed, relative to the time when the block was uploaded to storage. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks.")
f.DurationVar(&cfg.ShipInterval, "blocks-storage.tsdb.ship-interval", 1*time.Minute, "How frequently the TSDB blocks are scanned and new ones are shipped to the storage. 0 means shipping is disabled.")
f.IntVar(&cfg.ShipConcurrency, "blocks-storage.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.")
f.Uint64Var(&cfg.SeriesHashCacheMaxBytes, "blocks-storage.tsdb.series-hash-cache-max-size-bytes", uint64(1*units.Gibibyte), "Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled.")
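
As a standalone illustration of the flag registration above, here is a sketch that registers and parses the same duration flag via the standard library; the flag set name and the parsed value are examples, not Mimir defaults or recommendations:

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	var retention time.Duration
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	fs.DurationVar(&retention, "blocks-storage.tsdb.retention-period", 24*time.Hour,
		"TSDB blocks retention in the ingester before a block is removed, relative to the time when the block was uploaded to storage.")

	// With upload-relative deletion, lowering the retention is the lever that
	// makes ingesters lighter, as described in the commit message.
	_ = fs.Parse([]string{"-blocks-storage.tsdb.retention-period=6h"})
	fmt.Println(retention) // 6h0m0s
}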
