From 823d672abf03745b708af392edef19329ad2a85b Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 29 Jul 2022 08:58:05 +0100
Subject: [PATCH 1/7] storage: add compaction index memory semaphore to
 storage_resources

This is an additional bound on top of the existing _max_mem in
spill_key_index: the compaction indices on a shard will now also avoid
using more than a fixed total amount of memory between them.

This commit uses a static total of 128MiB, which lets up to 256
compacted partitions keep the same 512KiB per-partition allowance they
were using before. As the partition count grows beyond that, the
indices start throttling back, although each partition always gets at
least 32KiB, to avoid the pathological case where an index spills on
every key add.
---
 src/v/storage/storage_resources.cc | 16 ++++++++++++++++
 src/v/storage/storage_resources.h  | 14 ++++++++++----
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/v/storage/storage_resources.cc b/src/v/storage/storage_resources.cc
index 264f5b4414d6..b7cef6b5eca4 100644
--- a/src/v/storage/storage_resources.cc
+++ b/src/v/storage/storage_resources.cc
@@ -86,6 +86,11 @@ void storage_resources::update_allowance(uint64_t total, uint64_t free) {
     _falloc_step = calc_falloc_step();
 }
 
+void storage_resources::update_partition_count(size_t partition_count) {
+    _partition_count = partition_count;
+    _falloc_step_dirty = true;
+}
+
 size_t storage_resources::calc_falloc_step() {
     // Heuristic: use at most half the available disk space for pre-allocating
     // space to write into.
@@ -189,4 +194,15 @@ storage_resources::stm_take_bytes(size_t bytes) {
     return _stm_dirty_bytes.take(bytes);
 }
 
+adjustable_allowance::take_result
+storage_resources::compaction_index_take_bytes(size_t bytes) {
+    vlog(
+      stlog.trace,
+      "compaction_index_take_bytes {} (current {})",
+      bytes,
+      _compaction_index_bytes.current());
+
+    return _compaction_index_bytes.take(bytes);
+}
+
 } // namespace storage
diff --git a/src/v/storage/storage_resources.h b/src/v/storage/storage_resources.h
index 34cb2d35420e..a187f39b85ab 100644
--- a/src/v/storage/storage_resources.h
+++ b/src/v/storage/storage_resources.h
@@ -124,10 +124,7 @@ class storage_resources {
     /**
      * Call this when topics_table gets updated
      */
-    void update_partition_count(size_t partition_count) {
-        _partition_count = partition_count;
-        _falloc_step_dirty = true;
-    }
+    void update_partition_count(size_t partition_count);
 
     uint64_t get_space_allowance() { return _space_allowance; }
 
@@ -142,6 +139,11 @@ class storage_resources {
 
     adjustable_allowance::take_result stm_take_bytes(size_t bytes);
 
+    adjustable_allowance::take_result compaction_index_take_bytes(size_t bytes);
+    bool compaction_index_bytes_available() {
+        return _compaction_index_bytes.current() > 0;
+    }
+
     ss::future<ssx::semaphore_units> get_recovery_units() {
         return _inflight_recovery.get_units(1);
     }
 
@@ -180,6 +182,10 @@ class storage_resources {
     // we ask them to start snapshotting their state machines?
     adjustable_allowance _stm_dirty_bytes{0};
 
+    // How much memory may all compacted partitions on this shard
+    // use for their spill_key_index objects
+    adjustable_allowance _compaction_index_bytes{128_MiB};
+
     // How many logs may be recovered (via log_manager::manage)
     // concurrently?
     adjustable_allowance _inflight_recovery{0};
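adjustable_allowance itself is defined elsewhere in storage_resources.h and is not shown in this series. As a rough mental model only, here is a minimal stand-alone C++ sketch of the take/return pattern the patch relies on; the names and semantics are assumptions inferred from the call sites above (take(), current(), set_capacity(), and a take_result carrying units plus a checkpoint_hint), not the real implementation:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>

    // Hypothetical stand-in for adjustable_allowance: a byte-counting
    // allowance whose take() never blocks the caller, but flags a
    // checkpoint_hint once capacity is exhausted so callers know to spill.
    class allowance_sketch {
    public:
        struct take_result {
            size_t taken{0};             // stands in for ssx::semaphore_units
            bool checkpoint_hint{false}; // true when the allowance ran dry
        };

        explicit allowance_sketch(uint64_t capacity)
          : _capacity(capacity) {}

        take_result take(size_t bytes) {
            _used += bytes; // consume unconditionally rather than blocking
            return {bytes, _used >= _capacity};
        }

        void return_units(size_t bytes) {
            _used -= std::min<uint64_t>(bytes, _used);
        }

        uint64_t current() const {
            return _used < _capacity ? _capacity - _used : 0;
        }

        void set_capacity(uint64_t c) { _capacity = c; }

    private:
        uint64_t _capacity;
        uint64_t _used{0};
    };

The important property under this model is that take() is non-blocking: compaction never stalls on the semaphore, it only receives a hint that it should start spilling to disk.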
From a60f1f2ee666ebc474fffb5b78f8dd892710e679 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 29 Jul 2022 09:03:31 +0100
Subject: [PATCH 2/7] storage: respect shard-wide memory limit in
 spill_key_index

Fixes https://github.com/redpanda-data/redpanda/issues/4645
---
 src/v/storage/spill_key_index.cc | 38 ++++++++++++++++++++++++++++++++------
 src/v/storage/spill_key_index.h  |  9 +++++++++
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/src/v/storage/spill_key_index.cc b/src/v/storage/spill_key_index.cc
index 0feaf59bfdbf..a492676e0d9d 100644
--- a/src/v/storage/spill_key_index.cc
+++ b/src/v/storage/spill_key_index.cc
@@ -88,14 +88,34 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
     auto const key_size = b.size();
     auto const expected_size = idx_mem_usage() + _keys_mem_usage + key_size;
 
-    // TODO call into storage_resources
+    auto take_result = _resources.compaction_index_take_bytes(key_size);
+    if (_mem_units.count() == 0) {
+        _mem_units = std::move(take_result.units);
+    } else {
+        _mem_units.adopt(std::move(take_result.units));
+    }
+
+    // Don't spill unless we're at least this big. Prevents a situation
+    // where some other index has used up the memory allowance, and we
+    // would end up spilling on every key.
+    const size_t min_index_size = std::min(32_KiB, _max_mem);
 
-    if (expected_size >= _max_mem) {
+    if (
+      (take_result.checkpoint_hint && expected_size > min_index_size)
+      || expected_size >= _max_mem) {
         f = ss::do_until(
-          [this, key_size] {
-              // stop condition
-              return _midx.empty()
-                     || idx_mem_usage() + _keys_mem_usage + key_size < _max_mem;
+          [this, key_size, min_index_size] {
+              size_t total_mem = idx_mem_usage() + _keys_mem_usage + key_size;
+
+              // Instance-local capacity check
+              bool local_ok = total_mem < _max_mem;
+
+              // Shard-wide capacity check
+              bool global_ok = _resources.compaction_index_bytes_available()
+                               || total_mem < min_index_size;
+
+              // Stop condition: none of our size thresholds must be violated
+              return _midx.empty() || (local_ok && global_ok);
           },
           [this] {
              /**
@@ -110,6 +130,7 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
                 node.mapped(),
                 [this](const bytes& k, value_type o) {
                     _keys_mem_usage -= k.size();
+                    _mem_units.return_units(k.size());
                     return spill(compacted_index::entry_type::key, k, o);
                 });
           });
@@ -118,6 +139,10 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
     return f.then([this, b = std::move(b), v]() mutable {
         // convert iobuf to key
         _keys_mem_usage += b.size();
+
+        // No update to _mem_units here: we already took units at top
+        // of add_key before starting the write.
+
         _midx.insert({std::move(b), v});
     });
 }
@@ -217,6 +242,7 @@ ss::future<> spill_key_index::drain_all_keys() {
           [this] {
               auto node = _midx.extract(_midx.begin());
               _keys_mem_usage -= node.key().size();
+              _mem_units.return_units(node.key().size());
               return ss::do_with(
                 node.key(), node.mapped(), [this](const bytes& k, value_type o) {
                     return spill(compacted_index::entry_type::key, k, o);
diff --git a/src/v/storage/spill_key_index.h b/src/v/storage/spill_key_index.h
index 20f64a213a45..076c4c1dde29 100644
--- a/src/v/storage/spill_key_index.h
+++ b/src/v/storage/spill_key_index.h
@@ -15,6 +15,7 @@
 #include "hashing/xx.h"
 #include "model/fundamental.h"
 #include "model/record_batch_types.h"
+#include "ssx/semaphore.h"
 #include "storage/compacted_index.h"
 #include "storage/compacted_index_writer.h"
 #include "storage/segment_appender.h"
@@ -100,7 +101,15 @@ class spill_key_index final : public compacted_index_writer::impl {
     bool _truncate;
     std::optional<segment_appender> _appender;
     underlying_t _midx;
+
+    // Max memory we'll use for _midx, although we may spill earlier
+    // if hinted to by storage_resources
     size_t _max_mem{512_KiB};
+
+    // Units handed out by storage_resources to track our consumption
+    // of the per-shard compaction index memory allowance.
+    ssx::semaphore_units _mem_units;
+
     size_t _keys_mem_usage{0};
     compacted_index::footer _footer;
     crc::crc32c _crc;
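Distilled out of the add_key() diff above, the spill decision reduces to a small predicate. This sketch restates it with the thresholds as parameters; the function name and signature are illustrative, not from the tree:

    #include <algorithm>
    #include <cstddef>

    // Spill when the per-instance cap is exceeded, or when the shard-wide
    // allowance signals pressure -- but never while below a small per-index
    // floor, so a starved index does not spill on every key it adds.
    bool should_spill(
      size_t expected_size,   // idx_mem_usage() + _keys_mem_usage + key size
      size_t max_mem,         // per-instance cap (_max_mem, 512 KiB default)
      bool checkpoint_hint) { // shard-wide pressure hint from storage_resources
        const size_t min_index_size = std::min<size_t>(32 * 1024, max_mem);
        return (checkpoint_hint && expected_size > min_index_size)
               || expected_size >= max_mem;
    }

The same floor appears again inside the do_until stop condition: even when the shard-wide allowance is exhausted, an index may keep up to min_index_size in memory before it is forced to keep spilling.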
From 3244040ae18daa7f1ed1e54663b7abb735ba19da Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 29 Jul 2022 14:40:50 +0100
Subject: [PATCH 3/7] config: add storage_compaction_index_memory

This property controls the new bound on per-shard memory used for
compaction indices at scale.
---
 src/v/config/configuration.cc | 10 ++++++++++
 src/v/config/configuration.h  |  1 +
 2 files changed, 11 insertions(+)

diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 6b727e3cc21e..c33e05e18217 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -756,6 +756,16 @@ configuration::configuration()
       .visibility = visibility::tunable},
      1024,
      {.min = 128})
+  , storage_compaction_index_memory(
+      *this,
+      "storage_compaction_index_memory",
+      "Maximum number of bytes that may be used on each shard by compaction "
+      "index writers",
+      {.needs_restart = needs_restart::no,
+       .example = "1073741824",
+       .visibility = visibility::tunable},
+      128_MiB,
+      {.min = 16_MiB, .max = 100_GiB})
   , max_compacted_log_segment_size(
       *this,
       "max_compacted_log_segment_size",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index a8a717bd66ee..e7151b492a2c 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -167,6 +167,7 @@ struct configuration final : public config_store {
     property<size_t> segment_fallocation_step;
     bounded_property<uint64_t> storage_target_replay_bytes;
     bounded_property<uint64_t> storage_max_concurrent_replay;
+    bounded_property<uint64_t> storage_compaction_index_memory;
     property<size_t> max_compacted_log_segment_size;
     property<int16_t> id_allocator_log_capacity;
    property<int16_t> id_allocator_batch_size;

From 53fcd5de5bf8c15f9d892623842b04776e7b823f Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 29 Jul 2022 14:41:49 +0100
Subject: [PATCH 4/7] storage: make compaction memory limit configurable

This will rarely need changing in practice, but it mitigates the risk
that people running compaction at high scale hit performance issues
from index spills happening more often than they would like.
---
 src/v/storage/storage_resources.cc | 19 ++++++++++++-------
 src/v/storage/storage_resources.h  |  4 +++-
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/src/v/storage/storage_resources.cc b/src/v/storage/storage_resources.cc
index b7cef6b5eca4..83b177165986 100644
--- a/src/v/storage/storage_resources.cc
+++ b/src/v/storage/storage_resources.cc
@@ -20,14 +20,17 @@ namespace storage {
 storage_resources::storage_resources(
   config::binding<size_t> falloc_step,
   config::binding<uint64_t> target_replay_bytes,
-  config::binding<uint64_t> max_concurrent_replay)
+  config::binding<uint64_t> max_concurrent_replay,
+  config::binding<uint64_t> compaction_index_memory)
   : _segment_fallocation_step(falloc_step)
   , _target_replay_bytes(target_replay_bytes)
   , _max_concurrent_replay(max_concurrent_replay)
+  , _compaction_index_mem_limit(compaction_index_memory)
   , _append_chunk_size(config::shard_local_cfg().append_chunk_size())
   , _offset_translator_dirty_bytes(_target_replay_bytes() / ss::smp::count)
   , _configuration_manager_dirty_bytes(_target_replay_bytes() / ss::smp::count)
   , _stm_dirty_bytes(_target_replay_bytes() / ss::smp::count)
+  , _compaction_index_bytes(_compaction_index_mem_limit())
   , _inflight_recovery(
       std::max(_max_concurrent_replay() / ss::smp::count, uint64_t{1}))
   , _inflight_close_flush(
@@ -51,6 +54,10 @@ storage_resources::storage_resources(
         _inflight_recovery.set_capacity(v);
         _inflight_close_flush.set_capacity(v);
     });
+
+    _compaction_index_mem_limit.watch([this] {
+        _compaction_index_bytes.set_capacity(_compaction_index_mem_limit());
+    });
 }
 
 // Unit test convenience for tests that want to control the falloc step
@@ -59,17 +66,15 @@ storage_resources::storage_resources(config::binding<size_t> falloc_step)
   : storage_resources(
       std::move(falloc_step),
       config::shard_local_cfg().storage_target_replay_bytes.bind(),
-      config::shard_local_cfg().storage_max_concurrent_replay.bind()
-
-  ) {}
+      config::shard_local_cfg().storage_max_concurrent_replay.bind(),
+      config::shard_local_cfg().storage_compaction_index_memory.bind()) {}
 
 storage_resources::storage_resources()
   : storage_resources(
       config::shard_local_cfg().segment_fallocation_step.bind(),
       config::shard_local_cfg().storage_target_replay_bytes.bind(),
-      config::shard_local_cfg().storage_max_concurrent_replay.bind()
-
-  ) {}
+      config::shard_local_cfg().storage_max_concurrent_replay.bind(),
+      config::shard_local_cfg().storage_compaction_index_memory.bind()) {}
 
 void storage_resources::update_allowance(uint64_t total, uint64_t free) {
     // TODO: also take as an input the disk consumption of the SI cache:
diff --git a/src/v/storage/storage_resources.h b/src/v/storage/storage_resources.h
index a187f39b85ab..1eb566a788c8 100644
--- a/src/v/storage/storage_resources.h
+++ b/src/v/storage/storage_resources.h
@@ -113,6 +113,7 @@ class storage_resources {
     storage_resources(
       config::binding<size_t>,
       config::binding<uint64_t>,
+      config::binding<uint64_t>,
       config::binding<uint64_t>);
 
     storage_resources(const storage_resources&) = delete;
@@ -160,6 +161,7 @@ class storage_resources {
     config::binding<size_t> _segment_fallocation_step;
     config::binding<uint64_t> _target_replay_bytes;
     config::binding<uint64_t> _max_concurrent_replay;
+    config::binding<uint64_t> _compaction_index_mem_limit;
 
     size_t _append_chunk_size;
     size_t _falloc_step{0};
@@ -184,7 +186,7 @@ class storage_resources {
     // How much memory may all compacted partitions on this shard
     // use for their spill_key_index objects
-    adjustable_allowance _compaction_index_bytes{128_MiB};
+    adjustable_allowance _compaction_index_bytes{0};
 
     // How many logs may be recovered (via log_manager::manage)
     // concurrently?
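The constructor above leans on config::binding's watch() mechanism to make the property take effect without a restart. For readers unfamiliar with it, here is a simplified, hypothetical model of that pattern -- this is not Redpanda's actual config implementation:

    #include <cstdint>
    #include <functional>
    #include <utility>
    #include <vector>

    // A live, per-shard view of one configuration value that runs callbacks
    // whenever the central value changes.
    class binding_sketch {
    public:
        explicit binding_sketch(uint64_t initial)
          : _value(initial) {}

        uint64_t operator()() const { return _value; }

        void watch(std::function<void()> cb) {
            _watchers.push_back(std::move(cb));
        }

        // Invoked by the config subsystem when the property is updated.
        void update(uint64_t v) {
            _value = v;
            for (auto& cb : _watchers) {
                cb();
            }
        }

    private:
        uint64_t _value;
        std::vector<std::function<void()>> _watchers;
    };

Under this model, changing storage_compaction_index_memory at runtime triggers the watch callback, which calls set_capacity() on the shard's allowance; that is what justifies needs_restart::no in the property definition.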
From c1b1e9c46203d8ddca88d43f5927270146a0e613 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 2 Aug 2022 15:22:40 +0100
Subject: [PATCH 5/7] bytes: publicly expose the inline buffer size of `bytes`

This enables other code to accurately account for memory use.
---
 src/v/bytes/bytes.h | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/v/bytes/bytes.h b/src/v/bytes/bytes.h
index f8d87b2e3e9d..7c62dcddbfff 100644
--- a/src/v/bytes/bytes.h
+++ b/src/v/bytes/bytes.h
@@ -22,12 +22,12 @@
 #include
 
 // cannot be a `std::byte` because that's not sizeof(char)
+constexpr size_t bytes_inline_size = 31;
 using bytes = ss::basic_sstring<
   uint8_t,  // Must be different from char to not leak to std::string_view
   uint32_t, // size type - 4 bytes - 4GB max - don't use a size_t or any 64-bit
-  31, // NOLINT(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
-      // short string optimization size
-  false // not null terminated
+  bytes_inline_size, // short string optimization size
+  false              // not null terminated
  >;
 
 using bytes_view = std::basic_string_view<uint8_t>;

From 87437ff5796b995be2a5640de35c63537aa7e9f0 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 2 Aug 2022 15:24:58 +0100
Subject: [PATCH 6/7] storage: more accurate accounting in spill_key_index

Previously, we ignored:
- the inline buffer in `bytes`, which affects the actual memory
  utilization
- the map value

Actual memory footprint still depends on how these sizes get rounded up
to allocator boundaries, but this is an improvement.
---
 src/v/storage/spill_key_index.cc | 19 ++++++++++---------
 src/v/storage/spill_key_index.h  |  9 +++++++++
 2 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/src/v/storage/spill_key_index.cc b/src/v/storage/spill_key_index.cc
index a492676e0d9d..0406c22e83f0 100644
--- a/src/v/storage/spill_key_index.cc
+++ b/src/v/storage/spill_key_index.cc
@@ -85,10 +85,10 @@ ss::future<> spill_key_index::index(
 
 ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
     auto f = ss::now();
-    auto const key_size = b.size();
-    auto const expected_size = idx_mem_usage() + _keys_mem_usage + key_size;
+    auto const entry_size = entry_mem_usage(b);
+    auto const expected_size = idx_mem_usage() + _keys_mem_usage + entry_size;
 
-    auto take_result = _resources.compaction_index_take_bytes(key_size);
+    auto take_result = _resources.compaction_index_take_bytes(entry_size);
     if (_mem_units.count() == 0) {
         _mem_units = std::move(take_result.units);
     } else {
@@ -104,8 +104,8 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
       (take_result.checkpoint_hint && expected_size > min_index_size)
       || expected_size >= _max_mem) {
         f = ss::do_until(
-          [this, key_size, min_index_size] {
-              size_t total_mem = idx_mem_usage() + _keys_mem_usage + key_size;
+          [this, entry_size, min_index_size] {
+              size_t total_mem = idx_mem_usage() + _keys_mem_usage + entry_size;
 
               // Instance-local capacity check
               bool local_ok = total_mem < _max_mem;
@@ -136,9 +136,9 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
         });
     }
 
-    return f.then([this, b = std::move(b), v]() mutable {
+    return f.then([this, entry_size, b = std::move(b), v]() mutable {
         // convert iobuf to key
-        _keys_mem_usage += b.size();
+        _keys_mem_usage += entry_size;
 
         // No update to _mem_units here: we already took units at top
         // of add_key before starting the write.
@@ -241,8 +241,9 @@ ss::future<> spill_key_index::drain_all_keys() {
           },
           [this] {
               auto node = _midx.extract(_midx.begin());
-              _keys_mem_usage -= node.key().size();
-              _mem_units.return_units(node.key().size());
+              auto mem_usage = entry_mem_usage(node.key());
+              _keys_mem_usage -= mem_usage;
+              _mem_units.return_units(mem_usage);
               return ss::do_with(
                 node.key(), node.mapped(), [this](const bytes& k, value_type o) {
                     return spill(compacted_index::entry_type::key, k, o);
diff --git a/src/v/storage/spill_key_index.h b/src/v/storage/spill_key_index.h
index 076c4c1dde29..9c3ca1514e1f 100644
--- a/src/v/storage/spill_key_index.h
+++ b/src/v/storage/spill_key_index.h
@@ -91,6 +91,15 @@ class spill_key_index final : public compacted_index_writer::impl {
           HashtableDebugAccess<underlying_t>;
         return debug::AllocatedByteSize(_midx);
     }
+
+    size_t entry_mem_usage(const compaction_key& k) const {
+        // One entry in a node hash map: key and value
+        // are allocated together, and the key is a basic_sstring with
+        // internal buffer that may be spilled if key was longer.
+        auto is_external = k.size() > bytes_inline_size;
+        return (is_external ? sizeof(k) + k.size() : sizeof(k)) + value_sz;
+    }
     ss::future<> drain_all_keys();
     ss::future<> add_key(compaction_key, value_type);
     ss::future<> spill(compacted_index::entry_type, bytes_view, value_type);
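To see what the new accounting changes in practice, here is a small self-contained check mirroring entry_mem_usage(); sizeof_key and value_sz are illustrative stand-ins, since the real numbers depend on sizeof(bytes) and the map's value type:

    #include <cstddef>
    #include <cstdio>

    constexpr size_t bytes_inline_size = 31; // exposed by bytes.h above
    constexpr size_t sizeof_key = 32;        // illustrative sizeof(bytes)
    constexpr size_t value_sz = 16;          // illustrative map value size

    // Short keys live in the sstring's inline buffer and cost only
    // sizeof(key); longer keys also heap-allocate their contents.
    size_t entry_mem_usage_sketch(size_t key_len) {
        const bool is_external = key_len > bytes_inline_size;
        return (is_external ? sizeof_key + key_len : sizeof_key) + value_sz;
    }

    int main() {
        std::printf("%zu\n", entry_mem_usage_sketch(16)); // inline: 32 + 16 = 48
        std::printf("%zu\n", entry_mem_usage_sketch(64)); // spilled: 32 + 64 + 16 = 112
        return 0;
    }

Under the old b.size() accounting, those two keys would have been charged as just 16 and 64 bytes respectively, undercounting both.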
From 5481557d8792155b8b42024d7c04be21264ef599 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 2 Aug 2022 15:55:36 +0100
Subject: [PATCH 7/7] storage: make memory units release safer

While the accounting is generally robust, it is a little fragile in
error paths (or when the code is changed in future), because it is an
exception-generating condition to release more units from a
semaphore_units than you took: a double release could occur if we
released before spill(), and an exception in spill() then caused us to
iterate again.
---
 src/v/storage/spill_key_index.cc |  7 ++-----
 src/v/storage/spill_key_index.h  | 12 +++++++++++-
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/v/storage/spill_key_index.cc b/src/v/storage/spill_key_index.cc
index 0406c22e83f0..02e9b6baa0f7 100644
--- a/src/v/storage/spill_key_index.cc
+++ b/src/v/storage/spill_key_index.cc
@@ -129,8 +129,7 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
                 node.key(),
                 node.mapped(),
                 [this](const bytes& k, value_type o) {
-                    _keys_mem_usage -= k.size();
-                    _mem_units.return_units(k.size());
+                    release_entry_memory(k);
                     return spill(compacted_index::entry_type::key, k, o);
                 });
           });
@@ -241,9 +240,7 @@ ss::future<> spill_key_index::drain_all_keys() {
           },
           [this] {
               auto node = _midx.extract(_midx.begin());
-              auto mem_usage = entry_mem_usage(node.key());
-              _keys_mem_usage -= mem_usage;
-              _mem_units.return_units(mem_usage);
+              release_entry_memory(node.key());
               return ss::do_with(
                 node.key(), node.mapped(), [this](const bytes& k, value_type o) {
                     return spill(compacted_index::entry_type::key, k, o);
diff --git a/src/v/storage/spill_key_index.h b/src/v/storage/spill_key_index.h
index 9c3ca1514e1f..637724530921 100644
--- a/src/v/storage/spill_key_index.h
+++ b/src/v/storage/spill_key_index.h
@@ -92,7 +92,7 @@ class spill_key_index final : public compacted_index_writer::impl {
         return debug::AllocatedByteSize(_midx);
     }
 
-    size_t entry_mem_usage(const compaction_key& k) const {
+    size_t entry_mem_usage(const bytes& k) const {
         // One entry in a node hash map: key and value
         // are allocated together, and the key is a basic_sstring with
         // internal buffer that may be spilled if key was longer.
@@ -100,6 +100,16 @@ class spill_key_index final : public compacted_index_writer::impl {
         auto is_external = k.size() > bytes_inline_size;
         return (is_external ? sizeof(k) + k.size() : sizeof(k)) + value_sz;
     }
+
+    void release_entry_memory(const bytes& k) {
+        auto entry_memory = entry_mem_usage(k);
+        _keys_mem_usage -= entry_memory;
+
+        // Handle the case of a double-release, in case this comes up
+        // during retries/exception handling.
+        auto release_units = std::min(entry_memory, _mem_units.count());
+        _mem_units.return_units(release_units);
+    }
     ss::future<> drain_all_keys();
     ss::future<> add_key(compaction_key, value_type);
     ss::future<> spill(compacted_index::entry_type, bytes_view, value_type);
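As a closing illustration of why release_entry_memory() clamps with std::min: per the commit message, releasing more units than a semaphore_units holds is an error, so a second release of the same entry must degrade to a no-op rather than throw. units_sketch below is a hypothetical stand-in for that behaviour, not the real ssx::semaphore_units:

    #include <algorithm>
    #include <cstddef>
    #include <stdexcept>

    // Stand-in for the units object: returning more than is held
    // is a logic error.
    struct units_sketch {
        size_t held{0};

        void return_units(size_t n) {
            if (n > held) {
                throw std::logic_error("returning more units than held");
            }
            held -= n;
        }
    };

    // The clamped release from the patch: if an error path already
    // returned this entry's units, a repeated release becomes a no-op
    // instead of throwing.
    void safe_release(units_sketch& u, size_t entry_memory) {
        u.return_units(std::min(entry_memory, u.held));
    }

Without the clamp, the sequence the commit message warns about -- units released before spill(), then an exception in spill() causing another iteration -- could release the same units twice and raise during error handling.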