Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: Enable TSDB block upload on per-tenant basis #2126

Merged
merged 10 commits into from
Jun 20, 2022
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* [ENHANCEMENT] Chunk Mapper: reduce memory usage of async chunk mapper. #2043
* [ENHANCEMENT] Ingesters: Added new configuration option that makes it possible for mimir ingesters to perform queries on overlapping blocks in the filesystem. Enabled with `-blocks-storage.tsdb.allow-overlapping-queries`. #2091
* [ENHANCEMENT] Ingester: reduce sleep time when reading WAL. #2098
* [ENHANCEMENT] Compactor: Add HTTP API for uploading TSDB blocks. #1694
* [ENHANCEMENT] Compactor: Add HTTP API for uploading TSDB blocks. Enabled with `-compactor.block-upload-enabled`. #1694 #2126
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
* [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893
* [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933
Expand Down
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2879,6 +2879,16 @@
"fieldFlag": "compactor.compactor-tenant-shard-size",
"fieldType": "int"
},
{
"kind": "field",
"name": "compactor_block_upload_enabled",
"required": false,
"desc": "Enable block upload API for the tenant.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "compactor.block-upload-enabled",
"fieldType": "boolean"
},
{
"kind": "field",
"name": "s3_sse_type",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ Usage of ./cmd/mimir/mimir:
List of compaction time ranges. (default 2h0m0s,12h0m0s,24h0m0s)
-compactor.block-sync-concurrency int
Number of Go routines to use when downloading blocks for compaction and uploading resulting blocks. (default 8)
-compactor.block-upload-enabled
Enable block upload API for the tenant.
-compactor.blocks-retention-period value
Delete blocks containing samples older than the specified retention period. 0 to disable.
-compactor.cleanup-concurrency int
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ Usage of ./cmd/mimir/mimir:
Directory to store TSDBs (including WAL) in the ingesters. This directory is required to be persisted between restarts. (default "./tsdb/")
-blocks-storage.tsdb.retention-period duration
TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s)
-compactor.block-upload-enabled
Enable block upload API for the tenant.
-compactor.blocks-retention-period value
Delete blocks containing samples older than the specified retention period. 0 to disable.
-compactor.compactor-tenant-shard-size int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,10 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -compactor.compactor-tenant-shard-size
[compactor_tenant_shard_size: <int> | default = 0]

# Enable block upload API for the tenant.
# CLI flag: -compactor.block-upload-enabled
[compactor_block_upload_enabled: <boolean> | default = false]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
Expand Down
8 changes: 8 additions & 0 deletions pkg/compactor/block_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (c *MultitenantCompactor) HandleBlockUpload(w http.ResponseWriter, r *http.
http.Error(w, "invalid tenant ID", http.StatusBadRequest)
return
}
if !c.cfgProvider.CompactorBlockUploadEnabled(tenantID) {
http.Error(w, "block upload is disabled", http.StatusBadRequest)
return
}

logger := log.With(util_log.WithContext(ctx, c.logger), "block", blockID)

Expand Down Expand Up @@ -180,6 +184,10 @@ func (c *MultitenantCompactor) UploadBlockFile(w http.ResponseWriter, r *http.Re
http.Error(w, "invalid tenant ID", http.StatusBadRequest)
return
}
if !c.cfgProvider.CompactorBlockUploadEnabled(tenantID) {
http.Error(w, "block upload is disabled", http.StatusBadRequest)
return
}

logger := util_log.WithContext(ctx, c.logger)
logger = log.With(logger, "block", blockID)
Expand Down
39 changes: 38 additions & 1 deletion pkg/compactor/block_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
body string
meta *metadata.Meta
retention time.Duration
disableBlockUpload bool
expBadRequest string
expConflict string
expUnprocessableEntity string
Expand Down Expand Up @@ -376,6 +377,13 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
meta: &validMeta,
expInternalServerError: true,
},
{
name: "block upload disabled",
tenantID: tenantID,
blockID: blockID,
disableBlockUpload: true,
expBadRequest: "block upload is disabled",
},
{
name: "valid request",
tenantID: tenantID,
Expand Down Expand Up @@ -497,6 +505,7 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {

cfgProvider := newMockConfigProvider()
cfgProvider.userRetentionPeriods[tenantID] = tc.retention
cfgProvider.blockUploadEnabled[tenantID] = !tc.disableBlockUpload
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: &bkt,
Expand Down Expand Up @@ -639,10 +648,12 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
metaJSON, err := json.Marshal(meta)
require.NoError(t, err)

cfgProvider := newMockConfigProvider()
cfgProvider.blockUploadEnabled[tenantID] = true
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: bkt,
cfgProvider: newMockConfigProvider(),
cfgProvider: cfgProvider,
}
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s", blockID), bytes.NewReader(metaJSON))
r = r.WithContext(user.InjectOrgID(r.Context(), tenantID))
Expand Down Expand Up @@ -702,6 +713,7 @@ func TestMultitenantCompactor_UploadBlockFile(t *testing.T) {
blockID string
path string
body string
disableBlockUpload bool
expBadRequest string
expConflict string
expNotFound string
Expand Down Expand Up @@ -764,6 +776,14 @@ func TestMultitenantCompactor_UploadBlockFile(t *testing.T) {
body: "content",
expBadRequest: fmt.Sprintf("invalid path: %q", uploadingMetaFilename),
},
{
name: "block upload disabled",
tenantID: tenantID,
blockID: blockID,
disableBlockUpload: true,
path: "chunks/000001",
expBadRequest: "block upload is disabled",
},
{
name: "complete block already exists",
tenantID: tenantID,
Expand Down Expand Up @@ -857,9 +877,12 @@ func TestMultitenantCompactor_UploadBlockFile(t *testing.T) {
tc.setUpBucketMock(&bkt)
}

cfgProvider := newMockConfigProvider()
cfgProvider.blockUploadEnabled[tc.tenantID] = !tc.disableBlockUpload
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: &bkt,
cfgProvider: cfgProvider,
}
var rdr io.Reader
if tc.body != "" {
Expand Down Expand Up @@ -953,9 +976,12 @@ func TestMultitenantCompactor_UploadBlockFile(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
tc.setUpBucket(t, bkt)
cfgProvider := newMockConfigProvider()
cfgProvider.blockUploadEnabled[tenantID] = true
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: bkt,
cfgProvider: cfgProvider,
}

for _, f := range tc.files {
Expand Down Expand Up @@ -1027,6 +1053,7 @@ func TestMultitenantCompactor_HandleBlockUpload_Complete(t *testing.T) {
name string
tenantID string
blockID string
disableBlockUpload bool
expMeta metadata.Meta
expBadRequest string
expConflict string
Expand All @@ -1051,6 +1078,13 @@ func TestMultitenantCompactor_HandleBlockUpload_Complete(t *testing.T) {
blockID: "1234",
expBadRequest: "invalid block ID",
},
{
name: "block upload disabled",
tenantID: tenantID,
blockID: blockID,
disableBlockUpload: true,
expBadRequest: "block upload is disabled",
},
{
name: "complete block already exists",
tenantID: tenantID,
Expand Down Expand Up @@ -1142,9 +1176,12 @@ func TestMultitenantCompactor_HandleBlockUpload_Complete(t *testing.T) {
if tc.setUpBucketMock != nil {
tc.setUpBucketMock(&bkt)
}
cfgProvider := newMockConfigProvider()
cfgProvider.blockUploadEnabled[tc.tenantID] = !tc.disableBlockUpload
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: &bkt,
cfgProvider: cfgProvider,
}
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf(
"/api/v1/upload/block/%s?uploadComplete=true", tc.blockID), nil)
Expand Down
6 changes: 6 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,15 @@ type mockConfigProvider struct {
splitAndMergeShards map[string]int
instancesShardSize map[string]int
splitGroups map[string]int
blockUploadEnabled map[string]bool
}

func newMockConfigProvider() *mockConfigProvider {
return &mockConfigProvider{
userRetentionPeriods: make(map[string]time.Duration),
splitAndMergeShards: make(map[string]int),
splitGroups: make(map[string]int),
blockUploadEnabled: make(map[string]bool),
}
}

Expand Down Expand Up @@ -737,6 +739,10 @@ func (m *mockConfigProvider) CompactorTenantShardSize(user string) int {
return 0
}

func (m *mockConfigProvider) CompactorBlockUploadEnabled(tenantID string) bool {
return m.blockUploadEnabled[tenantID]
}

func (m *mockConfigProvider) S3SSEType(user string) string {
return ""
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ type ConfigProvider interface {

// CompactorTenantShardSize returns number of compactors that this user can use. 0 = all compactors.
CompactorTenantShardSize(userID string) int

// CompactorBlockUploadEnabled returns whether block upload is enabled for a given tenant.
CompactorBlockUploadEnabled(tenantID string) bool
}

// MultitenantCompactor is a multi-tenant TSDB blocks compactor based on Thanos.
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type Limits struct {
CompactorSplitAndMergeShards int `yaml:"compactor_split_and_merge_shards" json:"compactor_split_and_merge_shards"`
CompactorSplitGroups int `yaml:"compactor_split_groups" json:"compactor_split_groups"`
CompactorTenantShardSize int `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"`
CompactorBlockUploadEnabled bool `yaml:"compactor_block_upload_enabled" json:"compactor_block_upload_enabled"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
Expand Down Expand Up @@ -204,6 +205,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.CompactorSplitAndMergeShards, "compactor.split-and-merge-shards", 0, "The number of shards to use when splitting blocks. 0 to disable splitting.")
f.IntVar(&l.CompactorSplitGroups, "compactor.split-groups", 1, "Number of groups that blocks for splitting should be grouped into. Each group of blocks is then split separately. Number of output split shards is controlled by -compactor.split-and-merge-shards.")
f.IntVar(&l.CompactorTenantShardSize, "compactor.compactor-tenant-shard-size", 0, "Max number of compactors that can compact blocks for single tenant. 0 to disable the limit and use all compactors.")
f.BoolVar(&l.CompactorBlockUploadEnabled, "compactor.block-upload-enabled", false, "Enable block upload API for the tenant.")

// Store-gateway.
f.IntVar(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The tenant's shard size, used when store-gateway sharding is enabled. Value of 0 disables shuffle sharding for the tenant, that is all tenant blocks are sharded across all store-gateway replicas.")
Expand Down Expand Up @@ -529,6 +531,11 @@ func (o *Overrides) CompactorSplitGroups(userID string) int {
return o.getOverridesForUser(userID).CompactorSplitGroups
}

// CompactorBlockUploadEnabled returns whether block upload is enabled for a certain tenant.
func (o *Overrides) CompactorBlockUploadEnabled(tenantID string) bool {
return o.getOverridesForUser(tenantID).CompactorBlockUploadEnabled
}

// MetricRelabelConfigs returns the metric relabel configs for a given user.
func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config {
return o.getOverridesForUser(userID).MetricRelabelConfigs
Expand Down