diff --git a/src/v/storage/spill_key_index.cc b/src/v/storage/spill_key_index.cc index 0feaf59bfdbf6..a492676e0d9d5 100644 --- a/src/v/storage/spill_key_index.cc +++ b/src/v/storage/spill_key_index.cc @@ -88,14 +88,34 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) { auto const key_size = b.size(); auto const expected_size = idx_mem_usage() + _keys_mem_usage + key_size; - // TODO call into storage_resources + auto take_result = _resources.compaction_index_take_bytes(key_size); + if (_mem_units.count() == 0) { + _mem_units = std::move(take_result.units); + } else { + _mem_units.adopt(std::move(take_result.units)); + } + + // Don't spill unless we're at least this big. Prevents a situation + // where some other index has used up the memory allowance, and we + // would end up spilling on every key. + const size_t min_index_size = std::min(32_KiB, _max_mem); - if (expected_size >= _max_mem) { + if ( + (take_result.checkpoint_hint && expected_size > min_index_size) + || expected_size >= _max_mem) { f = ss::do_until( - [this, key_size] { - // stop condition - return _midx.empty() - || idx_mem_usage() + _keys_mem_usage + key_size < _max_mem; + [this, key_size, min_index_size] { + size_t total_mem = idx_mem_usage() + _keys_mem_usage + key_size; + + // Instance-local capacity check + bool local_ok = total_mem < _max_mem; + + // Shard-wide capacity check + bool global_ok = _resources.compaction_index_bytes_available() + || total_mem < min_index_size; + + // Stop condition: none of our size thresholds must be violated + return _midx.empty() || (local_ok && global_ok); }, [this] { /** @@ -110,6 +130,7 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) { node.mapped(), [this](const bytes& k, value_type o) { _keys_mem_usage -= k.size(); + _mem_units.return_units(k.size()); return spill(compacted_index::entry_type::key, k, o); }); }); @@ -118,6 +139,10 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) { return f.then([this, b = std::move(b), v]() mutable { // convert iobuf to key _keys_mem_usage += b.size(); + + // No update to _mem_units here: we already took units at top + // of add_key before starting the write. + _midx.insert({std::move(b), v}); }); } @@ -217,6 +242,7 @@ ss::future<> spill_key_index::drain_all_keys() { [this] { auto node = _midx.extract(_midx.begin()); _keys_mem_usage -= node.key().size(); + _mem_units.return_units(node.key().size()); return ss::do_with( node.key(), node.mapped(), [this](const bytes& k, value_type o) { return spill(compacted_index::entry_type::key, k, o); diff --git a/src/v/storage/spill_key_index.h b/src/v/storage/spill_key_index.h index 20f64a213a45c..2c086f82f33d7 100644 --- a/src/v/storage/spill_key_index.h +++ b/src/v/storage/spill_key_index.h @@ -100,7 +100,15 @@ class spill_key_index final : public compacted_index_writer::impl { bool _truncate; std::optional _appender; underlying_t _midx; + + // Max memory we'll use for _midx, although we may spill earlier + // if hinted to by storage_resources size_t _max_mem{512_KiB}; + + // Units handed out by storage_resources to track our consumption + // of the per-shard compaction index memory allowance. + ss::semaphore_units<> _mem_units; + size_t _keys_mem_usage{0}; compacted_index::footer _footer; crc::crc32c _crc;