From f6871f75279a3f3f69a1a24db32aec02bb3eb48f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 26 Apr 2023 13:54:18 +0300 Subject: [PATCH] compact: atomically replace no compact marked map (#6319) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With lots of blocks it could take some time to fill this no compact marked map hence replace it atomically. I believe this leads to problems in the compaction planner where it picks up no compact marked blocks because meta syncer does synchronizations concurrently. Signed-off-by: Giedrius Statkevičius --- pkg/compact/compact.go | 16 +++++--- pkg/compact/compact_test.go | 78 +++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index c6c36bf3cc..585d5d6b4d 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1392,9 +1392,9 @@ func (f *GatherNoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]*me // Filter passes all metas, while gathering no compact markers. func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error { - f.mtx.Lock() - f.noCompactMarkedMap = make(map[ulid.ULID]*metadata.NoCompactMark) - f.mtx.Unlock() + var localNoCompactMapMtx sync.Mutex + + noCompactMarkedMap := make(map[ulid.ULID]*metadata.NoCompactMark) // Make a copy of block IDs to check, in order to avoid concurrency issues // between the scheduler and workers. @@ -1427,9 +1427,9 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli continue } - f.mtx.Lock() - f.noCompactMarkedMap[id] = m - f.mtx.Unlock() + localNoCompactMapMtx.Lock() + noCompactMarkedMap[id] = m + localNoCompactMapMtx.Unlock() synced.WithLabelValues(block.MarkedForNoCompactionMeta).Inc() } @@ -1457,5 +1457,9 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli return errors.Wrap(err, "filter blocks marked for no compaction") } + f.mtx.Lock() + f.noCompactMarkedMap = noCompactMarkedMap + f.mtx.Unlock() + return nil } diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index bde42c1420..0112c0a8e2 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/oklog/run" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +24,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/errutil" @@ -595,3 +597,79 @@ func TestDownsampleProgressCalculate(t *testing.T) { } } } + +func TestNoMarkFilterAtomic(t *testing.T) { + ctx := context.TODO() + logger := log.NewLogfmtLogger(io.Discard) + + m := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) + + blocksNum := 200 + bkt := objstore.NewInMemBucket() + + metas := make(map[ulid.ULID]*metadata.Meta, blocksNum) + + noMarkCounter := promauto.NewCounter(prometheus.CounterOpts{ + Name: "coolcounter", + }) + + for i := 0; i < blocksNum; i++ { + var meta metadata.Meta + meta.Version = 1 + meta.ULID = ulid.MustNew(uint64(i), nil) + metas[meta.ULID] = &meta + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + if i%2 == 0 { + testutil.Ok( + t, + block.MarkForNoCompact(ctx, logger, bkt, meta.ULID, metadata.NoCompactReason("test"), "nodetails", noMarkCounter), + ) + } + } + + slowBucket := objstore.WithNoopInstr(objstore.WithDelay(bkt, time.Millisecond*200)) + f := NewGatherNoCompactionMarkFilter(logger, slowBucket, 10) + + ctx, cancel := context.WithCancel(ctx) + + g := &run.Group{} + + // Fill the map initially. + testutil.Ok(t, f.Filter(ctx, metas, m, nil)) + testutil.Assert(t, len(f.NoCompactMarkedBlocks()) > 0, "expected to always have not compacted blocks") + + g.Add(func() error { + for { + if ctx.Err() != nil { + return nil + } + if err := f.Filter(ctx, metas, m, nil); err != nil && !errors.Is(err, context.Canceled) { + testutil.Ok(t, err) + } + } + }, func(err error) { + cancel() + }) + + g.Add(func() error { + for { + if ctx.Err() != nil { + return nil + } + + if len(f.NoCompactMarkedBlocks()) == 0 { + return fmt.Errorf("expected to always have not compacted blocks") + } + } + }, func(err error) { + cancel() + }) + + time.AfterFunc(10*time.Second, func() { + cancel() + }) + testutil.Ok(t, g.Run()) +}