Add block compaction delay metric (grafana#7635)
Add block compaction delay histogram metric

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>
(cherry picked from commit 96952cf)
jhalterman committed Apr 2, 2024
1 parent 4ffcb00 commit 6eda561
Showing 5 changed files with 32 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
+* [ENHANCEMENT] Compactor: Add `cortex_compactor_block_compaction_delay_seconds` metric to track the delay between a block being uploaded and successfully compacted. #7635
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
17 changes: 17 additions & 0 deletions pkg/compactor/bucket_compactor.go
@@ -11,6 +11,7 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@@ -433,6 +434,13 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the job again (including sync-delay).
for _, meta := range toCompact {
+attrs, err := block.GetMetaAttributes(ctx, meta, c.bkt)
+if err != nil {
+level.Warn(jobLogger).Log("msg", "failed to determine block upload time", "block", meta.ULID.String(), "err", err)
+} else {
+c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Seconds())
+}
+
if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
return false, nil, errors.Wrapf(err, "mark old block for deletion from bucket")
}
@@ -645,6 +653,7 @@ type BucketCompactorMetrics struct {
groupCompactionRunsCompleted prometheus.Counter
groupCompactionRunsFailed prometheus.Counter
groupCompactions prometheus.Counter
+blockCompactionDelay *prometheus.HistogramVec
compactionBlocksVerificationFailed prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact *prometheus.CounterVec
@@ -670,6 +679,14 @@ func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg p
Name: "cortex_compactor_group_compactions_total",
Help: "Total number of group compaction attempts that resulted in new block(s).",
}),
+blockCompactionDelay: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+Name: "cortex_compactor_block_compaction_delay_seconds",
+Help: "Delay between a block being uploaded and successfully compacting it.",
+Buckets: []float64{60.0, 300.0, 600.0, 1800.0, 3600.0, 7200.0, 10800.0, 14400.0, 18000.0, 36000.0, 72000.0},
+NativeHistogramBucketFactor: 1.1,
+NativeHistogramMaxBucketNumber: 100,
+NativeHistogramMinResetDuration: 1 * time.Hour,
+}, []string{"level"}),
compactionBlocksVerificationFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_blocks_verification_failures_total",
Help: "Total number of failures when verifying min/max time ranges of compacted blocks.",
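For a concrete picture of what the new instrumentation does, here is a minimal, self-contained sketch (not the compactor's actual wiring; the standalone registry, the 42-minute delay, and level 2 are made-up inputs) that registers the same classic-histogram portion of the metric and records one observation the way runCompactionJob now does for each source block:

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// Standalone registry plus a histogram mirroring the metric added by this
	// commit (native-histogram options omitted for brevity).
	reg := prometheus.NewRegistry()
	blockCompactionDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cortex_compactor_block_compaction_delay_seconds",
		Help:    "Delay between a block being uploaded and successfully compacting it.",
		Buckets: []float64{60.0, 300.0, 600.0, 1800.0, 3600.0, 7200.0},
	}, []string{"level"})

	// Made-up inputs: the source block's meta.json was uploaded 42 minutes
	// before this compaction run began, and the block is at compaction level 2.
	lastModified := time.Now().Add(-42 * time.Minute)
	compactionBegin := time.Now()
	level := 2

	// The same observation the diff makes per source block in toCompact.
	blockCompactionDelay.WithLabelValues(strconv.Itoa(level)).Observe(compactionBegin.Sub(lastModified).Seconds())
	fmt.Println("observed delay:", compactionBegin.Sub(lastModified).Round(time.Second))
}
```

Because classic buckets are emitted alongside the native-histogram configuration, the delay distribution can be queried with standard PromQL, e.g. `histogram_quantile(0.95, sum by (le) (rate(cortex_compactor_block_compaction_delay_seconds_bucket[1h])))`.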
7 changes: 2 additions & 5 deletions pkg/compactor/job.go
@@ -6,7 +6,6 @@ import (
"context"
"fmt"
"math"
"path"
"sort"
"time"

@@ -174,11 +173,9 @@ func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duratio
continue
}

-metaPath := path.Join(meta.ULID.String(), block.MetaFilename)
-
-attrs, err := userBucket.Attributes(ctx, metaPath)
+attrs, err := block.GetMetaAttributes(ctx, meta, userBucket)
if err != nil {
-return false, meta, errors.Wrapf(err, "unable to get object attributes for %s", metaPath)
+return false, meta, err
}

if attrs.LastModified.After(threshold) {
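For context on why the upload time matters here: jobWaitPeriodElapsed lets a compaction job proceed only once every source block's meta.json has been in the bucket longer than the configured wait period. A simplified, hypothetical version of that check (made-up inputs, not the real function signature):

```go
package main

import (
	"fmt"
	"time"
)

// waitPeriodElapsed is a simplified, hypothetical version of the check in
// jobWaitPeriodElapsed: the job may proceed only if every source block's
// meta.json is older than the configured wait period.
func waitPeriodElapsed(now time.Time, metaUploadTimes []time.Time, waitPeriod time.Duration) bool {
	threshold := now.Add(-waitPeriod)
	for _, uploaded := range metaUploadTimes {
		if uploaded.After(threshold) {
			return false // this block was uploaded too recently; wait for the next cycle
		}
	}
	return true
}

func main() {
	now := time.Now()
	uploads := []time.Time{now.Add(-30 * time.Minute), now.Add(-5 * time.Minute)}
	fmt.Println(waitPeriodElapsed(now, uploads, 10*time.Minute)) // false: one block is only 5m old
}
```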
10 changes: 10 additions & 0 deletions pkg/storage/tsdb/block/block.go
@@ -325,6 +325,16 @@ func GatherFileStats(blockDir string) (res []File, _ error) {
return res, err
}

+// GetMetaAttributes returns the attributes for the block associated with the meta, using the bucketReader to read the attributes.
+func GetMetaAttributes(ctx context.Context, meta *Meta, bucketReader objstore.BucketReader) (objstore.ObjectAttributes, error) {
+metaPath := path.Join(meta.ULID.String(), MetaFilename)
+attrs, err := bucketReader.Attributes(ctx, metaPath)
+if err != nil {
+return objstore.ObjectAttributes{}, errors.Wrapf(err, "unable to get object attributes for %s", metaPath)
+}
+return attrs, nil
+}
+
// MarkForNoCompact creates a file which marks block to be not compacted.
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason NoCompactReason, details string, markedForNoCompact prometheus.Counter) error {
m := path.Join(id.String(), NoCompactMarkFilename)
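GetMetaAttributes is a thin wrapper over the objstore BucketReader interface: it stats <ULID>/meta.json, and both call sites treat the object's LastModified attribute as the block's upload time. A sketch of the equivalent stat against objstore's in-memory bucket (the ULID and meta.json contents are made up):

```go
package main

import (
	"context"
	"fmt"
	"path"
	"strings"

	"github.com/thanos-io/objstore"
)

func main() {
	ctx := context.Background()
	bkt := objstore.NewInMemBucket()

	// Hypothetical block ULID; upload a stub meta.json so there is something to stat.
	const blockID = "01HV0000000000000000000000"
	if err := bkt.Upload(ctx, path.Join(blockID, "meta.json"), strings.NewReader(`{"version":1}`)); err != nil {
		panic(err)
	}

	// The same object stat GetMetaAttributes performs: the LastModified
	// attribute of <ULID>/meta.json serves as the block's upload time.
	attrs, err := bkt.Attributes(ctx, path.Join(blockID, "meta.json"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("meta.json size=%d, uploaded at %s\n", attrs.Size, attrs.LastModified)
}
```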
3 changes: 2 additions & 1 deletion pkg/storage/tsdb/block/fetcher.go
@@ -194,6 +194,7 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
//
// - The block upload is completed: this is the normal case. meta.json file still exists in the
// object storage and it's expected to match the locally cached one (because it's immutable by design).
+//
// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
// the meta.json file) is still in the object storage. This case is not different than the previous one.
//
@@ -417,7 +418,7 @@
}

// FetchWithoutMarkedForDeletion returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
-// This function excludes all blocks for deletion (no deletion delay applied).
+// This function excludes all blocks marked for deletion (no deletion delay applied).
// It's caller responsibility to not change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
