Add page for displaying compaction jobs computed from bucket-index. #7381

Merged: 9 commits, Feb 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -49,6 +49,7 @@
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-strict-mode-enabled` to control support for any UTF-8 character as part of Alertmanager configuration/API matchers and labels. Its default value is `false`. #6898
* [FEATURE] Querier: added `histogram_avg()` function support to PromQL. #7293
* [FEATURE] Ingester: added `-blocks-storage.tsdb.timely-head-compaction` flag, which enables more timely head compaction, and defaults to `false`. #7372
* [FEATURE] Compactor: Added `/compactor/tenants` and `/compactor/tenant/{tenant}/planned_jobs` endpoints that provide the functionality of `tools/compaction-planner`: listing planned compaction jobs based on tenants' bucket index. #7381
* [ENHANCEMENT] Vault: add lifecycle manager for token used to authenticate to Vault. This ensures the client token is always valid. Includes a gauge (`cortex_vault_token_lease_renewal_active`) to check whether token renewal is active, and the counters `cortex_vault_token_lease_renewal_success_total` and `cortex_vault_auth_success_total` to see the total number of successful lease renewals / authentications. #7337
* [ENHANCEMENT] Store-gateway: add no-compact details column on store-gateway tenants admin UI. #6848
* [ENHANCEMENT] PromQL: ignore small errors for bucketQuantile #6766
18 changes: 18 additions & 0 deletions docs/sources/mimir/references/http-api/index.md
@@ -97,6 +97,8 @@ This document groups API endpoints by service. Note that the API endpoints are e
| [Check block upload](#check-block-upload) | Compactor | `GET /api/v1/upload/block/{block}/check` |
| [Tenant delete request](#tenant-delete-request) | Compactor | `POST /compactor/delete_tenant` |
| [Tenant delete status](#tenant-delete-status) | Compactor | `GET /compactor/delete_tenant_status` |
| [Compactor tenants](#compactor-tenants) | Compactor | `GET /compactor/tenants` |
| [Compactor tenant planned jobs](#compactor-tenant-planned-jobs) | Compactor | `GET /compactor/tenant/{tenant}/planned_jobs` |
| [Overrides-exporter ring status](#overrides-exporter-ring-status) | Overrides-exporter | `GET /overrides-exporter/ring` |
{{% /responsive-table %}}

@@ -1223,6 +1225,22 @@ The `blocks_deleted` field will be set to `true` if all the tenant's blocks have

Requires [authentication](#authentication).

### Compactor tenants

```
GET /compactor/tenants
```

Displays a web page with the list of tenants that have blocks in the storage configured for the compactor.

### Compactor tenant planned jobs

```
GET /compactor/tenant/{tenant}/planned_jobs
```

Displays a web page listing planned compaction jobs computed from the bucket index for the given tenant.
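
A minimal Go sketch of building a URL for the planned jobs page. The query parameter names (`split_groups`, `merge_shards`, `show_blocks`, `show_compactors`) are taken from the form fields in `planned_jobs.gohtml` in this change; how the handler interprets them is not shown here, so treat the values as assumptions. The base address and the `plannedJobsURL` helper are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// plannedJobsURL builds the URL of the planned jobs page for one tenant.
// baseURL is a placeholder for the compactor's HTTP address.
func plannedJobsURL(baseURL, tenant string, splitGroups, mergeShards int, showBlocks, showCompactors bool) string {
	q := url.Values{}
	q.Set("split_groups", strconv.Itoa(splitGroups))
	q.Set("merge_shards", strconv.Itoa(mergeShards))
	// An HTML checkbox without a value attribute submits "on" when checked
	// and is omitted from the query when unchecked.
	if showBlocks {
		q.Set("show_blocks", "on")
	}
	if showCompactors {
		q.Set("show_compactors", "on")
	}
	// Encode sorts the parameters by key.
	return baseURL + "/compactor/tenant/" + url.PathEscape(tenant) + "/planned_jobs?" + q.Encode()
}

func main() {
	fmt.Println(plannedJobsURL("http://localhost:8080", "tenant-1", 4, 2, true, false))
}
```

Escaping the tenant ID with `url.PathEscape` keeps unusual tenant names from breaking the path.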

## Overrides-exporter

### Overrides-exporter ring status
3 changes: 3 additions & 0 deletions pkg/api/api.go
@@ -380,6 +380,7 @@ func (a *API) RegisterStoreGateway(s *storegateway.StoreGateway) {
func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) {
a.indexPage.AddLinks(defaultWeight, "Compactor", []IndexPageLink{
{Desc: "Ring status", Path: "/compactor/ring"},
{Desc: "Tenants & compaction jobs", Path: "/compactor/tenants"},
})
a.RegisterRoute("/compactor/ring", http.HandlerFunc(c.RingHandler), false, true, "GET", "POST")
a.RegisterRoute("/api/v1/upload/block/{block}/start", http.HandlerFunc(c.StartBlockUpload), true, false, http.MethodPost)
@@ -388,6 +389,8 @@ func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) {
a.RegisterRoute("/api/v1/upload/block/{block}/check", http.HandlerFunc(c.GetBlockUploadStateHandler), true, false, http.MethodGet)
a.RegisterRoute("/compactor/delete_tenant", http.HandlerFunc(c.DeleteTenant), true, true, "POST")
a.RegisterRoute("/compactor/delete_tenant_status", http.HandlerFunc(c.DeleteTenantStatus), true, true, "GET")
a.RegisterRoute("/compactor/tenants", http.HandlerFunc(c.TenantsHandler), false, true, "GET")
a.RegisterRoute("/compactor/tenant/{tenant}/planned_jobs", http.HandlerFunc(c.PlannedJobsHandler), false, true, "GET")
}

func (a *API) DisableServerHTTPTimeouts(next http.Handler) http.Handler {
37 changes: 18 additions & 19 deletions pkg/compactor/blocks_cleaner.go
@@ -473,7 +473,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()

// Compute pending compaction jobs based on current index.
splitJobs, mergeJobs, err := c.estimateCompactionJobsFrom(ctx, userID, userBucket, idx)
jobs, err := estimateCompactionJobsFromBucketIndex(ctx, userID, userBucket, idx, c.cfg.CompactionBlockRanges, c.cfgProvider.CompactorSplitAndMergeShards(userID), c.cfgProvider.CompactorSplitGroups(userID))
if err != nil {
// When compactor is shutting down, we get context cancellation. There's no reason to report that as error.
if !errors.Is(err, context.Canceled) {
@@ -483,13 +483,26 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))
}
} else {
splitJobs, mergeJobs := computeSplitAndMergeJobs(jobs)

c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageSplit)).Set(float64(splitJobs))
c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageMerge)).Set(float64(mergeJobs))
}

return nil
}

func computeSplitAndMergeJobs(jobs []*Job) (splitJobs int, mergeJobs int) {
for _, j := range jobs {
if j.UseSplitting() {
splitJobs++
} else {
mergeJobs++
}
}
return splitJobs, mergeJobs
}
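
The counting step above can be tried in isolation with a stand-in job type. This is only a sketch: `job` is a hypothetical replacement for the compactor's `*Job`, keeping just the `UseSplitting()` method that the loop relies on.

```go
package main

import "fmt"

// job is a hypothetical stand-in for the compactor's *Job type.
type job struct{ split bool }

// UseSplitting reports whether this job is a split job (as opposed to a merge job).
func (j *job) UseSplitting() bool { return j.split }

// countSplitAndMergeJobs follows the same logic as computeSplitAndMergeJobs:
// every job is counted either as a split job or as a merge job.
func countSplitAndMergeJobs(jobs []*job) (splitJobs int, mergeJobs int) {
	for _, j := range jobs {
		if j.UseSplitting() {
			splitJobs++
		} else {
			mergeJobs++
		}
	}
	return splitJobs, mergeJobs
}

func main() {
	jobs := []*job{{split: true}, {split: false}, {split: false}}
	split, merge := countSplitAndMergeJobs(jobs)
	fmt.Printf("split=%d merge=%d\n", split, merge)
}
```

Returning the job slice from the estimation function and counting separately lets the same planned jobs feed both the metrics and the new web page.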

// Concurrently deletes blocks marked for deletion, and removes blocks from index.
func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *bucketindex.Index, userBucket objstore.Bucket, userLogger log.Logger) {
blocksToDelete := make([]ulid.ULID, 0, len(idx.BlockDeletionMarks))
@@ -669,7 +682,7 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u
return lastModified, err
}

func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) {
func estimateCompactionJobsFromBucketIndex(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index, compactionBlockRanges mimir_tsdb.DurationList, mergeShards int, splitGroups int) ([]*Job, error) {
metas := convertBucketIndexToMetasForCompactionJobPlanning(idx)

// We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner.
@@ -682,27 +695,13 @@ func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID s
} {
err := f.Filter(ctx, metas, synced)
if err != nil {
return 0, 0, err
return nil, err
}
}

grouper := NewSplitAndMergeGrouper(userID, c.cfg.CompactionBlockRanges.ToMilliseconds(), uint32(c.cfgProvider.CompactorSplitAndMergeShards(userID)), uint32(c.cfgProvider.CompactorSplitGroups(userID)), log.NewNopLogger())
grouper := NewSplitAndMergeGrouper(userID, compactionBlockRanges.ToMilliseconds(), uint32(mergeShards), uint32(splitGroups), log.NewNopLogger())
jobs, err := grouper.Groups(metas)
if err != nil {
return 0, 0, err
}

split := 0
merge := 0
for _, j := range jobs {
if j.UseSplitting() {
split++
} else {
merge++
}
}

return split, merge, nil
return jobs, err
}

// Convert index into map of block Metas, but ignore blocks marked for deletion.
9 changes: 3 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
@@ -1045,10 +1045,6 @@ func TestComputeCompactionJobs(t *testing.T) {

const user = "test"

cfgProvider := newMockConfigProvider()
cfgProvider.splitGroups[user] = 0 // No grouping of jobs for split-compaction. All jobs will be in single split compaction.
cfgProvider.splitAndMergeShards[user] = 3

twoHoursMS := 2 * time.Hour.Milliseconds()
dayMS := 24 * time.Hour.Milliseconds()

@@ -1077,9 +1073,10 @@ func TestComputeCompactionJobs(t *testing.T) {
// Mark block for no-compaction.
require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))

cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil)
split, merge, err := cleaner.estimateCompactionJobsFrom(context.Background(), user, userBucket, &index)
// No grouping of jobs for split-compaction. All jobs will be in single split compaction.
jobs, err := estimateCompactionJobsFromBucketIndex(context.Background(), user, userBucket, &index, cfg.CompactionBlockRanges, 3, 0)
require.NoError(t, err)
split, merge := computeSplitAndMergeJobs(jobs)
require.Equal(t, 1, split)
require.Equal(t, 2, merge)
}
25 changes: 23 additions & 2 deletions pkg/compactor/compactor.go
@@ -820,6 +820,8 @@ type shardingStrategy interface {
// blocksCleanerOwnsUser must be concurrency-safe
blocksCleanerOwnsUser(userID string) (bool, error)
ownJob(job *Job) (bool, error)
// instanceOwningJob returns instance owning the job based on ring. It ignores per-instance allowed tenants.
instanceOwningJob(job *Job) (ring.InstanceDesc, error)
}

// splitAndMergeShardingStrategy is used by split-and-merge compactor when configured with sharding.
@@ -876,14 +878,33 @@ func (s *splitAndMergeShardingStrategy) ownJob(job *Job) (bool, error) {
return instanceOwnsTokenInRing(r, s.ringLifecycler.GetInstanceAddr(), job.ShardingKey())
}

func instanceOwnsTokenInRing(r ring.ReadRing, instanceAddr string, key string) (bool, error) {
func (s *splitAndMergeShardingStrategy) instanceOwningJob(job *Job) (ring.InstanceDesc, error) {
r := s.ring.ShuffleShard(job.UserID(), s.configProvider.CompactorTenantShardSize(job.UserID()))

rs, err := instancesForKey(r, job.ShardingKey())
if err != nil {
return ring.InstanceDesc{}, err
}

if len(rs.Instances) != 1 {
return ring.InstanceDesc{}, fmt.Errorf("unexpected number of compactors in the shard (expected 1, got %d)", len(rs.Instances))
}

return rs.Instances[0], nil
}

func instancesForKey(r ring.ReadRing, key string) (ring.ReplicationSet, error) {
// Hash the key.
hasher := fnv.New32a()
_, _ = hasher.Write([]byte(key))
hash := hasher.Sum32()

return r.Get(hash, RingOp, nil, nil, nil)
}
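
To illustrate the idea behind `instancesForKey`, here is a self-contained sketch that hashes a key with 32-bit FNV-1a (as the code above does) and looks up its owner in a toy token ring. The `toyRing` type, its one-token-per-instance layout, and the "first token greater than the hash, wrapping around" rule are simplifying assumptions for illustration only; the real lookup is done by `r.Get` on the ring and may behave differently. The job key in `main` is made up.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashKey hashes the key with 32-bit FNV-1a, like the hashing step above.
func hashKey(key string) uint32 {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(key))
	return hasher.Sum32()
}

// toyRing is a hypothetical, simplified ring with one token per instance.
type toyRing struct {
	tokens []uint32          // sorted ascending
	owners map[uint32]string // token -> instance name
}

func newToyRing(instances map[string]uint32) *toyRing {
	r := &toyRing{owners: make(map[uint32]string, len(instances))}
	for name, token := range instances {
		r.tokens = append(r.tokens, token)
		r.owners[token] = name
	}
	sort.Slice(r.tokens, func(i, j int) bool { return r.tokens[i] < r.tokens[j] })
	return r
}

// ownerOf returns the instance holding the first token greater than the
// key's hash, wrapping around to the smallest token when none is greater.
func (r *toyRing) ownerOf(key string) (string, error) {
	if len(r.tokens) == 0 {
		return "", fmt.Errorf("empty ring")
	}
	h := hashKey(key)
	i := sort.Search(len(r.tokens), func(i int) bool { return r.tokens[i] > h })
	if i == len(r.tokens) {
		i = 0
	}
	return r.owners[r.tokens[i]], nil
}

func main() {
	ring := newToyRing(map[string]uint32{
		"compactor-a": 1 << 30,
		"compactor-b": 3 << 30,
	})
	owner, err := ring.ownerOf("hypothetical-job-key")
	if err != nil {
		panic(err)
	}
	fmt.Println("owner:", owner)
}
```

Because the hash depends only on the key, every instance that sees the same ring resolves the same owner, which is what lets the page report the owning compactor without asking each compactor.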

func instanceOwnsTokenInRing(r ring.ReadRing, instanceAddr string, key string) (bool, error) {
// Check whether this compactor instance owns the token.
rs, err := r.Get(hash, RingOp, nil, nil, nil)
rs, err := instancesForKey(r, key)
if err != nil {
return false, err
}
26 changes: 26 additions & 0 deletions pkg/compactor/compactor_tenants.gohtml
@@ -0,0 +1,26 @@
{{- /*gotype: github.com/grafana/mimir/pkg/compactor.tenantsPageContents */ -}}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Compactor: bucket tenants</title>
</head>
<body>
<h1>Compactor: bucket tenants</h1>
<p>Current time: {{ .Now }}</p>
<table border="1" cellpadding="5" style="border-collapse: collapse">
<thead>
<tr>
<th>Tenant</th>
</tr>
</thead>
<tbody style="font-family: monospace;">
{{ range .Tenants }}
<tr>
<td><a href="tenant/{{ . }}/planned_jobs">{{ . }}</a></td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>
79 changes: 79 additions & 0 deletions pkg/compactor/planned_jobs.gohtml
@@ -0,0 +1,79 @@
{{- /*gotype: github.com/grafana/mimir/pkg/compactor.plannerJobsContent */ -}}
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/html">
<head>
<meta charset="UTF-8">
<title>Compactor: compaction jobs based on bucket-index</title>
</head>
<body style="padding: 1em;">
<h1>Compaction jobs based on bucket-index</h1>
<p>
This page shows compaction jobs computed from the bucket index. This is not an up-to-date view of compaction jobs.
Compactors owning the job are computed using the current <a href="/compactor/ring">compactor ring</a>, and ignore <code>-compactor.enabled-tenants</code> and <code>-compactor.disabled-tenants</code> configuration.
</p>
<ul>
<li>Current time: {{ .Now }}</li>
<li>Tenant: <strong>{{ .Tenant }}</strong></li>
<li>Bucket index last updated: {{ .BucketIndexUpdated }}</li>
<li>Tenant Split groups: {{ .TenantSplitGroups }}</li>
<li>Tenant Merge shards: {{ .TenantMergeShards }}</li>
</ul>

<hr />

<form>
<input type="checkbox" id="show-blocks" name="show_blocks" {{ if .ShowBlocks }} checked {{ end }}>&nbsp;<label for="show-blocks">Show Blocks</label>&nbsp;&nbsp;
<input type="checkbox" id="show-compactors" name="show_compactors" {{ if .ShowCompactors }} checked {{ end }}>&nbsp;<label for="show-compactors">Show Compactors</label>&nbsp;&nbsp;
<label for="split-groups">Split groups:</label>&nbsp;<input id="split-groups" name="split_groups" type="text" value="{{ .SplitGroups }}" style="width: 6em;"/>&nbsp;&nbsp;
<label for="merge-shards">Merge shards:</label>&nbsp;<input id="merge-shards" name="merge_shards" type="text" value="{{ .MergeShards }}" style="width: 6em;"/>&nbsp;&nbsp;
<button type="submit" style="background-color: lightgrey;">
<span style="padding: 0.5em 1em; font-size: 125%;">Reload</span>
</button>
</form>

<hr />
<p>Total jobs:</p>
<ul>
<li>Split jobs: {{ .SplitJobsCount }}</li>
<li>Merge jobs: {{ .MergeJobsCount }}</li>
</ul>

<table border="1" cellpadding="5" style="border-collapse: collapse;">
<thead>
<tr>
<th>Job Number</th>
[Nit] Don't we use sentence case also in our UI titles?

<th>Start Time</th>
<th>End Time</th>
<th>Number of Blocks</th>
<th>Job Key</th>
{{ if .ShowCompactors }}
<th title="Compactor that owns this job based on ring">Compactor</th>{{ end }}
{{ if .ShowBlocks }}
<th>Blocks</th>{{ end }}
</tr>
</thead>
<tbody style="font-family: monospace;">
{{ $page := . }}
{{ range $index, $job := .PlannedJobs }}
<tr>
<td>{{ add $index 1}}</td>
<td>{{ .MinTime }}</td>
<td>{{ .MaxTime }}</td>
<td>{{ len .Blocks }}</td>
<td>{{ $job.Key }}</td>
{{ if $page.ShowCompactors }}
<td>{{ .Compactor }}</td>{{ end }}
{{ if $page.ShowBlocks }}
<td>
{{ range $i, $b := .Blocks }}
{{ if $i }}<br>{{ end }}
{{ $b }}
{{ end }}
</td>
{{ end }}
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>
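
The template above calls `add`, which is not a built-in function of Go's `html/template`, so it has to be registered through a `FuncMap` before parsing. How this PR registers it is not shown in this excerpt; the following is a minimal sketch under that assumption, with hypothetical `plannedJob` and `pageContents` types and a reduced row template.

```go
package main

import (
	"html/template"
	"os"
	"strings"
)

// rowsTemplate is a reduced version of the table body: job number, key, block count.
const rowsTemplate = `{{ range $index, $job := .PlannedJobs }}<tr><td>{{ add $index 1 }}</td><td>{{ $job.Key }}</td><td>{{ len $job.Blocks }}</td></tr>
{{ end }}`

// plannedJob and pageContents are hypothetical stand-ins for the page data types.
type plannedJob struct {
	Key    string
	Blocks []string
}

type pageContents struct {
	PlannedJobs []plannedJob
}

// renderRows registers "add" and renders one table row per job.
func renderRows(jobs []plannedJob) (string, error) {
	funcs := template.FuncMap{
		// add makes the zero-based range index printable as a one-based job number.
		"add": func(a, b int) int { return a + b },
	}
	// Funcs must be called before Parse, otherwise parsing fails on the unknown function.
	t, err := template.New("rows").Funcs(funcs).Parse(rowsTemplate)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, pageContents{PlannedJobs: jobs}); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	out, err := renderRows([]plannedJob{{Key: "job-a", Blocks: []string{"b1", "b2"}}})
	if err != nil {
		panic(err)
	}
	_, _ = os.Stdout.WriteString(out)
}
```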