[v23.1.x] storage: make compaction_backlog more efficient for large segment counts #11718

Merged
84 changes: 57 additions & 27 deletions src/v/storage/disk_log_impl.cc
@@ -1640,6 +1640,43 @@ disk_log_impl::update_configuration(ntp_config::default_overrides o) {

return ss::now();
}

/// Calculate the compaction backlog of the segments within a particular term
///
/// This is the inner part of compaction_backlog()
int64_t compaction_backlog_term(
std::vector<ss::lw_shared_ptr<segment>> segs, double cf) {
int64_t backlog = 0;

// Only compare each segment to a limited number of other segments, to
// avoid the loop below blowing up in runtime when there are many segments
// in the same term.
static constexpr size_t limit_lookahead = 8;

auto segment_count = segs.size();
if (segment_count <= 1) {
return 0;
}

for (size_t n = 1; n <= segment_count; ++n) {
auto& s = segs[n - 1];
auto sz = s->finished_self_compaction() ? s->size_bytes()
: s->size_bytes() * cf;
for (size_t k = 0; k <= segment_count - n && k < limit_lookahead; ++k) {
if (k == segment_count - 1) {
continue;
}
if (k == 0) {
backlog += static_cast<int64_t>(sz);
} else {
backlog += static_cast<int64_t>(std::pow(cf, k) * sz);
}
}
}

return backlog;
}
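
To make the per-term formula concrete, here is a small standalone sketch (not part of the patch) that applies the same calculation to plain byte counts. `backlog_for_sizes`, the toy sizes, and the compaction ratio of 0.5 are illustrative assumptions, not names from the PR.

```cpp
// Illustrative sketch: each segment in a term contributes its size once, plus
// a geometrically decaying share (cf^k) for each of up to limit_lookahead
// later segments it would still be compacted against.
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <vector>

int64_t backlog_for_sizes(const std::vector<int64_t>& sizes, double cf) {
    static constexpr size_t limit_lookahead = 8;
    const size_t count = sizes.size();
    if (count <= 1) {
        return 0;
    }
    int64_t backlog = 0;
    for (size_t n = 1; n <= count; ++n) {
        const auto sz = static_cast<double>(sizes[n - 1]);
        for (size_t k = 0; k <= count - n && k < limit_lookahead; ++k) {
            if (k == count - 1) {
                continue;
            }
            // cf^0 == 1, so k == 0 adds the segment's own size.
            backlog += static_cast<int64_t>(std::pow(cf, k) * sz);
        }
    }
    return backlog;
}

int main() {
    // Three 1 MiB segments in a single term, assumed compaction ratio 0.5.
    const std::vector<int64_t> sizes(3, 1 << 20);
    std::printf(
      "estimated backlog: %lld bytes\n",
      static_cast<long long>(backlog_for_sizes(sizes, 0.5)));
    return 0;
}
```

With the lookahead cap, the work per term grows roughly linearly with segment count instead of quadratically, which is the point of the change.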

/**
 * We express compaction backlog as the size of the data that has to be read to
 * perform full compaction.
@@ -1705,11 +1742,19 @@ int64_t disk_log_impl::compaction_backlog() const {
return 0;
}

std::vector<std::vector<ss::lw_shared_ptr<segment>>> segments_per_term;
auto current_term = _segs.front()->offsets().term;
segments_per_term.emplace_back();
auto idx = 0;
auto cf = _compaction_ratio.get();
int64_t backlog = 0;
std::vector<ss::lw_shared_ptr<segment>> segments_this_term;

    // Limit how large we will try to allocate the segments_this_term vector:
// this protects us against corner cases where a term has a really large
// number of segments. Typical compaction use cases will have many fewer
// segments per term than this (because segments are continuously compacted
// away). Corner cases include non-compactible data in a compacted topic,
// or enabling compaction on a previously non-compacted topic.
static constexpr size_t limit_segments_this_term = 1024;

for (auto& s : _segs) {
if (!s->finished_self_compaction()) {
backlog += static_cast<int64_t>(s->size_bytes());
@@ -1720,35 +1765,20 @@
}

if (current_term != s->offsets().term) {
++idx;
segments_per_term.emplace_back();
// New term: consume segments from the previous term.
backlog += compaction_backlog_term(
std::move(segments_this_term), cf);
segments_this_term.clear();
}
segments_per_term[idx].push_back(s);
}
auto cf = _compaction_ratio.get();

for (const auto& segs : segments_per_term) {
auto segment_count = segs.size();
if (segment_count == 1) {
continue;
}
for (size_t n = 1; n <= segment_count; ++n) {
auto& s = segs[n - 1];
auto sz = s->finished_self_compaction() ? s->size_bytes()
: s->size_bytes() * cf;
for (size_t k = 0; k <= segment_count - n; ++k) {
if (k == segment_count - 1) {
continue;
}
if (k == 0) {
backlog += static_cast<int64_t>(sz);
} else {
backlog += static_cast<int64_t>(std::pow(cf, k) * sz);
}
}
if (segments_this_term.size() < limit_segments_this_term) {
segments_this_term.push_back(s);
}
}

    // Consume segments from the last term in the log after falling out of the loop
backlog += compaction_backlog_term(std::move(segments_this_term), cf);

return backlog;
}
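
For the outer loop, the following toy sketch (again illustrative, reusing `backlog_for_sizes` from the sketch above; `toy_segment` and `toy_backlog` are hypothetical names) shows the streaming shape of the new code: segments are buffered per term and the buffer is flushed into the per-term helper whenever the term changes, with a cap on the buffer size.

```cpp
#include <cstdint>
#include <vector>

// Declared in the sketch above.
int64_t backlog_for_sizes(const std::vector<int64_t>& sizes, double cf);

struct toy_segment {
    int64_t term;
    int64_t size_bytes;
};

int64_t toy_backlog(const std::vector<toy_segment>& segs, double cf) {
    // Cap how many segments a single term may buffer, mirroring
    // limit_segments_this_term in the patch.
    static constexpr size_t limit_segments_this_term = 1024;
    if (segs.empty()) {
        return 0;
    }
    int64_t backlog = 0;
    int64_t current_term = segs.front().term;
    std::vector<int64_t> sizes_this_term;
    for (const auto& s : segs) {
        if (s.term != current_term) {
            // New term: consume the previous term's buffered sizes.
            backlog += backlog_for_sizes(sizes_this_term, cf);
            sizes_this_term.clear();
            current_term = s.term;
        }
        if (sizes_this_term.size() < limit_segments_this_term) {
            sizes_this_term.push_back(s.size_bytes);
        }
    }
    // Flush the final term after falling out of the loop.
    backlog += backlog_for_sizes(sizes_this_term, cf);
    return backlog;
}
```

Compared with the replaced code, which collected a vector of segment vectors for every term before doing any arithmetic, only one term's worth of entries is held at a time, so memory stays bounded even when a single term accumulates a pathological number of segments.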
