diff --git a/src/v/bytes/bytes.h b/src/v/bytes/bytes.h
index f8d87b2e3e9d..7c62dcddbfff 100644
--- a/src/v/bytes/bytes.h
+++ b/src/v/bytes/bytes.h
@@ -22,12 +22,12 @@
 #include <iosfwd>
 
 // cannot be a `std::byte` because that's not sizeof(char)
+constexpr size_t bytes_inline_size = 31;
 using bytes = ss::basic_sstring<
   uint8_t,  // Must be different from char to not leak to std::string_view
   uint32_t, // size type - 4 bytes - 4GB max - don't use a size_t or any 64-bit
-  31,   // NOLINT(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
-        // short string optimization size
-  false // not null terminated
+  bytes_inline_size, // short string optimization size
+  false              // not null terminated
   >;
 
 using bytes_view = std::basic_string_view<uint8_t>;
diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 6b727e3cc21e..c33e05e18217 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -756,6 +756,16 @@ configuration::configuration()
        .visibility = visibility::tunable},
       1024,
       {.min = 128})
+  , storage_compaction_index_memory(
+      *this,
+      "storage_compaction_index_memory",
+      "Maximum number of bytes that may be used on each shard by compaction"
+      " index writers",
+      {.needs_restart = needs_restart::no,
+       .example = "1073741824",
+       .visibility = visibility::tunable},
+      128_MiB,
+      {.min = 16_MiB, .max = 100_GiB})
   , max_compacted_log_segment_size(
       *this,
       "max_compacted_log_segment_size",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index a8a717bd66ee..e7151b492a2c 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -167,6 +167,7 @@ struct configuration final : public config_store {
     property<size_t> segment_fallocation_step;
     bounded_property<uint64_t> storage_target_replay_bytes;
     bounded_property<uint64_t> storage_max_concurrent_replay;
+    bounded_property<uint64_t> storage_compaction_index_memory;
     property<size_t> max_compacted_log_segment_size;
     property<int16_t> id_allocator_log_capacity;
     property<int16_t> id_allocator_batch_size;
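The new property is tagged needs_restart::no, so consumers read it through a
config::binding and subscribe to changes instead of caching the value
(storage_resources does exactly this further down). A minimal sketch of that
read/watch pattern, using a hypothetical simplified binding class rather than
Redpanda's real config framework:

    #include <cstdint>
    #include <functional>
    #include <utility>
    #include <vector>

    // Simplified stand-in for config::binding<T> (illustration only): a
    // live view of one config property that notifies watchers on change.
    template<typename T>
    class binding {
    public:
        explicit binding(T initial)
          : _value(initial) {}
        // Reading uses call syntax, as in _compaction_index_mem_limit()
        T operator()() const { return _value; }
        void watch(std::function<void()> f) {
            _watchers.push_back(std::move(f));
        }
        void update(T v) { // runtime config change, no restart
            _value = v;
            for (auto& f : _watchers) {
                f();
            }
        }

    private:
        T _value;
        std::vector<std::function<void()>> _watchers;
    };

    int main() {
        // 128 MiB, matching the property's default above
        binding<uint64_t> compaction_index_memory{uint64_t{128} << 20};
        uint64_t capacity = compaction_index_memory();
        // Mirrors the watch() hook added in storage_resources.cc below
        compaction_index_memory.watch(
          [&] { capacity = compaction_index_memory(); });
        compaction_index_memory.update(uint64_t{256} << 20);
        return capacity == (uint64_t{256} << 20) ? 0 : 1;
    }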
diff --git a/src/v/storage/spill_key_index.cc b/src/v/storage/spill_key_index.cc
index 0feaf59bfdbf..02e9b6baa0f7 100644
--- a/src/v/storage/spill_key_index.cc
+++ b/src/v/storage/spill_key_index.cc
@@ -85,17 +85,37 @@ ss::future<> spill_key_index::index(
 
 ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
     auto f = ss::now();
-    auto const key_size = b.size();
-    auto const expected_size = idx_mem_usage() + _keys_mem_usage + key_size;
+    auto const entry_size = entry_mem_usage(b);
+    auto const expected_size = idx_mem_usage() + _keys_mem_usage + entry_size;
+
+    auto take_result = _resources.compaction_index_take_bytes(entry_size);
+    if (_mem_units.count() == 0) {
+        _mem_units = std::move(take_result.units);
+    } else {
+        _mem_units.adopt(std::move(take_result.units));
+    }
 
-    // TODO call into storage_resources
+    // Don't spill unless we're at least this big.  Prevents a situation
+    // where some other index has used up the memory allowance, and we
+    // would end up spilling on every key.
+    const size_t min_index_size = std::min(32_KiB, _max_mem);
 
-    if (expected_size >= _max_mem) {
+    if (
+      (take_result.checkpoint_hint && expected_size > min_index_size)
+      || expected_size >= _max_mem) {
         f = ss::do_until(
-          [this, key_size] {
-              // stop condition
-              return _midx.empty()
-                     || idx_mem_usage() + _keys_mem_usage + key_size < _max_mem;
+          [this, entry_size, min_index_size] {
+              size_t total_mem = idx_mem_usage() + _keys_mem_usage + entry_size;
+
+              // Instance-local capacity check
+              bool local_ok = total_mem < _max_mem;
+
+              // Shard-wide capacity check
+              bool global_ok = _resources.compaction_index_bytes_available()
+                               || total_mem < min_index_size;
+
+              // Stop once the index is empty or no size threshold is violated
+              return _midx.empty() || (local_ok && global_ok);
           },
           [this] {
               /**
@@ -109,15 +129,19 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
                 node.key(),
                 node.mapped(),
                 [this](const bytes& k, value_type o) {
-                    _keys_mem_usage -= k.size();
+                    release_entry_memory(k);
                     return spill(compacted_index::entry_type::key, k, o);
                 });
           });
     }
 
-    return f.then([this, b = std::move(b), v]() mutable {
+    return f.then([this, entry_size, b = std::move(b), v]() mutable {
         // convert iobuf to key
-        _keys_mem_usage += b.size();
+        _keys_mem_usage += entry_size;
+
+        // No update to _mem_units here: we already took the units at the
+        // top of add_key, before starting the write.
+
         _midx.insert({std::move(b), v});
     });
 }
@@ -216,7 +240,7 @@ ss::future<> spill_key_index::drain_all_keys() {
           },
           [this] {
               auto node = _midx.extract(_midx.begin());
-              _keys_mem_usage -= node.key().size();
+              release_entry_memory(node.key());
               return ss::do_with(
                 node.key(),
                 node.mapped(),
                 [this](const bytes& k, value_type o) {
                     return spill(compacted_index::entry_type::key, k, o);
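The new spill trigger combines a hard per-writer cap (_max_mem) with a soft
shard-wide signal (checkpoint_hint), floored by min_index_size so a small
index is not forced to spill on every key when its neighbours have exhausted
the shared allowance. Restated as a plain predicate; should_spill and the
test values are hypothetical helpers for illustration, not part of the patch:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>

    bool should_spill(
      size_t expected_size, // projected index size including the new entry
      size_t max_mem,       // per-writer cap (_max_mem, default 512 KiB)
      bool checkpoint_hint  // shard-wide pressure from storage_resources
    ) {
        // Floor below which shard-wide pressure is ignored, so a tiny
        // index does not thrash just because others consumed the budget.
        const size_t min_index_size = std::min<size_t>(32 * 1024, max_mem);
        return (checkpoint_hint && expected_size > min_index_size)
               || expected_size >= max_mem;
    }

    int main() {
        assert(should_spill(600 * 1024, 512 * 1024, false)); // over local cap
        assert(!should_spill(8 * 1024, 512 * 1024, true));   // tiny: ignored
        assert(should_spill(64 * 1024, 512 * 1024, true));   // hinted, big
        return 0;
    }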
diff --git a/src/v/storage/spill_key_index.h b/src/v/storage/spill_key_index.h
index 20f64a213a45..637724530921 100644
--- a/src/v/storage/spill_key_index.h
+++ b/src/v/storage/spill_key_index.h
@@ -15,6 +15,7 @@
 #include "hashing/xx.h"
 #include "model/fundamental.h"
 #include "model/record_batch_types.h"
+#include "ssx/semaphore.h"
 #include "storage/compacted_index.h"
 #include "storage/compacted_index_writer.h"
 #include "storage/segment_appender.h"
@@ -90,6 +91,25 @@ class spill_key_index final : public compacted_index_writer::impl {
           HashtableDebugAccess<underlying_t>;
         return debug::AllocatedByteSize(_midx);
     }
+
+    size_t entry_mem_usage(const bytes& k) const {
+        // One entry in a node hash map: key and value are allocated
+        // together, and the key is a basic_sstring whose internal buffer
+        // spills to the heap if the key exceeds bytes_inline_size.
+        auto is_external = k.size() > bytes_inline_size;
+        return (is_external ? sizeof(k) + k.size() : sizeof(k)) + value_sz;
+    }
+
+    void release_entry_memory(const bytes& k) {
+        auto entry_memory = entry_mem_usage(k);
+        _keys_mem_usage -= entry_memory;
+
+        // Handle the case of a double-release, in case this comes up
+        // during retries/exception handling.
+        auto release_units = std::min(entry_memory, _mem_units.count());
+        _mem_units.return_units(release_units);
+    }
+
     ss::future<> drain_all_keys();
     ss::future<> add_key(compaction_key, value_type);
     ss::future<> spill(compacted_index::entry_type, bytes_view, value_type);
@@ -100,7 +120,15 @@ class spill_key_index final : public compacted_index_writer::impl {
     bool _truncate;
     std::optional<segment_appender> _appender;
     underlying_t _midx;
+
+    // Max memory we'll use for _midx, although we may spill earlier
+    // if hinted to by storage_resources
     size_t _max_mem{512_KiB};
+
+    // Units handed out by storage_resources to track our consumption
+    // of the per-shard compaction index memory allowance.
+    ssx::semaphore_units _mem_units;
+
     size_t _keys_mem_usage{0};
     compacted_index::footer _footer;
     crc::crc32c _crc;
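release_entry_memory clamps the units it returns to what it currently holds,
so an accidental double release degrades gracefully instead of underflowing
the semaphore. A toy model of that guard; the units class below is a
hypothetical stand-in for ssx::semaphore_units, illustration only:

    #include <algorithm>
    #include <cstddef>
    #include <utility>

    // Tracks how many units this holder owns against a shared semaphore,
    // and can merge in more (as _mem_units.adopt() does in add_key).
    class units {
    public:
        explicit units(size_t n = 0)
          : _count(n) {}
        size_t count() const { return _count; }
        void adopt(units&& other) { _count += std::exchange(other._count, 0); }
        void return_units(size_t n) { _count -= n; } // would signal semaphore
    private:
        size_t _count;
    };

    // Mirrors release_entry_memory above: never return more units than we
    // actually hold, so a double release cannot underflow the counter.
    void release_entry(units& mem_units, size_t entry_memory) {
        auto release_n = std::min(entry_memory, mem_units.count());
        mem_units.return_units(release_n);
    }

    int main() {
        units held{100};
        release_entry(held, 60); // returns 60, 40 left
        release_entry(held, 60); // clamped to 40; no underflow
        return held.count() == 0 ? 0 : 1;
    }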
diff --git a/src/v/storage/storage_resources.cc b/src/v/storage/storage_resources.cc
index 264f5b4414d6..83b177165986 100644
--- a/src/v/storage/storage_resources.cc
+++ b/src/v/storage/storage_resources.cc
@@ -20,14 +20,17 @@ namespace storage {
 storage_resources::storage_resources(
   config::binding<size_t> falloc_step,
   config::binding<uint64_t> target_replay_bytes,
-  config::binding<uint64_t> max_concurrent_replay)
+  config::binding<uint64_t> max_concurrent_replay,
+  config::binding<uint64_t> compaction_index_memory)
   : _segment_fallocation_step(falloc_step)
   , _target_replay_bytes(target_replay_bytes)
   , _max_concurrent_replay(max_concurrent_replay)
+  , _compaction_index_mem_limit(compaction_index_memory)
   , _append_chunk_size(config::shard_local_cfg().append_chunk_size())
   , _offset_translator_dirty_bytes(_target_replay_bytes() / ss::smp::count)
   , _configuration_manager_dirty_bytes(_target_replay_bytes() / ss::smp::count)
   , _stm_dirty_bytes(_target_replay_bytes() / ss::smp::count)
+  , _compaction_index_bytes(_compaction_index_mem_limit())
   , _inflight_recovery(
       std::max(_max_concurrent_replay() / ss::smp::count, uint64_t{1}))
   , _inflight_close_flush(
@@ -51,6 +54,10 @@ storage_resources::storage_resources(
         _inflight_recovery.set_capacity(v);
         _inflight_close_flush.set_capacity(v);
     });
+
+    _compaction_index_mem_limit.watch([this] {
+        _compaction_index_bytes.set_capacity(_compaction_index_mem_limit());
+    });
 }
 
 // Unit test convenience for tests that want to control the falloc step
@@ -59,17 +66,15 @@
 storage_resources::storage_resources(config::binding<size_t> falloc_step)
   : storage_resources(
     std::move(falloc_step),
     config::shard_local_cfg().storage_target_replay_bytes.bind(),
-    config::shard_local_cfg().storage_max_concurrent_replay.bind()
-
-    ) {}
+    config::shard_local_cfg().storage_max_concurrent_replay.bind(),
+    config::shard_local_cfg().storage_compaction_index_memory.bind()) {}
 
 storage_resources::storage_resources()
   : storage_resources(
     config::shard_local_cfg().segment_fallocation_step.bind(),
     config::shard_local_cfg().storage_target_replay_bytes.bind(),
-    config::shard_local_cfg().storage_max_concurrent_replay.bind()
-
-    ) {}
+    config::shard_local_cfg().storage_max_concurrent_replay.bind(),
+    config::shard_local_cfg().storage_compaction_index_memory.bind()) {}
 
 void storage_resources::update_allowance(uint64_t total, uint64_t free) {
     // TODO: also take as an input the disk consumption of the SI cache:
@@ -86,6 +91,11 @@ void storage_resources::update_allowance(uint64_t total, uint64_t free) {
     _falloc_step = calc_falloc_step();
 }
 
+void storage_resources::update_partition_count(size_t partition_count) {
+    _partition_count = partition_count;
+    _falloc_step_dirty = true;
+}
+
 size_t storage_resources::calc_falloc_step() {
     // Heuristic: use at most half the available disk space for
     // pre-allocating space to write into.
@@ -189,4 +199,15 @@ storage_resources::stm_take_bytes(size_t bytes) {
     return _stm_dirty_bytes.take(bytes);
 }
 
+adjustable_allowance::take_result
+storage_resources::compaction_index_take_bytes(size_t bytes) {
+    vlog(
+      stlog.trace,
+      "compaction_index_take_bytes {} (current {})",
+      bytes,
+      _compaction_index_bytes.current());
+
+    return _compaction_index_bytes.take(bytes);
+}
+
 } // namespace storage
diff --git a/src/v/storage/storage_resources.h b/src/v/storage/storage_resources.h
index 34cb2d35420e..1eb566a788c8 100644
--- a/src/v/storage/storage_resources.h
+++ b/src/v/storage/storage_resources.h
@@ -113,6 +113,7 @@ class storage_resources {
     storage_resources(
       config::binding<size_t>,
       config::binding<uint64_t>,
+      config::binding<uint64_t>,
      config::binding<uint64_t>);
 
     storage_resources(const storage_resources&) = delete;
@@ -124,10 +125,7 @@ class storage_resources {
     /**
      * Call this when topics_table gets updated
      */
-    void update_partition_count(size_t partition_count) {
-        _partition_count = partition_count;
-        _falloc_step_dirty = true;
-    }
+    void update_partition_count(size_t partition_count);
 
     uint64_t get_space_allowance() { return _space_allowance; }
 
@@ -142,6 +140,11 @@ class storage_resources {
     adjustable_allowance::take_result stm_take_bytes(size_t bytes);
 
+    adjustable_allowance::take_result compaction_index_take_bytes(size_t bytes);
+    bool compaction_index_bytes_available() {
+        return _compaction_index_bytes.current() > 0;
+    }
+
     ss::future<ssx::semaphore_units> get_recovery_units() {
         return _inflight_recovery.get_units(1);
     }
@@ -158,6 +161,7 @@ class storage_resources {
     config::binding<size_t> _segment_fallocation_step;
     config::binding<uint64_t> _target_replay_bytes;
     config::binding<uint64_t> _max_concurrent_replay;
+    config::binding<uint64_t> _compaction_index_mem_limit;
     size_t _append_chunk_size;
 
     size_t _falloc_step{0};
@@ -180,6 +184,10 @@ class storage_resources {
     // we ask them to start snapshotting their state machines?
     adjustable_allowance _stm_dirty_bytes{0};
 
+    // How much memory may all compacted partitions on this shard
+    // use for their spill_key_index objects
+    adjustable_allowance _compaction_index_bytes{0};
+
     // How many logs may be recovered (via log_manager::manage)
     // concurrently?
     adjustable_allowance _inflight_recovery{0};
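adjustable_allowance itself is not part of this diff. Judging from how its
take_result is consumed in add_key (units are adopted unconditionally and
checkpoint_hint is treated as a soft signal), the contract appears to be:
grant every request, but hint once the budget is dry so callers release
memory at the next opportunity. A sketch under that assumption; the names
and semantics below are inferred, not the real implementation:

    #include <algorithm>
    #include <cstdint>

    class allowance_sketch {
    public:
        struct take_result {
            uint64_t taken;       // stand-in for the semaphore units held
            bool checkpoint_hint; // true once the budget has run dry
        };

        explicit allowance_sketch(uint64_t capacity)
          : _current(capacity) {}

        // Always grants the request; never blocks the write path. Sets
        // checkpoint_hint when the request meets or exceeds what is left.
        take_result take(uint64_t bytes) {
            bool hint = bytes >= _current;
            _current -= std::min(bytes, _current);
            return {bytes, hint};
        }

        // Simplified runtime resize, as driven by the watch() hook above;
        // the real type tracks capacity and outstanding units separately.
        void set_capacity(uint64_t c) { _current = c; }
        uint64_t current() const { return _current; }

    private:
        uint64_t _current;
    };

    int main() {
        allowance_sketch a{1024};
        auto r1 = a.take(512);  // plenty left: no hint
        auto r2 = a.take(1024); // over budget: hint set, caller should spill
        return (!r1.checkpoint_hint && r2.checkpoint_hint) ? 0 : 1;
    }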