Ingesters: Delete blocks relative to when they were uploaded to storage #3816

Merged 3 commits on Jan 13, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

* [CHANGE] Querier: Introduce `-querier.max-partial-query-length` to limit the time range for partial queries at the querier level and deprecate `-store.max-query-length`. #3825
* [CHANGE] Store-gateway: Remove experimental `-blocks-storage.bucket-store.max-concurrent-reject-over-limit` flag. #3706
* [CHANGE] Ingester: If shipping is enabled, block retention is now relative to the time when the block was uploaded to cloud storage. If shipping is disabled, block retention is relative to the creation time of the block instead of the mintime of the last created block. #3816
* [FEATURE] Store-gateway: streaming of series. The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to a changing API and instability. #3540 #3546 #3587 #3606 #3611 #3620 #3645 #3355 #3697 #3666 #3687 #3728 #3739 #3751 #3779 #3839
* [FEATURE] Alertmanager: Added support for the Webex receiver. #3758
* [FEATURE] Limits: Added the `-validation.separate-metrics-group-label` flag. This allows further separation of the `cortex_discarded_samples_total` metric by an additional `group` label - which is configured by this flag to be the value of a specific label on an incoming timeseries. Active groups are tracked and inactive groups are cleaned up on a defined interval. The maximum number of groups tracked is controlled by the `-max-separate-metrics-groups-per-user` flag. #3439
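To make the new retention rule concrete, here is a minimal sketch of how the deletion cutoff could be computed. The helper and field names are hypothetical and not taken from the Mimir code base; only the behaviour follows the changelog entry above.

package main

import (
    "fmt"
    "time"
)

// blockInfo holds the two timestamps the new retention rule cares about.
// shippedAt stays zero while the block has not been uploaded.
type blockInfo struct {
    createdAt time.Time
    shippedAt time.Time
}

// pastRetention reports whether a block may be deleted: with shipping enabled,
// retention counts from the upload time; with shipping disabled, from the
// block's creation time. A block that has not been uploaded yet is always kept.
func pastRetention(b blockInfo, shippingEnabled bool, retention time.Duration, now time.Time) bool {
    ref := b.createdAt
    if shippingEnabled {
        if b.shippedAt.IsZero() {
            return false // not uploaded yet, never delete
        }
        ref = b.shippedAt
    }
    return now.Sub(ref) > retention
}

func main() {
    now := time.Now()
    b := blockInfo{createdAt: now.Add(-26 * time.Hour), shippedAt: now.Add(-30 * time.Minute)}

    // Shipped only 30 minutes ago, so a 24h retention keeps the block despite its age.
    fmt.Println(pastRetention(b, true, 24*time.Hour, now)) // false
    // With shipping disabled, the 26h-old block is past a 24h retention.
    fmt.Println(pastRetention(b, false, 24*time.Hour, now)) // true
}

In the actual ingester the upload time comes from the shipper metadata, whose changed shape is visible in the test diff further down.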
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
@@ -5761,7 +5761,7 @@
"kind": "field",
"name": "retention_period",
"required": false,
"desc": "TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks.",
"desc": "TSDB blocks retention in the ingester before a block is removed. If shipping is enabled, the retention will be relative to the time when the block was uploaded to storage. If shipping is disabled then its relative to the creation time of the block. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks.",
"fieldValue": null,
"fieldDefaultValue": 86400000000000,
"fieldFlag": "blocks-storage.tsdb.retention-period",
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -516,7 +516,7 @@ Usage of ./cmd/mimir/mimir:
-blocks-storage.tsdb.out-of-order-capacity-max int
[experimental] Maximum capacity for out of order chunks, in samples between 1 and 255. (default 32)
-blocks-storage.tsdb.retention-period duration
TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
TSDB blocks retention in the ingester before a block is removed. If shipping is enabled, the retention will be relative to the time when the block was uploaded to storage. If shipping is disabled, then it is relative to the creation time of the block. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
-blocks-storage.tsdb.series-hash-cache-max-size-bytes uint
Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled. (default 1073741824)
-blocks-storage.tsdb.ship-concurrency int
2 changes: 1 addition & 1 deletion cmd/mimir/help.txt.tmpl
@@ -192,7 +192,7 @@ Usage of ./cmd/mimir/mimir:
-blocks-storage.tsdb.dir string
Directory to store TSDBs (including WAL) in the ingesters. This directory is required to be persisted between restarts. (default "./tsdb/")
-blocks-storage.tsdb.retention-period duration
TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
TSDB blocks retention in the ingester before a block is removed. If shipping is enabled, the retention will be relative to the time when the block was uploaded to storage. If shipping is disabled, then it is relative to the creation time of the block. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
-common.storage.azure.account-key string
Azure storage account key
-common.storage.azure.account-name string
@@ -3135,8 +3135,10 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.dir
[dir: <string> | default = "./tsdb/"]

# TSDB blocks retention in the ingester before a block is removed, relative to
# the newest block written for the tenant. This should be larger than the
# TSDB blocks retention in the ingester before a block is removed. If shipping
# is enabled, the retention will be relative to the time when the block was
# uploaded to storage. If shipping is disabled, then it is relative to the
# creation time of the block. This should be larger than the
# -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and
# large enough to give store-gateways and queriers enough time to discover
# newly uploaded blocks.
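The flag description above also asks operators to keep the retention larger than the biggest -blocks-storage.tsdb.block-ranges-period value and -querier.query-store-after. As a rough illustration of that guidance (hypothetical helper and purely illustrative values, not part of Mimir):

package main

import (
    "fmt"
    "time"
)

// checkRetention mirrors the guidance in the flag description above: the
// ingester block retention should exceed both the largest configured block
// range and the query-store-after delay, leaving store-gateways and queriers
// time to discover newly uploaded blocks.
func checkRetention(retention, largestBlockRange, queryStoreAfter time.Duration) error {
    if retention <= largestBlockRange || retention <= queryStoreAfter {
        return fmt.Errorf("retention %s should be larger than block range %s and query-store-after %s",
            retention, largestBlockRange, queryStoreAfter)
    }
    return nil
}

func main() {
    // Illustrative values only: 24h retention, 2h block range, 12h query-store-after.
    fmt.Println(checkRetention(24*time.Hour, 2*time.Hour, 12*time.Hour)) // <nil>
}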
2 changes: 1 addition & 1 deletion integration/querier_test.go
@@ -75,7 +75,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
commonFlags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.tsdb.retention-period": "1ms", // Retention period counts from the moment the block was uploaded to storage so we're setting it deliberatelly small so block gets deleted as soon as possible
})

// Start dependencies in common with all test cases.
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -1596,9 +1596,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),

instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.seriesCount,
blockMinRetention: i.cfg.BlocksStorageConfig.TSDB.Retention,
}

maxExemplars := i.limiter.convertGlobalToLocalLimit(userID, i.limits.MaxGlobalExemplarsPerUser(userID))
164 changes: 157 additions & 7 deletions pkg/ingester/ingester_test.go
@@ -3263,7 +3263,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP

// Mock the shipper meta (no blocks).
db := i.getTSDB(userID)
require.NoError(t, writeShipperMetaFile(log.NewNopLogger(), db.db.Dir(), &shipperMeta{
require.NoError(t, writeShipperMetaFile(log.NewNopLogger(), db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
}))

@@ -4182,9 +4182,9 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Saying that we have shipped the second block, so only that should get deleted.
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), &shipperMeta{
Version: shipperMetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
Shipped: map[ulid.ULID]model.Time{oldBlocks[1].Meta().ULID: model.TimeFromUnixNano(time.Now().UnixNano())},
}))
require.NoError(t, db.updateCachedShippedBlocks())

@@ -4210,9 +4210,13 @@
`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Shipping 2 more blocks, hence all the blocks from first round.
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), &shipperMeta{
Version: shipperMetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
Shipped: map[ulid.ULID]model.Time{
oldBlocks[1].Meta().ULID: model.TimeFromUnixNano(time.Now().UnixNano()),
newBlocks[0].Meta().ULID: model.TimeFromUnixNano(time.Now().UnixNano()),
newBlocks[1].Meta().ULID: model.TimeFromUnixNano(time.Now().UnixNano()),
},
}))
require.NoError(t, db.updateCachedShippedBlocks())

@@ -4241,6 +4245,152 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, newBlocks2[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))
}

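For context on the shipper metadata changes above: the Uploaded list of block ULIDs is replaced by a Shipped map from block ULID to upload time, which is what lets retention be measured from the moment of upload. A minimal sketch of the shape implied by the test code follows; the real definition (including any serialization details) lives outside this diff.

package ingester // illustrative placement only; the actual definition is in the ingester's shipper code, not shown here

import (
    "github.com/oklog/ulid"
    "github.com/prometheus/common/model"
)

// Previous shape, implied by the removed test lines: only *which* blocks were shipped.
//
//  type shipperMeta struct {
//      Version  int
//      Uploaded []ulid.ULID
//  }

// New shape, implied by the added test lines: *when* each block was shipped.
// model.Time is the Prometheus millisecond-precision timestamp used with
// model.TimeFromUnixNano in the tests below.
type shipperMeta struct {
    Version int
    Shipped map[ulid.ULID]model.Time
}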
func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) {
chunkRange := 2 * time.Hour
chunkRangeMilliSec := chunkRange.Milliseconds()
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{chunkRange}
cfg.BlocksStorageConfig.TSDB.Retention = 1 * time.Hour // This means that only blocks shipped more than an hour ago can be deleted.

// Create ingester
reg := prometheus.NewPedanticRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), i)
})

// Wait until it's healthy
test.Poll(t, 1*time.Second, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_oldest_unshipped_block_timestamp_seconds Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.
# TYPE cortex_ingester_oldest_unshipped_block_timestamp_seconds gauge
cortex_ingester_oldest_unshipped_block_timestamp_seconds 0
`), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Push some data to create 3 blocks.
ctx := user.InjectOrgID(context.Background(), userID)
for j := int64(0); j < 5; j++ {
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, j*chunkRangeMilliSec)
_, err := i.Push(ctx, req)
require.NoError(t, err)
}

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())

oldBlocks := db.Blocks()
require.Equal(t, 3, len(oldBlocks))

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingester_oldest_unshipped_block_timestamp_seconds Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.
# TYPE cortex_ingester_oldest_unshipped_block_timestamp_seconds gauge
cortex_ingester_oldest_unshipped_block_timestamp_seconds %d
`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Let's say that the first block was shipped 2 hours ago and the second block only 30 minutes ago.
require.Nil(t, writeShipperMetaFile(nil, db.db.Dir(), shipperMeta{
Version: shipperMetaVersion1,
Shipped: map[ulid.ULID]model.Time{
oldBlocks[0].Meta().ULID: model.TimeFromUnixNano(time.Now().Add(-2 * time.Hour).UnixNano()),
oldBlocks[1].Meta().ULID: model.TimeFromUnixNano(time.Now().Add(-30 * time.Minute).UnixNano()),
},
}))
require.NoError(t, db.updateCachedShippedBlocks())

// Add more samples that could trigger another compaction and hence reload of blocks.
for j := int64(5); j < 6; j++ {
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, j*chunkRangeMilliSec)
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())

// Only the last two old blocks plus the one containing the newly added samples should remain.
newBlocks := db.Blocks()
require.Equal(t, 3, len(newBlocks))
require.Equal(t, oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID) // Second block becomes first block.
require.Equal(t, oldBlocks[2].Meta().ULID, newBlocks[1].Meta().ULID) // Third block becomes second block.

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingester_oldest_unshipped_block_timestamp_seconds Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.
# TYPE cortex_ingester_oldest_unshipped_block_timestamp_seconds gauge
cortex_ingester_oldest_unshipped_block_timestamp_seconds %d
`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))
}

func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *testing.T) {
chunkRange := 2 * time.Hour
chunkRangeMilliSec := chunkRange.Milliseconds()
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{chunkRange}
cfg.BlocksStorageConfig.TSDB.ShipInterval = 0 // Disable shipping.
cfg.BlocksStorageConfig.TSDB.Retention = 1 * time.Second // With shipping disabled, blocks become eligible for deletion 1 second after their creation time.

// Create ingester
reg := prometheus.NewPedanticRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), i)
})

// Wait until it's healthy
test.Poll(t, 1*time.Second, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed.
# TYPE cortex_ingester_tsdb_compactions_total counter
cortex_ingester_tsdb_compactions_total 0
`), "cortex_ingester_tsdb_compactions_total"))

// Push some data to create 3 blocks.
ctx := user.InjectOrgID(context.Background(), userID)
for j := int64(0); j < 5; j++ {
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, j*chunkRangeMilliSec)
_, err := i.Push(ctx, req)
require.NoError(t, err)
}

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())

oldBlocks := db.Blocks()
require.Equal(t, 3, len(oldBlocks))

// Yes, we're sleeping in this test to let the retention of the newly compacted blocks expire
time.Sleep(1 * time.Second)

// Add more samples that could trigger another compaction and hence reload of blocks.
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, 5*chunkRangeMilliSec)
_, err = i.Push(ctx, req)
require.NoError(t, err)
require.Nil(t, db.Compact())

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed.
# TYPE cortex_ingester_tsdb_compactions_total counter
cortex_ingester_tsdb_compactions_total 4
`), "cortex_ingester_tsdb_compactions_total"))

// Only last compacted block should remain.
newBlocks := db.Blocks()
require.Equal(t, 1, len(newBlocks))
require.NotContains(t, []ulid.ULID{oldBlocks[0].Meta().ULID, oldBlocks[1].Meta().ULID, oldBlocks[2].Meta().ULID}, newBlocks[0].Meta().ULID)
}

func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) {
i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), nil)
require.NoError(t, err)