Skip to content

Commit

Permalink
compactor set index stats on block
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Jun 13, 2023
1 parent a0b37fd commit 4ef25a6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 12 deletions.
12 changes: 12 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ type Thanos struct {

// Rewrites is present when any rewrite (deletion, relabel etc) were applied to this block. Optional.
Rewrites []Rewrite `json:"rewrites,omitempty"`

IndexStats IndexStats `json:"index_stats,omitempty"`
}

type IndexStats struct {
SeriesMinSize int64 `json:"series_min_size,omitempty"`
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
SeriesAvgSize int64 `json:"series_avg_size,omitempty"`

ChunkMinSize int64 `json:"chunk_min_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
ChunkAvgSize int64 `json:"chunk_avg_size,omitempty"`
}

type Rewrite struct {
Expand Down
42 changes: 30 additions & 12 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksErro
return OutOfOrderChunksError{err: err, id: brokenBlock}
}

// IsOutOfOrderChunk returns true if the base error is a OutOfOrderChunkError.
// IsOutOfOrderChunkError returns true if the base error is a OutOfOrderChunkError.
func IsOutOfOrderChunkError(err error) bool {
_, ok := errors.Cause(err).(OutOfOrderChunksError)
return ok
Expand Down Expand Up @@ -1100,28 +1100,46 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
}, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
if err := os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
newMeta, err := metadata.ReadFromDir(bdir)
if err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "read new meta")
}

var stats block.HealthStats
// Ensure the output block is valid.
err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error {
return block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
stats, err = block.GatherIndexHealthStats(cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
if err != nil {
return err
}
return stats.AnyErr()
})
if !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

newMeta, err = metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
IndexStats: metadata.IndexStats{
SeriesMinSize: stats.SeriesMinSize,
SeriesMaxSize: stats.SeriesMaxSize,
SeriesAvgSize: stats.SeriesAvgSize,
ChunkMinSize: stats.ChunkMinSize,
ChunkMaxSize: stats.ChunkMaxSize,
ChunkAvgSize: stats.ChunkAvgSize,
},
}, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}

// Ensure the output block is not overlapping with anything else,
// unless vertical compaction is enabled.
if !cg.enableVerticalCompaction {
Expand Down

0 comments on commit 4ef25a6

Please sign in to comment.