Skip to content

Commit

Permalink
Compact: Make GatherNoCompactionMarkFilter.NoCompactMarkedBlocks conc…
Browse files Browse the repository at this point in the history
…urrency safe (#5736)

This fixes a crash due to concurrent access.

    concurrent map read and map write

fixes #5735

Signed-off-by: Igor Wiedler <iwiedler@gitlab.com>

Signed-off-by: Igor Wiedler <iwiedler@gitlab.com>
  • Loading branch information
igorwwwwwwwwwwwwwwwwwwww committed Sep 28, 2022
1 parent 5e84ea7 commit e414597
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5642](https://github.com/thanos-io/thanos/pull/5642) Receive: Log labels correctly in writer debug messages.
- [#5655](https://github.com/thanos-io/thanos/pull/5655) Receive: Fix recreating already pruned tenants.
- [#5702](https://github.com/thanos-io/thanos/pull/5702) Store: Upgrade minio-go/v7 to fix panic caused by leaked goroutines.
- [#5736](https://github.com/thanos-io/thanos/pull/5736) Compact: Fix crash in GatherNoCompactionMarkFilter.NoCompactMarkedBlocks.

### Added
* [#5654](https://github.com/thanos-io/thanos/pull/5654) Query: add `--grpc-compression` flag that controls the compression used in gRPC client. With the flag it is now possible to compress the traffic between Query and StoreAPI nodes - you get lower network usage in exchange for a bit higher CPU/RAM usage.
Expand Down
21 changes: 15 additions & 6 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,7 @@ type GatherNoCompactionMarkFilter struct {
bkt objstore.InstrumentedBucketReader
noCompactMarkedMap map[ulid.ULID]*metadata.NoCompactMark
concurrency int
mtx sync.Mutex
}

// NewGatherNoCompactionMarkFilter creates GatherNoCompactionMarkFilter.
Expand All @@ -1374,12 +1375,21 @@ func NewGatherNoCompactionMarkFilter(logger log.Logger, bkt objstore.Instrumente

// NoCompactMarkedBlocks returns block ids that were marked for no compaction.
func (f *GatherNoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]*metadata.NoCompactMark {
return f.noCompactMarkedMap
f.mtx.Lock()
copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(f.noCompactMarkedMap))
for k, v := range f.noCompactMarkedMap {
copiedNoCompactMarked[k] = v
}
f.mtx.Unlock()

return copiedNoCompactMarked
}

// Filter passes all metas, while gathering no compact markers.
func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
f.mtx.Lock()
f.noCompactMarkedMap = make(map[ulid.ULID]*metadata.NoCompactMark)
f.mtx.Unlock()

// Make a copy of block IDs to check, in order to avoid concurrency issues
// between the scheduler and workers.
Expand All @@ -1389,9 +1399,8 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli
}

var (
eg errgroup.Group
ch = make(chan ulid.ULID, f.concurrency)
mtx sync.Mutex
eg errgroup.Group
ch = make(chan ulid.ULID, f.concurrency)
)

for i := 0; i < f.concurrency; i++ {
Expand All @@ -1413,9 +1422,9 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli
continue
}

mtx.Lock()
f.mtx.Lock()
f.noCompactMarkedMap[id] = m
mtx.Unlock()
f.mtx.Unlock()
synced.WithLabelValues(block.MarkedForNoCompactionMeta).Inc()
}

Expand Down

0 comments on commit e414597

Please sign in to comment.