diff --git a/src/v/storage/storage_resources.cc b/src/v/storage/storage_resources.cc index b7cef6b5eca45..83b177165986d 100644 --- a/src/v/storage/storage_resources.cc +++ b/src/v/storage/storage_resources.cc @@ -20,14 +20,17 @@ namespace storage { storage_resources::storage_resources( config::binding falloc_step, config::binding target_replay_bytes, - config::binding max_concurrent_replay) + config::binding max_concurrent_replay, + config::binding compaction_index_memory) : _segment_fallocation_step(falloc_step) , _target_replay_bytes(target_replay_bytes) , _max_concurrent_replay(max_concurrent_replay) + , _compaction_index_mem_limit(compaction_index_memory) , _append_chunk_size(config::shard_local_cfg().append_chunk_size()) , _offset_translator_dirty_bytes(_target_replay_bytes() / ss::smp::count) , _configuration_manager_dirty_bytes(_target_replay_bytes() / ss::smp::count) , _stm_dirty_bytes(_target_replay_bytes() / ss::smp::count) + , _compaction_index_bytes(_compaction_index_mem_limit()) , _inflight_recovery( std::max(_max_concurrent_replay() / ss::smp::count, uint64_t{1})) , _inflight_close_flush( @@ -51,6 +54,10 @@ storage_resources::storage_resources( _inflight_recovery.set_capacity(v); _inflight_close_flush.set_capacity(v); }); + + _compaction_index_mem_limit.watch([this] { + _compaction_index_bytes.set_capacity(_compaction_index_mem_limit()); + }); } // Unit test convenience for tests that want to control the falloc step @@ -59,17 +66,15 @@ storage_resources::storage_resources(config::binding falloc_step) : storage_resources( std::move(falloc_step), config::shard_local_cfg().storage_target_replay_bytes.bind(), - config::shard_local_cfg().storage_max_concurrent_replay.bind() - - ) {} + config::shard_local_cfg().storage_max_concurrent_replay.bind(), + config::shard_local_cfg().storage_compaction_index_memory.bind()) {} storage_resources::storage_resources() : storage_resources( config::shard_local_cfg().segment_fallocation_step.bind(), config::shard_local_cfg().storage_target_replay_bytes.bind(), - config::shard_local_cfg().storage_max_concurrent_replay.bind() - - ) {} + config::shard_local_cfg().storage_max_concurrent_replay.bind(), + config::shard_local_cfg().storage_compaction_index_memory.bind()) {} void storage_resources::update_allowance(uint64_t total, uint64_t free) { // TODO: also take as an input the disk consumption of the SI cache: diff --git a/src/v/storage/storage_resources.h b/src/v/storage/storage_resources.h index bd074d89acf2b..b9e87b61cca3f 100644 --- a/src/v/storage/storage_resources.h +++ b/src/v/storage/storage_resources.h @@ -112,6 +112,7 @@ class storage_resources { storage_resources( config::binding, config::binding, + config::binding, config::binding); storage_resources(const storage_resources&) = delete; @@ -159,6 +160,7 @@ class storage_resources { config::binding _segment_fallocation_step; config::binding _target_replay_bytes; config::binding _max_concurrent_replay; + config::binding _compaction_index_mem_limit; size_t _append_chunk_size; size_t _falloc_step{0}; @@ -183,7 +185,7 @@ class storage_resources { // How much memory may all compacted partitions on this shard // use for their spill_key_index objects - adjustable_allowance _compaction_index_bytes{128_MiB}; + adjustable_allowance _compaction_index_bytes{0}; // How many logs may be recovered (via log_manager::manage) // concurrently?