Skip to content

Commit

Permalink
verify: Added extra block statistic; print on debug when using index_…
Browse files Browse the repository at this point in the history
…issue

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Oct 30, 2020
1 parent 664e87f commit 93c3985
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 104 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type reloaderConfig struct {

func (rc *reloaderConfig) registerFlag(cmd extkingpin.FlagClause) *reloaderConfig {
cmd.Flag("reloader.config-file",
"Config file watched by the reloader.").
"Config file watched by the reloader.").
Default("").StringVar(&rc.confFile)
cmd.Flag("reloader.config-envsubst-file",
"Output file for environment variable substituted config file.").
Expand Down
14 changes: 3 additions & 11 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,6 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
return err
}

ctx := context.Background()
v := verifier.NewManager(reg, verifier.Config{
Logger: logger,
Bkt: bkt,
BackupBkt: backupBkt,
Fetcher: fetcher,
DeleteDelay: time.Duration(*deleteDelay),
}, r)

var idMatcher func(ulid.ULID) bool = nil
if len(*ids) > 0 {
idsMap := map[string]struct{}{}
Expand All @@ -164,11 +155,12 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
}
}

v := verifier.NewManager(reg, logger, bkt, backupBkt, fetcher, time.Duration(*deleteDelay), r)
if *repair {
return v.VerifyAndRepair(ctx, idMatcher)
return v.VerifyAndRepair(context.Background(), idMatcher)
}

return v.Verify(ctx, idMatcher)
return v.Verify(context.Background(), idMatcher)
})
}

Expand Down
117 changes: 106 additions & 11 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"hash/crc32"
"math"
"math/rand"
"path/filepath"
"sort"
Expand All @@ -29,17 +30,17 @@ import (

// VerifyIndex does a full run over a block index and verifies that it fulfills the order invariants.
func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) error {
stats, err := GatherIndexIssueStats(logger, fn, minTime, maxTime)
stats, err := GatherIndexHealthStats(logger, fn, minTime, maxTime)
if err != nil {
return err
}

return stats.AnyErr()
}

