Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: Add tracing support #4903

Merged
merged 23 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9d3b32f
first draft for tracing
metonymic-smokey Nov 18, 2021
aa0ede1
add tracer to context
metonymic-smokey Nov 19, 2021
2f342ac
minor fixes
metonymic-smokey Nov 19, 2021
47c3171
create common block spans; log block errors
metonymic-smokey Nov 20, 2021
e1dd35a
reverted to generic block spans
metonymic-smokey Nov 22, 2021
031aeaf
Merge branch 'thanos-io:main' into tracing
metonymic-smokey Nov 22, 2021
8104e73
changed block ID tag; add span for block delete
metonymic-smokey Nov 22, 2021
e329f85
minor fix
metonymic-smokey Nov 22, 2021
ffd55fd
removed extra spans
metonymic-smokey Nov 22, 2021
480ffe7
Merge branch 'main' into tracing
metonymic-smokey Nov 24, 2021
a3a8f6d
addressed some review comments
metonymic-smokey Nov 24, 2021
9305a89
removed block spans
metonymic-smokey Nov 24, 2021
2f50d22
changed group key
metonymic-smokey Nov 25, 2021
6d4f0de
removed extra var declarations
metonymic-smokey Nov 25, 2021
7a5795e
Merge branch 'thanos-io:main' into tracing
metonymic-smokey Nov 25, 2021
ae6b6d5
removed comments
metonymic-smokey Nov 25, 2021
3783abd
added changelog entry
metonymic-smokey Nov 26, 2021
f12a18e
update healthcheck to healthstats.
metonymic-smokey Nov 27, 2021
7620389
Merge branch 'thanos-io:main' into tracing
metonymic-smokey Nov 29, 2021
a5ef615
draft: logging errors
metonymic-smokey Nov 29, 2021
ced45c4
removed extra line
metonymic-smokey Nov 30, 2021
9296528
used alternate function to log errors
metonymic-smokey Nov 30, 2021
01812fc
minor nits
metonymic-smokey Dec 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4874](https://github.com/thanos-io/thanos/pull/4874) Query: Add `--endpoint-strict` flag to statically configure Thanos API server endpoints. It is similar to `--store-strict` but supports passing any Thanos gRPC APIs: StoreAPI, MetadataAPI, RulesAPI, TargetsAPI and ExemplarsAPI.
- [#4868](https://github.com/thanos-io/thanos/pull/4868) Rule: Support ruleGroup limit introduced by Prometheus v2.31.0.
- [#4897](https://github.com/thanos-io/thanos/pull/4897) Querier: Add validation for querier address flags.
- [#4903](https://github.com/thanos-io/thanos/pull/4903) Compactor: Added tracing support for compaction.

### Fixed

Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
)

Expand Down Expand Up @@ -300,6 +301,7 @@ func runCompact(
}

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

defer func() {
if rerr != nil {
Expand Down
49 changes: 41 additions & 8 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
)

type ResolutionLevel int64
Expand Down Expand Up @@ -764,7 +766,11 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}

shouldRerun, compID, err := cg.compact(ctx, subDir, planner, comp)
var err error
tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) error {
shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp)
return err
}, opentracing.Tags{"group.key": cg.Key()})
if err != nil {
cg.compactionFailures.Inc()
return false, ulid.ULID{}, err
Expand Down Expand Up @@ -980,7 +986,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
overlappingBlocks = true
}

toCompact, err := planner.Plan(ctx, cg.metasByMinTime)
var toCompact []*metadata.Meta
tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) error {
toCompact, err = planner.Plan(ctx, cg.metasByMinTime)
return err
})
if err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
}
Expand Down Expand Up @@ -1008,12 +1018,20 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
uniqueSources[s] = struct{}{}
}

if err := block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir); err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID))
}

// Ensure all input blocks are valid.
stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
var stats block.HealthStats
tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error {
stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir)
}
Expand All @@ -1039,7 +1057,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

begin = time.Now()
compID, err = comp.Compact(dir, toCompactDirs, nil)
tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) error {
compID, err = comp.Compact(dir, toCompactDirs, nil)
return err
})
if err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
}
Expand Down Expand Up @@ -1081,7 +1102,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error {
err = block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
return err
})
if !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand All @@ -1095,7 +1120,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp

begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc); err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error {
err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc)
return err
})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
}
level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
Expand All @@ -1104,7 +1133,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, meta := range toCompact {
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
err = cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
Expand Down
13 changes: 13 additions & 0 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

const (
Expand Down Expand Up @@ -72,6 +73,18 @@ func StartSpan(ctx context.Context, operationName string, opts ...opentracing.St
return span, opentracing.ContextWithSpan(ctx, span)
}

// DoInSpanWtihErr executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any.
// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification.
// It logs the error inside the new span created, which differentiates it from DoInSpan and DoWithSpan.
func DoInSpanWithErr(ctx context.Context, operationName string, doFn func(context.Context) error, opts ...opentracing.StartSpanOption) {
span, newCtx := StartSpan(ctx, operationName, opts...)
defer span.Finish()
err := doFn(newCtx)
if err != nil {
ext.LogError(span, err)
}
}

// DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any.
// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification.
func DoInSpan(ctx context.Context, operationName string, doFn func(context.Context), opts ...opentracing.StartSpanOption) {
Expand Down