From abf7ed51df7a95d1912289ea0c94fe9561b28a2a Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 29 Jul 2022 09:03:31 +0100 Subject: [PATCH] storage: respect shard-wide memory limit in spill_key_index Fixes https://github.com/redpanda-data/redpanda/issues/4645 --- src/v/storage/spill_key_index.cc | 37 ++++++++++++++++++++++++++------ src/v/storage/spill_key_index.h | 8 +++++++ 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/src/v/storage/spill_key_index.cc b/src/v/storage/spill_key_index.cc index 0feaf59bfdbf6..f6b517deeb0e1 100644 --- a/src/v/storage/spill_key_index.cc +++ b/src/v/storage/spill_key_index.cc @@ -88,14 +88,34 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) { auto const key_size = b.size(); auto const expected_size = idx_mem_usage() + _keys_mem_usage + key_size; - // TODO call into storage_resources + auto take_result = _resources.compaction_index_take_bytes(key_size); + if (_mem_units.count() == 0) { + _mem_units = std::move(take_result.units); + } else { + _mem_units.adopt(std::move(take_result.units)); + } + + // Don't spill unless we're at least this big. Prevents a situation + // where some other index has used up the memory allowance, and we + // would end up spilling on every key. + const size_t min_index_size = std::min(32_KiB, _max_mem); - if (expected_size >= _max_mem) { + if ( + (take_result.checkpoint_hint && expected_size > min_index_size) + || expected_size >= _max_mem) { f = ss::do_until( - [this, key_size] { - // stop condition - return _midx.empty() - || idx_mem_usage() + _keys_mem_usage + key_size < _max_mem; + [this, key_size, min_index_size] { + size_t total_mem = idx_mem_usage() + _keys_mem_usage + key_size; + + // Instance-local capacity check + bool local_ok = total_mem < _max_mem; + + // Shard-wide capacity check + bool global_ok = _resources.compaction_index_bytes_available() + || total_mem < min_index_size; + + // Stop condition: none of our size thresholds must be violated + return _midx.empty() || (local_ok && global_ok); }, [this] { /** @@ -110,6 +130,7 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) { node.mapped(), [this](const bytes& k, value_type o) { _keys_mem_usage -= k.size(); + _mem_units.return_units(k.size()); return spill(compacted_index::entry_type::key, k, o); }); }); @@ -118,6 +139,10 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) { return f.then([this, b = std::move(b), v]() mutable { // convert iobuf to key _keys_mem_usage += b.size(); + + // No update to _mem_units here: we already took units at top + // of add_key before starting the write. + _midx.insert({std::move(b), v}); }); } diff --git a/src/v/storage/spill_key_index.h b/src/v/storage/spill_key_index.h index 20f64a213a45c..2c086f82f33d7 100644 --- a/src/v/storage/spill_key_index.h +++ b/src/v/storage/spill_key_index.h @@ -100,7 +100,15 @@ class spill_key_index final : public compacted_index_writer::impl { bool _truncate; std::optional _appender; underlying_t _midx; + + // Max memory we'll use for _midx, although we may spill earlier + // if hinted to by storage_resources size_t _max_mem{512_KiB}; + + // Units handed out by storage_resources to track our consumption + // of the per-shard compaction index memory allowance. + ss::semaphore_units<> _mem_units; + size_t _keys_mem_usage{0}; compacted_index::footer _footer; crc::crc32c _crc;