type Stats struct {
type HealthStats struct {
// TotalSeries represents total number of series in block.
TotalSeries int
TotalSeries int64
// OutOfOrderSeries represents number of series that have out of order chunks.
OutOfOrderSeries int

Expand All @@ -60,12 +61,34 @@ type Stats struct {
// OutOfOrderLabels represents the number of postings that contained out
// of order labels, a bug present in Prometheus 2.8.0 and below.
OutOfOrderLabels int

// Debug Statistics.
SeriesMinLifeDuration time.Duration
SeriesAvgLifeDuration time.Duration
SeriesMaxLifeDuration time.Duration

SeriesMinChunks int64
SeriesAvgChunks int64
SeriesMaxChunks int64

TotalChunks int64

ChunkMinDuration time.Duration
ChunkAvgDuration time.Duration
ChunkMaxDuration time.Duration

ChunkMinSize int64
ChunkAvgSize int64
ChunkMaxSize int64

SingleSampleSeries int64
SingleSampleChunks int64
}

// PrometheusIssue5372Err returns an error if the Stats object indicates
// PrometheusIssue5372Err returns an error if the HealthStats object indicates
// postings with out of order labels. This is corrected by Prometheus Issue
// #5372 and affects Prometheus versions 2.8.0 and below.
func (i Stats) PrometheusIssue5372Err() error {
func (i HealthStats) PrometheusIssue5372Err() error {
if i.OutOfOrderLabels > 0 {
return errors.Errorf("index contains %d postings with out of order labels",
i.OutOfOrderLabels)
Expand All @@ -74,15 +97,15 @@ func (i Stats) PrometheusIssue5372Err() error {
}

// Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block).
func (i Stats) Issue347OutsideChunksErr() error {
func (i HealthStats) Issue347OutsideChunksErr() error {
if i.Issue347OutsideChunks > 0 {
return errors.Errorf("found %d chunks outside the block time range introduced by https://github.com/prometheus/tsdb/issues/347", i.Issue347OutsideChunks)
}
return nil
}

// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure.
func (i Stats) CriticalErr() error {
func (i HealthStats) CriticalErr() error {
var errMsg []string

if i.OutOfOrderSeries > 0 {
Expand Down Expand Up @@ -113,7 +136,7 @@ func (i Stats) CriticalErr() error {
}

// AnyErr returns error if stats indicates any block issue.
func (i Stats) AnyErr() error {
func (i HealthStats) AnyErr() error {
var errMsg []string

if err := i.CriticalErr(); err != nil {
Expand All @@ -135,11 +158,41 @@ func (i Stats) AnyErr() error {
return nil
}

// GatherIndexIssueStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// minMaxSumInt64 accumulates simple distribution statistics (min, max, sum,
// count) over a stream of int64 samples in a single pass, so the index health
// gatherer can report min/avg/max without retaining every value.
type minMaxSumInt64 struct {
	sum int64 // Running total of every added value.
	min int64 // Smallest value seen; math.MaxInt64 until the first Add.
	max int64 // Largest value seen; math.MinInt64 until the first Add.

	cnt int64 // Number of values added; divisor for Avg.
}

// newMinMaxSumInt64 returns a tracker whose extremes are primed so that the
// first Add unconditionally overwrites both min and max.
func newMinMaxSumInt64() minMaxSumInt64 {
	return minMaxSumInt64{
		min: math.MaxInt64,
		max: math.MinInt64,
	}
}

// Add folds the sample v into the running statistics.
func (n *minMaxSumInt64) Add(v int64) {
	n.cnt++
	n.sum += v
	if n.min > v {
		n.min = v
	}
	if n.max < v {
		n.max = v
	}
}

// Avg returns the integer arithmetic mean of all added samples.
// It returns 0 when no samples have been added, instead of panicking with a
// division by zero (e.g. for a block index containing no series or chunks).
func (n *minMaxSumInt64) Avg() int64 {
	if n.cnt == 0 {
		return 0
	}
	return n.sum / n.cnt
}

// GatherIndexHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// helps to assess index health.
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
// See Stats.Issue347OutsideChunks for details.
func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime int64) (stats Stats, err error) {
// See HealthStats.Issue347OutsideChunks for details.
func GatherIndexHealthStats(logger log.Logger, fn string, minTime int64, maxTime int64) (stats HealthStats, err error) {
r, err := index.NewFileReader(fn)
if err != nil {
return stats, errors.Wrap(err, "open index file")
Expand All @@ -154,6 +207,11 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
lastLset labels.Labels
lset labels.Labels
chks []chunks.Meta

seriesLifeDuration = newMinMaxSumInt64()
seriesChunks = newMinMaxSumInt64()
chunkDuration = newMinMaxSumInt64()
chunkSize = newMinMaxSumInt64()
)

// Per series.
Expand Down Expand Up @@ -189,8 +247,23 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
}

ooo := 0
seriesLifeTimeMs := int64(0)
// Per chunk in series.
for i, c := range chks {
stats.TotalChunks++

chkDur := c.MaxTime - c.MinTime
seriesLifeTimeMs += chkDur
chunkDuration.Add(chkDur)
if chkDur == 0 {
stats.SingleSampleChunks++
}

// Approximate size.
if i < len(chks)-2 {
chunkSize.Add(int64(chks[i+1].Ref - c.Ref))
}

// Chunk vs the block ranges.
if c.MinTime < minTime || c.MaxTime > maxTime {
stats.OutsideChunks++
Expand Down Expand Up @@ -226,11 +299,33 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
stats.OutOfOrderSeries++
stats.OutOfOrderChunks += ooo
}

seriesChunks.Add(int64(len(chks)))
seriesLifeDuration.Add(seriesLifeTimeMs)

if seriesLifeTimeMs == 0 {
stats.SingleSampleSeries++
}
}
if p.Err() != nil {
return stats, errors.Wrap(err, "walk postings")
}

stats.SeriesMaxLifeDuration = time.Duration(seriesLifeDuration.max) * time.Millisecond
stats.SeriesAvgLifeDuration = time.Duration(seriesLifeDuration.Avg()) * time.Millisecond
stats.SeriesMinLifeDuration = time.Duration(seriesLifeDuration.min) * time.Millisecond

stats.SeriesMaxChunks = seriesChunks.max
stats.SeriesAvgChunks = seriesChunks.Avg()
stats.SeriesMinChunks = seriesChunks.min

stats.ChunkMaxSize = chunkSize.max
stats.ChunkAvgSize = chunkSize.Avg()
stats.ChunkMinSize = chunkSize.min

stats.ChunkMaxDuration = time.Duration(chunkDuration.max) * time.Millisecond
stats.ChunkAvgDuration = time.Duration(chunkDuration.Avg()) * time.Millisecond
stats.ChunkMinDuration = time.Duration(chunkDuration.min) * time.Millisecond
return stats, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure all input blocks are valid.
stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", pdir)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestConfig_validate(t *testing.T) {
}
err := conf.validate()
if (err != nil) != tt.wantErr {
t.Errorf("Config.validate() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("Config.validate() error = %v, wantErr %v", err, tt.wantErr)
} else {
testutil.Equals(t, tt.wantEndpoint, conf.Endpoint)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B

default:
sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/cache/caching_bucket_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CachingWithBackendConfig struct {
// How long to cache result of Iter call in root directory.
BlocksIterTTL time.Duration `yaml:"blocks_iter_ttl"`

// Config for Exists and Get operations for metadata files.
// Config for Exists and Get operations for metadata files.
MetafileExistsTTL time.Duration `yaml:"metafile_exists_ttl"`
MetafileDoesntExistTTL time.Duration `yaml:"metafile_doesnt_exist_ttl"`
MetafileContentTTL time.Duration `yaml:"metafile_content_ttl"`
Expand Down
21 changes: 9 additions & 12 deletions pkg/verifier/duplicated_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
package verifier

import (
"context"

"fmt"

"strings"
"time"

Expand All @@ -26,14 +23,14 @@ type DuplicatedCompactionBlocks struct{}

func (DuplicatedCompactionBlocks) IssueID() string { return "duplicated_compaction" }

func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config, idMatcher func(ulid.ULID) bool, repair bool) error {
func (DuplicatedCompactionBlocks) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool, repair bool) error {
if idMatcher != nil {
return errors.Errorf("id matching is not supported")
}

level.Info(conf.Logger).Log("msg", "started verifying issue", "with-repair", repair)
level.Info(ctx.Logger).Log("msg", "started verifying issue", "with-repair", repair)

overlaps, err := fetchOverlaps(ctx, conf.Fetcher)
overlaps, err := fetchOverlaps(ctx, ctx.Fetcher)
if err != nil {
return errors.Wrap(err, "fetch overlaps")
}
Expand All @@ -57,7 +54,7 @@ func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config,

// Loop over duplicates sets.
for _, d := range dups {
level.Warn(conf.Logger).Log("msg", "found duplicated blocks", "group", k, "range-min", r.Min, "range-max", r.Max, "kill", sprintMetas(d[1:]))
level.Warn(ctx.Logger).Log("msg", "found duplicated blocks", "group", k, "range-min", r.Min, "range-max", r.Max, "kill", sprintMetas(d[1:]))

for _, m := range d[1:] {
if _, ok := toKillLookup[m.ULID]; ok {
Expand All @@ -70,25 +67,25 @@ func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config,
}

if len(dups) == 0 {
level.Warn(conf.Logger).Log("msg", "found overlapped blocks, but all of the blocks are unique. Seems like unrelated issue. Ignoring overlap", "group", k,
level.Warn(ctx.Logger).Log("msg", "found overlapped blocks, but all of the blocks are unique. Seems like unrelated issue. Ignoring overlap", "group", k,
"range", fmt.Sprintf("%v", r), "overlap", sprintMetas(blocks))
}
}
}

level.Warn(conf.Logger).Log("msg", "Found duplicated blocks that are ok to be removed", "ULIDs", fmt.Sprintf("%v", toKill), "num", len(toKill))
level.Warn(ctx.Logger).Log("msg", "Found duplicated blocks that are ok to be removed", "ULIDs", fmt.Sprintf("%v", toKill), "num", len(toKill))
if !repair {
return nil
}

for i, id := range toKill {
if err := BackupAndDelete(ctx, conf, id); err != nil {
if err := BackupAndDelete(ctx, id); err != nil {
return err
}
level.Info(conf.Logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1)
level.Info(ctx.Logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1)
}

level.Info(conf.Logger).Log("msg", "Removed all duplicated blocks. You might want to rerun this verify to check if there is still any unrelated overlap")
level.Info(ctx.Logger).Log("msg", "Removed all duplicated blocks. You might want to rerun this verify to check if there is still any unrelated overlap")
return nil
}

Expand Down
Loading

0 comments on commit 93c3985

Please sign in to comment.