-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Compactor: Add tracing support #4903
Changes from 21 commits
9d3b32f
aa0ede1
2f342ac
47c3171
e1dd35a
031aeaf
8104e73
e329f85
ffd55fd
480ffe7
a3a8f6d
9305a89
2f50d22
6d4f0de
7a5795e
ae6b6d5
3783abd
f12a18e
7620389
a5ef615
ced45c4
9296528
01812fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -17,6 +17,7 @@ import ( | |||||
"github.com/go-kit/log" | ||||||
"github.com/go-kit/log/level" | ||||||
"github.com/oklog/ulid" | ||||||
"github.com/opentracing/opentracing-go" | ||||||
"github.com/pkg/errors" | ||||||
"github.com/prometheus/client_golang/prometheus" | ||||||
"github.com/prometheus/client_golang/prometheus/promauto" | ||||||
|
@@ -31,6 +32,7 @@ import ( | |||||
"github.com/thanos-io/thanos/pkg/extprom" | ||||||
"github.com/thanos-io/thanos/pkg/objstore" | ||||||
"github.com/thanos-io/thanos/pkg/runutil" | ||||||
"github.com/thanos-io/thanos/pkg/tracing" | ||||||
) | ||||||
|
||||||
type ResolutionLevel int64 | ||||||
|
@@ -764,7 +766,11 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp | |||||
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") | ||||||
} | ||||||
|
||||||
shouldRerun, compID, err := cg.compact(ctx, subDir, planner, comp) | ||||||
var err error | ||||||
tracing.DoInSpanWithErr(ctx, "group_compaction", func(ctx context.Context) error { | ||||||
shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp) | ||||||
return err | ||||||
}, opentracing.Tags{"group.key": cg.Key()}) | ||||||
if err != nil { | ||||||
cg.compactionFailures.Inc() | ||||||
return false, ulid.ULID{}, err | ||||||
|
@@ -980,7 +986,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp | |||||
overlappingBlocks = true | ||||||
} | ||||||
|
||||||
toCompact, err := planner.Plan(ctx, cg.metasByMinTime) | ||||||
var toCompact []*metadata.Meta | ||||||
tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) error { | ||||||
toCompact, err = planner.Plan(ctx, cg.metasByMinTime) | ||||||
return err | ||||||
}) | ||||||
if err != nil { | ||||||
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction") | ||||||
} | ||||||
|
@@ -1008,12 +1018,21 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp | |||||
uniqueSources[s] = struct{}{} | ||||||
} | ||||||
|
||||||
if err := block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir); err != nil { | ||||||
tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { | ||||||
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir) | ||||||
return err | ||||||
}, opentracing.Tags{"block.id": meta.ULID}) | ||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tiny nit
Suggested change
|
||||||
if err != nil { | ||||||
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID)) | ||||||
} | ||||||
|
||||||
// Ensure all input blocks are valid. | ||||||
stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) | ||||||
var stats block.HealthStats | ||||||
tracing.DoInSpanWithErr(ctx, "compaction_block_healthstats", func(ctx context.Context) error { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the Golang name is
Suggested change
|
||||||
stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) | ||||||
return err | ||||||
}, opentracing.Tags{"block.id": meta.ULID}) | ||||||
if err != nil { | ||||||
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir) | ||||||
} | ||||||
|
@@ -1039,7 +1058,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp | |||||
level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) | ||||||
|
||||||
begin = time.Now() | ||||||
compID, err = comp.Compact(dir, toCompactDirs, nil) | ||||||
tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) error { | ||||||
compID, err = comp.Compact(dir, toCompactDirs, nil) | ||||||
return err | ||||||
}) | ||||||
if err != nil { | ||||||
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) | ||||||
} | ||||||
|
@@ -1081,7 +1103,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp | |||||
} | ||||||
|
||||||
// Ensure the output block is valid. | ||||||
if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { | ||||||
tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error { | ||||||
err = block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) | ||||||
return err | ||||||
}) | ||||||
if !cg.acceptMalformedIndex && err != nil { | ||||||
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) | ||||||
} | ||||||
|
||||||
|
@@ -1095,7 +1121,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp | |||||
|
||||||
begin = time.Now() | ||||||
|
||||||
if err := block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc); err != nil { | ||||||
tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error { | ||||||
err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc) | ||||||
return err | ||||||
}) | ||||||
if err != nil { | ||||||
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID)) | ||||||
} | ||||||
level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) | ||||||
|
@@ -1104,7 +1134,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp | |||||
// into the next planning cycle. | ||||||
// Eventually the block we just uploaded should get synced into the group again (including sync-delay). | ||||||
for _, meta := range toCompact { | ||||||
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil { | ||||||
tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error { | ||||||
err = cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())) | ||||||
return err | ||||||
}, opentracing.Tags{"block.id": meta.ULID}) | ||||||
if err != nil { | ||||||
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) | ||||||
} | ||||||
cg.groupGarbageCollectedBlocks.Inc() | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |
"context" | ||
|
||
"github.com/opentracing/opentracing-go" | ||
opentracing_log "github.com/opentracing/opentracing-go/log" | ||
) | ||
|
||
const ( | ||
|
@@ -72,6 +73,16 @@ func StartSpan(ctx context.Context, operationName string, opts ...opentracing.St | |
return span, opentracing.ContextWithSpan(ctx, span) | ||
} | ||
|
||
// DoInSpanWtihErr executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. | ||
// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. | ||
// It logs the error inside the new span created, which differentiates it from DoInSpan and DoWithSpan. | ||
func DoInSpanWithErr(ctx context.Context, operationName string, doFn func(context.Context) error, opts ...opentracing.StartSpanOption) { | ||
span, newCtx := StartSpan(ctx, operationName, opts...) | ||
defer span.Finish() | ||
err := doFn(newCtx) | ||
span.LogFields(opentracing_log.Error(err)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point @matej-g. Thanks! |
||
} | ||
|
||
// DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. | ||
// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. | ||
func DoInSpan(ctx context.Context, operationName string, doFn func(context.Context), opts ...opentracing.StartSpanOption) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All other spans are named
compaction_<x>
, should we make this follow the same convention?