Skip to content

Commit

Permalink
storage: make memory units release safer
Browse files Browse the repository at this point in the history
While in general the accounting is robust, this is a little
fragile in error paths (or when the code is changed in future),
as it is an exception-generating condition to release more
units from a sem_units than you took: a double release
could perhaps occur if we released before spill(), then
there was an exception in spill() that caused us to iterate.
  • Loading branch information
jcsp committed Aug 3, 2022
1 parent ef1a647 commit c69d869
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
7 changes: 2 additions & 5 deletions src/v/storage/spill_key_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
node.key(),
node.mapped(),
[this](const bytes& k, value_type o) {
_keys_mem_usage -= k.size();
_mem_units.return_units(k.size());
release_entry_memory(k);
return spill(compacted_index::entry_type::key, k, o);
});
});
Expand Down Expand Up @@ -241,9 +240,7 @@ ss::future<> spill_key_index::drain_all_keys() {
},
[this] {
auto node = _midx.extract(_midx.begin());
auto mem_usage = entry_mem_usage(node.key());
_keys_mem_usage -= mem_usage;
_mem_units.return_units(mem_usage);
release_entry_memory(node.key());
return ss::do_with(
node.key(), node.mapped(), [this](const bytes& k, value_type o) {
return spill(compacted_index::entry_type::key, k, o);
Expand Down
12 changes: 11 additions & 1 deletion src/v/storage/spill_key_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,24 @@ class spill_key_index final : public compacted_index_writer::impl {
return debug::AllocatedByteSize(_midx);
}

size_t entry_mem_usage(const compaction_key& k) const {
size_t entry_mem_usage(const bytes& k) const {
// One entry in a node hash map: key and value
// are allocated together, and the key is a basic_sstring with
// internal buffer that may be spilled if key was longer.
auto is_external = k.size() > bytes_inline_size;
return (is_external ? sizeof(k) + k.size() : sizeof(k)) + value_sz;
}

void release_entry_memory(const bytes& k) {
auto entry_memory = entry_mem_usage(k);
_keys_mem_usage -= entry_memory;

// Handle the case of a double-release, in case this comes up
// during retries/exception handling.
auto release_units = std::min(entry_memory, _mem_units.count());
_mem_units.return_units(release_units);
}

ss::future<> drain_all_keys();
ss::future<> add_key(compaction_key, value_type);
ss::future<> spill(compacted_index::entry_type, bytes_view, value_type);
Expand Down

0 comments on commit c69d869

Please sign in to comment.