Merge pull request #5722 from jcsp/issue-4645-spill-key-index

storage: per-shard limit on memory for spill_key_index

dotnwat committed Aug 5, 2022
2 parents 36a1945 + 5481557 commit d3bb917
Showing 7 changed files with 118 additions and 26 deletions.
6 changes: 3 additions & 3 deletions src/v/bytes/bytes.h
@@ -22,12 +22,12 @@
 #include <iosfwd>
 
 // cannot be a `std::byte` because that's not sizeof(char)
+constexpr size_t bytes_inline_size = 31;
 using bytes = ss::basic_sstring<
   uint8_t,  // Must be different from char to not leak to std::string_view
   uint32_t, // size type - 4 bytes - 4GB max - don't use a size_t or any 64-bit
-  31, // NOLINT(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
-      // short string optimization size
-  false // not null terminated
+  bytes_inline_size, // short string optimization size
+  false              // not null terminated
   >;
 
 using bytes_view = std::basic_string_view<uint8_t>;
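
The named constant matters beyond style: the new memory accounting in spill_key_index.h (below) compares key sizes against bytes_inline_size to decide whether a key occupies only the sstring object itself or an extra heap buffer. A minimal standalone sketch of that rule, using std::string as a stand-in for the seastar sstring (inline_size mirrors bytes_inline_size; the printed numbers depend on your standard library):

// sso_accounting.cc -- standalone sketch, not Redpanda code.
// Keys at or under the inline threshold cost only the string object;
// longer keys also pay for a heap buffer of key.size() bytes.
#include <cstddef>
#include <iostream>
#include <string>

constexpr std::size_t inline_size = 31; // mirrors bytes_inline_size

std::size_t key_mem_usage(const std::string& k) {
    return k.size() > inline_size ? sizeof(k) + k.size() : sizeof(k);
}

int main() {
    std::string short_key(16, 'a'); // fits the inline buffer
    std::string long_key(64, 'b');  // spills to the heap
    std::cout << key_mem_usage(short_key) << '\n'; // sizeof(std::string)
    std::cout << key_mem_usage(long_key) << '\n';  // sizeof(std::string) + 64
}
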
10 changes: 10 additions & 0 deletions src/v/config/configuration.cc
@@ -756,6 +756,16 @@ configuration::configuration()
         .visibility = visibility::tunable},
       1024,
       {.min = 128})
+  , storage_compaction_index_memory(
+      *this,
+      "storage_compaction_index_memory",
+      "Maximum number of bytes that may be used on each shard by compaction "
+      "index writers",
+      {.needs_restart = needs_restart::no,
+       .example = "1073741824",
+       .visibility = visibility::tunable},
+      128_MiB,
+      {.min = 16_MiB, .max = 100_GiB})
   , max_compacted_log_segment_size(
       *this,
       "max_compacted_log_segment_size",
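
Because the property is declared with needs_restart::no and bounds of 16_MiB to 100_GiB, the limit is live-adjustable and out-of-range values are constrained to the declared range. A hypothetical standalone model of that constraint (the real config::bounded_property implementation is not part of this diff, so the clamping behavior shown here is an assumption):

// bounded_property_model.cc -- hypothetical model, not config::bounded_property.
#include <algorithm>
#include <cstdint>
#include <iostream>

constexpr uint64_t operator""_MiB(unsigned long long v) { return v << 20; }
constexpr uint64_t operator""_GiB(unsigned long long v) { return v << 30; }

uint64_t clamp_compaction_index_memory(uint64_t requested) {
    // Bounds copied from the property declaration above.
    return std::clamp<uint64_t>(requested, 16_MiB, 100_GiB);
}

int main() {
    std::cout << clamp_compaction_index_memory(1_MiB) << '\n';   // raised to 16 MiB
    std::cout << clamp_compaction_index_memory(128_MiB) << '\n'; // default, unchanged
}
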
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -167,6 +167,7 @@ struct configuration final : public config_store {
     property<size_t> segment_fallocation_step;
     bounded_property<uint64_t> storage_target_replay_bytes;
     bounded_property<uint64_t> storage_max_concurrent_replay;
+    bounded_property<uint64_t> storage_compaction_index_memory;
     property<size_t> max_compacted_log_segment_size;
     property<int16_t> id_allocator_log_capacity;
     property<int16_t> id_allocator_batch_size;
48 changes: 36 additions & 12 deletions src/v/storage/spill_key_index.cc
@@ -85,17 +85,37 @@ ss::future<> spill_key_index::index(
 
 ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
     auto f = ss::now();
-    auto const key_size = b.size();
-    auto const expected_size = idx_mem_usage() + _keys_mem_usage + key_size;
+    auto const entry_size = entry_mem_usage(b);
+    auto const expected_size = idx_mem_usage() + _keys_mem_usage + entry_size;
+
+    auto take_result = _resources.compaction_index_take_bytes(entry_size);
+    if (_mem_units.count() == 0) {
+        _mem_units = std::move(take_result.units);
+    } else {
+        _mem_units.adopt(std::move(take_result.units));
+    }
 
-    // TODO call into storage_resources
+    // Don't spill unless we're at least this big. Prevents a situation
+    // where some other index has used up the memory allowance, and we
+    // would end up spilling on every key.
+    const size_t min_index_size = std::min(32_KiB, _max_mem);
 
-    if (expected_size >= _max_mem) {
+    if (
+      (take_result.checkpoint_hint && expected_size > min_index_size)
+      || expected_size >= _max_mem) {
         f = ss::do_until(
-          [this, key_size] {
-              // stop condition
-              return _midx.empty()
-                     || idx_mem_usage() + _keys_mem_usage + key_size < _max_mem;
+          [this, entry_size, min_index_size] {
+              size_t total_mem = idx_mem_usage() + _keys_mem_usage + entry_size;
+
+              // Instance-local capacity check
+              bool local_ok = total_mem < _max_mem;
+
+              // Shard-wide capacity check
+              bool global_ok = _resources.compaction_index_bytes_available()
+                               || total_mem < min_index_size;
+
+              // Stop condition: none of our size thresholds must be violated
+              return _midx.empty() || (local_ok && global_ok);
           },
           [this] {
              /**
@@ -109,15 +129,19 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
                 node.key(),
                 node.mapped(),
                 [this](const bytes& k, value_type o) {
-                    _keys_mem_usage -= k.size();
+                    release_entry_memory(k);
                     return spill(compacted_index::entry_type::key, k, o);
                 });
           });
     }
 
-    return f.then([this, b = std::move(b), v]() mutable {
+    return f.then([this, entry_size, b = std::move(b), v]() mutable {
         // convert iobuf to key
-        _keys_mem_usage += b.size();
+        _keys_mem_usage += entry_size;
+
+        // No update to _mem_units here: we already took units at top
+        // of add_key before starting the write.
+
         _midx.insert({std::move(b), v});
     });
 }
@@ -216,7 +240,7 @@ ss::future<> spill_key_index::drain_all_keys() {
           },
          [this] {
              auto node = _midx.extract(_midx.begin());
-             _keys_mem_usage -= node.key().size();
+             release_entry_memory(node.key());
             return ss::do_with(
               node.key(), node.mapped(), [this](const bytes& k, value_type o) {
                   return spill(compacted_index::entry_type::key, k, o);
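
The take-then-adopt pattern above is the heart of the change: every add_key charges entry_size bytes against the shard-wide allowance up front, folds the returned units into the single _mem_units holder, and treats checkpoint_hint as a request to spill early. A simplified standalone model of that flow, with plain structs standing in for adjustable_allowance and ssx::semaphore_units (the real take() is semaphore-based and its hint semantics differ in detail):

// take_and_adopt_model.cc -- simplified standalone model, not Redpanda code.
#include <algorithm>
#include <cstddef>
#include <iostream>

struct allowance {
    std::size_t available;
    struct take_result {
        std::size_t units;
        bool checkpoint_hint; // set when the allowance is exhausted
    };
    // Never blocks: callers always get their units, but the hint tells
    // them to start giving memory back (here: by spilling to disk).
    take_result take(std::size_t n) {
        bool exhausted = n >= available;
        available -= std::min(n, available);
        return {n, exhausted};
    }
};

struct index_units {
    std::size_t held{0};
    void adopt(std::size_t n) { held += n; } // like semaphore_units::adopt
};

int main() {
    allowance shard_allowance{100}; // tiny allowance for demonstration
    index_units mem_units;
    for (std::size_t entry_size : {40, 40, 40}) {
        auto r = shard_allowance.take(entry_size);
        mem_units.adopt(r.units);
        if (r.checkpoint_hint) {
            std::cout << "hint: over allowance, spill some keys\n";
        }
    }
    std::cout << "units held: " << mem_units.held << '\n'; // 120
}
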
28 changes: 28 additions & 0 deletions src/v/storage/spill_key_index.h
@@ -15,6 +15,7 @@
 #include "hashing/xx.h"
 #include "model/fundamental.h"
 #include "model/record_batch_types.h"
+#include "ssx/semaphore.h"
 #include "storage/compacted_index.h"
 #include "storage/compacted_index_writer.h"
 #include "storage/segment_appender.h"
@@ -90,6 +91,25 @@ class spill_key_index final : public compacted_index_writer::impl {
           HashtableDebugAccess<underlying_t>;
         return debug::AllocatedByteSize(_midx);
     }
+
+    size_t entry_mem_usage(const bytes& k) const {
+        // One entry in a node hash map: key and value
+        // are allocated together, and the key is a basic_sstring with
+        // internal buffer that may be spilled if key was longer.
+        auto is_external = k.size() > bytes_inline_size;
+        return (is_external ? sizeof(k) + k.size() : sizeof(k)) + value_sz;
+    }
+
+    void release_entry_memory(const bytes& k) {
+        auto entry_memory = entry_mem_usage(k);
+        _keys_mem_usage -= entry_memory;
+
+        // Handle the case of a double-release, in case this comes up
+        // during retries/exception handling.
+        auto release_units = std::min(entry_memory, _mem_units.count());
+        _mem_units.return_units(release_units);
+    }
+
     ss::future<> drain_all_keys();
     ss::future<> add_key(compaction_key, value_type);
     ss::future<> spill(compacted_index::entry_type, bytes_view, value_type);
@@ -100,7 +120,15 @@ class spill_key_index final : public compacted_index_writer::impl {
     bool _truncate;
     std::optional<segment_appender> _appender;
     underlying_t _midx;
+
+    // Max memory we'll use for _midx, although we may spill earlier
+    // if hinted to by storage_resources
     size_t _max_mem{512_KiB};
+
+    // Units handed out by storage_resources to track our consumption
+    // of the per-shard compaction index memory allowance.
+    ssx::semaphore_units _mem_units;
+
     size_t _keys_mem_usage{0};
     compacted_index::footer _footer;
     crc::crc32c _crc;
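
entry_mem_usage charges each entry for the map node (key object plus value) and, only for keys longer than bytes_inline_size, for the spilled heap buffer as well. A worked example with hypothetical sizes (sizeof(bytes) == 32 and value_sz == 16 are assumptions for illustration; the real values depend on the build):

// entry_mem_usage_worked.cc -- standalone worked example, not Redpanda code.
#include <cstddef>
#include <iostream>

constexpr std::size_t bytes_inline_size = 31; // from bytes.h above
constexpr std::size_t sizeof_bytes = 32;      // assumption for sizeof(bytes)
constexpr std::size_t value_sz = 16;          // assumption for the value part

constexpr std::size_t entry_mem_usage(std::size_t key_size) {
    bool is_external = key_size > bytes_inline_size;
    return (is_external ? sizeof_bytes + key_size : sizeof_bytes) + value_sz;
}

// A 20-byte key fits inline: pay for the key object and value only.
static_assert(entry_mem_usage(20) == 48);
// A 40-byte key spills: additionally pay for the 40-byte heap buffer.
static_assert(entry_mem_usage(40) == 88);

int main() {
    std::cout << entry_mem_usage(20) << ' ' << entry_mem_usage(40) << '\n';
}
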
35 changes: 28 additions & 7 deletions src/v/storage/storage_resources.cc
@@ -20,14 +20,17 @@ namespace storage {
 storage_resources::storage_resources(
   config::binding<size_t> falloc_step,
   config::binding<uint64_t> target_replay_bytes,
-  config::binding<uint64_t> max_concurrent_replay)
+  config::binding<uint64_t> max_concurrent_replay,
+  config::binding<uint64_t> compaction_index_memory)
   : _segment_fallocation_step(falloc_step)
   , _target_replay_bytes(target_replay_bytes)
   , _max_concurrent_replay(max_concurrent_replay)
+  , _compaction_index_mem_limit(compaction_index_memory)
   , _append_chunk_size(config::shard_local_cfg().append_chunk_size())
   , _offset_translator_dirty_bytes(_target_replay_bytes() / ss::smp::count)
   , _configuration_manager_dirty_bytes(_target_replay_bytes() / ss::smp::count)
   , _stm_dirty_bytes(_target_replay_bytes() / ss::smp::count)
+  , _compaction_index_bytes(_compaction_index_mem_limit())
   , _inflight_recovery(
       std::max(_max_concurrent_replay() / ss::smp::count, uint64_t{1}))
   , _inflight_close_flush(
@@ -51,6 +54,10 @@ storage_resources::storage_resources(
         _inflight_recovery.set_capacity(v);
         _inflight_close_flush.set_capacity(v);
     });
+
+    _compaction_index_mem_limit.watch([this] {
+        _compaction_index_bytes.set_capacity(_compaction_index_mem_limit());
+    });
 }
 
 // Unit test convenience for tests that want to control the falloc step
@@ -59,17 +66,15 @@ storage_resources::storage_resources(config::binding<size_t> falloc_step)
   : storage_resources(
       std::move(falloc_step),
      config::shard_local_cfg().storage_target_replay_bytes.bind(),
-      config::shard_local_cfg().storage_max_concurrent_replay.bind()
-
-    ) {}
+      config::shard_local_cfg().storage_max_concurrent_replay.bind(),
+      config::shard_local_cfg().storage_compaction_index_memory.bind()) {}
 
 storage_resources::storage_resources()
   : storage_resources(
      config::shard_local_cfg().segment_fallocation_step.bind(),
      config::shard_local_cfg().storage_target_replay_bytes.bind(),
-      config::shard_local_cfg().storage_max_concurrent_replay.bind()
-
-    ) {}
+      config::shard_local_cfg().storage_max_concurrent_replay.bind(),
+      config::shard_local_cfg().storage_compaction_index_memory.bind()) {}
 
 void storage_resources::update_allowance(uint64_t total, uint64_t free) {
     // TODO: also take as an input the disk consumption of the SI cache:
@@ -86,6 +91,11 @@ void storage_resources::update_allowance(uint64_t total, uint64_t free) {
     _falloc_step = calc_falloc_step();
 }
 
+void storage_resources::update_partition_count(size_t partition_count) {
+    _partition_count = partition_count;
+    _falloc_step_dirty = true;
+}
+
 size_t storage_resources::calc_falloc_step() {
     // Heuristic: use at most half the available disk space for per-allocating
     // space to write into.
@@ -189,4 +199,15 @@ storage_resources::stm_take_bytes(size_t bytes) {
     return _stm_dirty_bytes.take(bytes);
 }
 
+adjustable_allowance::take_result
+storage_resources::compaction_index_take_bytes(size_t bytes) {
+    vlog(
+      stlog.trace,
+      "compaction_index_take_bytes {} (current {})",
+      bytes,
+      _compaction_index_bytes.current());
+
+    return _compaction_index_bytes.take(bytes);
+}
+
 } // namespace storage
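
The watch() hook is what makes the new limit adjustable without a restart: when the bound cluster property changes, the callback pushes the fresh value into the allowance's capacity. A simplified standalone sketch of that binding-watch pattern, where `binding` and `allowance` are stand-ins for config::binding and adjustable_allowance:

// binding_watch_model.cc -- simplified standalone sketch, not Redpanda code.
#include <cstdint>
#include <functional>
#include <iostream>

struct allowance {
    uint64_t capacity{0};
    // The real set_capacity also wakes waiters; elided here.
    void set_capacity(uint64_t c) { capacity = c; }
};

struct binding {
    uint64_t value;
    std::function<void()> watcher;
    uint64_t operator()() const { return value; }
    void watch(std::function<void()> f) { watcher = std::move(f); }
    void update(uint64_t v) { // models a cluster config change
        value = v;
        if (watcher) { watcher(); }
    }
};

int main() {
    binding compaction_index_mem_limit{128ull << 20}; // 128 MiB default
    allowance compaction_index_bytes{compaction_index_mem_limit()};

    // Mirrors the watch() registered in the storage_resources constructor.
    compaction_index_mem_limit.watch([&] {
        compaction_index_bytes.set_capacity(compaction_index_mem_limit());
    });

    compaction_index_mem_limit.update(256ull << 20); // no restart needed
    std::cout << compaction_index_bytes.capacity << '\n'; // 268435456
}
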
16 changes: 12 additions & 4 deletions src/v/storage/storage_resources.h
@@ -113,6 +113,7 @@ class storage_resources {
     storage_resources(
       config::binding<size_t>,
       config::binding<uint64_t>,
+      config::binding<uint64_t>,
       config::binding<uint64_t>);
     storage_resources(const storage_resources&) = delete;
 
@@ -124,10 +125,7 @@ class storage_resources {
     /**
      * Call this when topics_table gets updated
      */
-    void update_partition_count(size_t partition_count) {
-        _partition_count = partition_count;
-        _falloc_step_dirty = true;
-    }
+    void update_partition_count(size_t partition_count);
 
     uint64_t get_space_allowance() { return _space_allowance; }
 
@@ -142,6 +140,11 @@ class storage_resources {
 
     adjustable_allowance::take_result stm_take_bytes(size_t bytes);
 
+    adjustable_allowance::take_result compaction_index_take_bytes(size_t bytes);
+    bool compaction_index_bytes_available() {
+        return _compaction_index_bytes.current() > 0;
+    }
+
     ss::future<ssx::semaphore_units> get_recovery_units() {
         return _inflight_recovery.get_units(1);
     }
@@ -158,6 +161,7 @@ class storage_resources {
     config::binding<size_t> _segment_fallocation_step;
     config::binding<uint64_t> _target_replay_bytes;
     config::binding<uint64_t> _max_concurrent_replay;
+    config::binding<uint64_t> _compaction_index_mem_limit;
     size_t _append_chunk_size;
 
     size_t _falloc_step{0};
@@ -180,6 +184,10 @@ class storage_resources {
     // we ask them to start snapshotting their state machines?
     adjustable_allowance _stm_dirty_bytes{0};
 
+    // How much memory may all compacted partitions on this shard
+    // use for their spill_key_index objects
+    adjustable_allowance _compaction_index_bytes{0};
+
     // How many logs may be recovered (via log_manager::manage)
     // concurrently?
     adjustable_allowance _inflight_recovery{0};
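
Putting the pieces together, compaction_index_bytes_available() feeds the shard-wide half of the spill loop's stop condition in add_key. A pure-function restatement of that condition (a hypothetical refactoring for illustration, not code from this commit):

// spill_stop_condition.cc -- restates the do_until stop condition in add_key.
#include <cassert>
#include <cstddef>

bool may_stop_spilling(
  std::size_t total_mem,        // idx_mem_usage() + _keys_mem_usage + entry
  std::size_t max_mem,          // per-instance cap (_max_mem)
  std::size_t min_index_size,   // floor that prevents spilling on every key
  bool shard_bytes_available) { // compaction_index_bytes_available()
    bool local_ok = total_mem < max_mem;
    bool global_ok = shard_bytes_available || total_mem < min_index_size;
    return local_ok && global_ok; // (an empty index also stops the loop)
}

int main() {
    // Under the local cap with shard allowance left: stop spilling.
    assert(may_stop_spilling(100'000, 512'000, 32'000, true));
    // Shard allowance exhausted but index below the floor: stop anyway,
    // so one greedy index cannot force everyone to spill on every key.
    assert(may_stop_spilling(10'000, 512'000, 32'000, false));
    // Shard allowance exhausted and index above the floor: keep spilling.
    assert(!may_stop_spilling(100'000, 512'000, 32'000, false));
    return 0;
}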
