diff --git a/CHANGELOG.md b/CHANGELOG.md index 9aeefb24a24..e4e03c75e71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ * [FEATURE] Alertmanager: Added `-alertmanager.utf8-strict-mode-enabled` to control support for any UTF-8 character as part of Alertmanager configuration/API matchers and labels. It's default value is set to `false`. #6898 * [FEATURE] Querier: added `histogram_avg()` function support to PromQL. #7293 * [FEATURE] Ingester: added `-blocks-storage.tsdb.timely-head-compaction` flag, which enables more timely head compaction, and defaults to `false`. #7372 +* [FEATURE] Compactor: Added `/compactor/tenants` and `/compactor/tenant/{tenant}/planned_jobs` endpoints that provide functionality that was provided by `tools/compaction-planner` -- listing of planned compaction jobs based on tenants' bucket index. #7381 * [ENHANCEMENT] Vault: add lifecycle manager for token used to authenticate to Vault. This ensures the client token is always valid. Includes a gauge (`cortex_vault_token_lease_renewal_active`) to check whether token renewal is active, and the counters `cortex_vault_token_lease_renewal_success_total` and `cortex_vault_auth_success_total` to see the total number of successful lease renewals / authentications. #7337 * [ENHANCEMENT] Store-gateway: add no-compact details column on store-gateway tenants admin UI. #6848 * [ENHANCEMENT] PromQL: ignore small errors for bucketQuantile #6766 diff --git a/docs/sources/mimir/references/http-api/index.md b/docs/sources/mimir/references/http-api/index.md index ade00fd54b5..00cceb8c399 100644 --- a/docs/sources/mimir/references/http-api/index.md +++ b/docs/sources/mimir/references/http-api/index.md @@ -97,6 +97,8 @@ This document groups API endpoints by service. Note that the API endpoints are e | [Check block upload](#check-block-upload) | Compactor | `GET /api/v1/upload/block/{block}/check` | | [Tenant delete request](#tenant-delete-request) | Compactor | `POST /compactor/delete_tenant` | | [Tenant delete status](#tenant-delete-status) | Compactor | `GET /compactor/delete_tenant_status` | +| [Compactor tenants](#compactor-tenants) | Compactor | `GET /compactor/tenants` | +| [Compactor tenant planned jobs](#compactor-tenant-planned-jobs) | Compactor | `GET /compactor/tenant/{tenant}/planned_jobs` | | [Overrides-exporter ring status](#overrides-exporter-ring-status) | Overrides-exporter | `GET /overrides-exporter/ring` | {{% /responsive-table %}} @@ -1223,6 +1225,22 @@ The `blocks_deleted` field will be set to `true` if all the tenant's blocks have Requires [authentication](#authentication). +### Compactor tenants + +``` +GET /compactor/tenants +``` + +Displays a web page with the list of tenants that have blocks in the storage configured for the compactor. + +### Compactor tenant planned jobs + +``` +GET /compactor/tenant/{tenant}/planned_jobs +``` + +Displays a web page listing planned compaction jobs computed from the bucket index for the given tenant. + ## Overrides-exporter ### Overrides-exporter ring status diff --git a/pkg/api/api.go b/pkg/api/api.go index d814cf67a16..8349897524b 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -380,6 +380,7 @@ func (a *API) RegisterStoreGateway(s *storegateway.StoreGateway) { func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) { a.indexPage.AddLinks(defaultWeight, "Compactor", []IndexPageLink{ {Desc: "Ring status", Path: "/compactor/ring"}, + {Desc: "Tenants & compaction jobs", Path: "/compactor/tenants"}, }) a.RegisterRoute("/compactor/ring", http.HandlerFunc(c.RingHandler), false, true, "GET", "POST") a.RegisterRoute("/api/v1/upload/block/{block}/start", http.HandlerFunc(c.StartBlockUpload), true, false, http.MethodPost) @@ -388,6 +389,8 @@ func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) { a.RegisterRoute("/api/v1/upload/block/{block}/check", http.HandlerFunc(c.GetBlockUploadStateHandler), true, false, http.MethodGet) a.RegisterRoute("/compactor/delete_tenant", http.HandlerFunc(c.DeleteTenant), true, true, "POST") a.RegisterRoute("/compactor/delete_tenant_status", http.HandlerFunc(c.DeleteTenantStatus), true, true, "GET") + a.RegisterRoute("/compactor/tenants", http.HandlerFunc(c.TenantsHandler), false, true, "GET") + a.RegisterRoute("/compactor/tenant/{tenant}/planned_jobs", http.HandlerFunc(c.PlannedJobsHandler), false, true, "GET") } func (a *API) DisableServerHTTPTimeouts(next http.Handler) http.Handler { diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index b212cb4514a..1439182f934 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -473,7 +473,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime() // Compute pending compaction jobs based on current index. - splitJobs, mergeJobs, err := c.estimateCompactionJobsFrom(ctx, userID, userBucket, idx) + jobs, err := estimateCompactionJobsFromBucketIndex(ctx, userID, userBucket, idx, c.cfg.CompactionBlockRanges, c.cfgProvider.CompactorSplitAndMergeShards(userID), c.cfgProvider.CompactorSplitGroups(userID)) if err != nil { // When compactor is shutting down, we get context cancellation. There's no reason to report that as error. if !errors.Is(err, context.Canceled) { @@ -483,6 +483,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge)) } } else { + splitJobs, mergeJobs := computeSplitAndMergeJobs(jobs) + c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageSplit)).Set(float64(splitJobs)) c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageMerge)).Set(float64(mergeJobs)) } @@ -490,6 +492,17 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger return nil } +func computeSplitAndMergeJobs(jobs []*Job) (splitJobs int, mergeJobs int) { + for _, j := range jobs { + if j.UseSplitting() { + splitJobs++ + } else { + mergeJobs++ + } + } + return splitJobs, mergeJobs +} + // Concurrently deletes blocks marked for deletion, and removes blocks from index. func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *bucketindex.Index, userBucket objstore.Bucket, userLogger log.Logger) { blocksToDelete := make([]ulid.ULID, 0, len(idx.BlockDeletionMarks)) @@ -669,7 +682,7 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u return lastModified, err } -func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) { +func estimateCompactionJobsFromBucketIndex(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index, compactionBlockRanges mimir_tsdb.DurationList, mergeShards int, splitGroups int) ([]*Job, error) { metas := convertBucketIndexToMetasForCompactionJobPlanning(idx) // We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner. @@ -682,27 +695,13 @@ func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID s } { err := f.Filter(ctx, metas, synced) if err != nil { - return 0, 0, err + return nil, err } } - grouper := NewSplitAndMergeGrouper(userID, c.cfg.CompactionBlockRanges.ToMilliseconds(), uint32(c.cfgProvider.CompactorSplitAndMergeShards(userID)), uint32(c.cfgProvider.CompactorSplitGroups(userID)), log.NewNopLogger()) + grouper := NewSplitAndMergeGrouper(userID, compactionBlockRanges.ToMilliseconds(), uint32(mergeShards), uint32(splitGroups), log.NewNopLogger()) jobs, err := grouper.Groups(metas) - if err != nil { - return 0, 0, err - } - - split := 0 - merge := 0 - for _, j := range jobs { - if j.UseSplitting() { - split++ - } else { - merge++ - } - } - - return split, merge, nil + return jobs, err } // Convert index into map of block Metas, but ignore blocks marked for deletion. diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 5eae5679f55..2e883184f5c 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -1045,10 +1045,6 @@ func TestComputeCompactionJobs(t *testing.T) { const user = "test" - cfgProvider := newMockConfigProvider() - cfgProvider.splitGroups[user] = 0 // No grouping of jobs for split-compaction. All jobs will be in single split compaction. - cfgProvider.splitAndMergeShards[user] = 3 - twoHoursMS := 2 * time.Hour.Milliseconds() dayMS := 24 * time.Hour.Milliseconds() @@ -1077,9 +1073,10 @@ func TestComputeCompactionJobs(t *testing.T) { // Mark block for no-compaction. require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) - cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil) - split, merge, err := cleaner.estimateCompactionJobsFrom(context.Background(), user, userBucket, &index) + // No grouping of jobs for split-compaction. All jobs will be in single split compaction. + jobs, err := estimateCompactionJobsFromBucketIndex(context.Background(), user, userBucket, &index, cfg.CompactionBlockRanges, 3, 0) require.NoError(t, err) + split, merge := computeSplitAndMergeJobs(jobs) require.Equal(t, 1, split) require.Equal(t, 2, merge) } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 5153a2aa411..ef54b33524b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -820,6 +820,8 @@ type shardingStrategy interface { // blocksCleanerOwnsUser must be concurrency-safe blocksCleanerOwnsUser(userID string) (bool, error) ownJob(job *Job) (bool, error) + // instanceOwningJob returns instance owning the job based on ring. It ignores per-instance allowed tenants. + instanceOwningJob(job *Job) (ring.InstanceDesc, error) } // splitAndMergeShardingStrategy is used by split-and-merge compactor when configured with sharding. @@ -876,14 +878,33 @@ func (s *splitAndMergeShardingStrategy) ownJob(job *Job) (bool, error) { return instanceOwnsTokenInRing(r, s.ringLifecycler.GetInstanceAddr(), job.ShardingKey()) } -func instanceOwnsTokenInRing(r ring.ReadRing, instanceAddr string, key string) (bool, error) { +func (s *splitAndMergeShardingStrategy) instanceOwningJob(job *Job) (ring.InstanceDesc, error) { + r := s.ring.ShuffleShard(job.UserID(), s.configProvider.CompactorTenantShardSize(job.UserID())) + + rs, err := instancesForKey(r, job.ShardingKey()) + if err != nil { + return ring.InstanceDesc{}, err + } + + if len(rs.Instances) != 1 { + return ring.InstanceDesc{}, fmt.Errorf("unexpected number of compactors in the shard (expected 1, got %d)", len(rs.Instances)) + } + + return rs.Instances[0], nil +} + +func instancesForKey(r ring.ReadRing, key string) (ring.ReplicationSet, error) { // Hash the key. hasher := fnv.New32a() _, _ = hasher.Write([]byte(key)) hash := hasher.Sum32() + return r.Get(hash, RingOp, nil, nil, nil) +} + +func instanceOwnsTokenInRing(r ring.ReadRing, instanceAddr string, key string) (bool, error) { // Check whether this compactor instance owns the token. - rs, err := r.Get(hash, RingOp, nil, nil, nil) + rs, err := instancesForKey(r, key) if err != nil { return false, err } diff --git a/pkg/compactor/compactor_tenants.gohtml b/pkg/compactor/compactor_tenants.gohtml new file mode 100644 index 00000000000..73bb4cb3bea --- /dev/null +++ b/pkg/compactor/compactor_tenants.gohtml @@ -0,0 +1,26 @@ +{{- /*gotype: github.com/grafana/mimir/pkg/compactor.tenantsPageContents */ -}} + + +
+ +Current time: {{ .Now }}
+Tenant | +
---|
{{ . }} | +
+ This page shows compaction jobs computed from the bucket index. This is not an up-to-date view of compaction jobs.
+ Compactors owning the job are computed using the current compactor ring, and ignore -compactor.enabled-tenants
and -compactor.disabled-tenants
configuration.
+
Total jobs:
+Job Number | +Start Time | +End Time | +Number of Blocks | +Job Key | + {{ if .ShowCompactors }} +Compactor | {{ end }} + {{ if .ShowBlocks }} +Blocks | {{ end }} +
---|---|---|---|---|---|---|
{{ add $index 1}} | +{{ .MinTime }} | +{{ .MaxTime }} | +{{ len .Blocks }} | +{{ $job.Key }} | + {{ if $page.ShowCompactors }} +{{ .Compactor }} | {{ end }} + {{ if $page.ShowBlocks }} +
+ {{ range $i, $b := .Blocks }}
+ {{ if $i }} {{ end }} + {{ $b }} + {{ end }} + |
+ {{ end }}
+