storage: per-shard limit on memory for spill_key_index #5722

Merged · 7 commits · Aug 5, 2022
6 changes: 3 additions & 3 deletions src/v/bytes/bytes.h
@@ -22,12 +22,12 @@
#include <iosfwd>

// cannot be a `std::byte` because that's not sizeof(char)
constexpr size_t bytes_inline_size = 31;
using bytes = ss::basic_sstring<
uint8_t, // Must be different from char to not leak to std::string_view
uint32_t, // size type - 4 bytes - 4GB max - don't use a size_t or any 64-bit
31, // NOLINT(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
// short string optimization size
false // not null terminated
bytes_inline_size, // short string optimization size
false // not null terminated
>;

using bytes_view = std::basic_string_view<uint8_t>;
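Naming the constant lets spill_key_index.h (below) reason about when a key's bytes live inline versus on the heap. A minimal illustration of the distinction, assuming only what the diff shows; the `is_inline` helper is hypothetical, not part of the PR:

```cpp
#include "bytes/bytes.h"

// The SSO capacity exported by this change; 31 matches the old magic number.
static_assert(bytes_inline_size == 31);

// Hypothetical helper: keys up to bytes_inline_size are stored in the
// sstring's internal buffer; longer keys take a separate heap allocation.
inline bool is_inline(const bytes& k) {
    return k.size() <= bytes_inline_size;
}
```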
10 changes: 10 additions & 0 deletions src/v/config/configuration.cc
@@ -756,6 +756,16 @@ configuration::configuration()
.visibility = visibility::tunable},
1024,
{.min = 128})
, storage_compaction_index_memory(
*this,
"storage_compaction_index_memory",
"Maximum number of bytes that may be used on each shard by compaction"
"index writers",
{.needs_restart = needs_restart::no,
.example = "1073741824",
.visibility = visibility::tunable},
128_MiB,
{.min = 16_MiB, .max = 100_GiB})
, max_compacted_log_segment_size(
*this,
"max_compacted_log_segment_size",
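The new tunable is live-updatable (needs_restart::no) and clamped to [16_MiB, 100_GiB] around a 128_MiB default. A sketch of how a consumer can observe it, using the bind()/watch() pattern this same PR applies in storage_resources.cc further down; this snippet is illustrative, not code from the PR, and `apply_new_limit` is a placeholder:

```cpp
// Sketch: bind the shard-local property and react to live config changes.
auto mem_limit
  = config::shard_local_cfg().storage_compaction_index_memory.bind();

size_t current = mem_limit(); // 128_MiB unless overridden

mem_limit.watch([&mem_limit] {
    // Invoked when the cluster config changes; re-read and re-apply.
    apply_new_limit(mem_limit()); // apply_new_limit is a placeholder
});
```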
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -167,6 +167,7 @@ struct configuration final : public config_store {
property<size_t> segment_fallocation_step;
bounded_property<uint64_t> storage_target_replay_bytes;
bounded_property<uint64_t> storage_max_concurrent_replay;
bounded_property<uint64_t> storage_compaction_index_memory;
property<size_t> max_compacted_log_segment_size;
property<int16_t> id_allocator_log_capacity;
property<int16_t> id_allocator_batch_size;
48 changes: 36 additions & 12 deletions src/v/storage/spill_key_index.cc
@@ -85,17 +85,37 @@ ss::future<> spill_key_index::index(

ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
auto f = ss::now();
auto const key_size = b.size();
auto const expected_size = idx_mem_usage() + _keys_mem_usage + key_size;
auto const entry_size = entry_mem_usage(b);
auto const expected_size = idx_mem_usage() + _keys_mem_usage + entry_size;

auto take_result = _resources.compaction_index_take_bytes(entry_size);
if (_mem_units.count() == 0) {
_mem_units = std::move(take_result.units);
} else {
_mem_units.adopt(std::move(take_result.units));
}

// TODO call into storage_resources
// Don't spill unless we're at least this big. Prevents a situation
// where some other index has used up the memory allowance, and we
// would end up spilling on every key.
const size_t min_index_size = std::min(32_KiB, _max_mem);

if (expected_size >= _max_mem) {
if (
(take_result.checkpoint_hint && expected_size > min_index_size)
|| expected_size >= _max_mem) {
f = ss::do_until(
[this, key_size] {
// stop condition
return _midx.empty()
|| idx_mem_usage() + _keys_mem_usage + key_size < _max_mem;
[this, entry_size, min_index_size] {
size_t total_mem = idx_mem_usage() + _keys_mem_usage + entry_size;

// Instance-local capacity check
bool local_ok = total_mem < _max_mem;

// Shard-wide capacity check
bool global_ok = _resources.compaction_index_bytes_available()
|| total_mem < min_index_size;

// Stop condition: none of our size thresholds must be violated
return _midx.empty() || (local_ok && global_ok);
},
[this] {
/**
@@ -109,15 +129,19 @@ ss::future<> spill_key_index::add_key(compaction_key b, value_type v) {
node.key(),
node.mapped(),
[this](const bytes& k, value_type o) {
_keys_mem_usage -= k.size();
release_entry_memory(k);
return spill(compacted_index::entry_type::key, k, o);
});
});
}

return f.then([this, b = std::move(b), v]() mutable {
return f.then([this, entry_size, b = std::move(b), v]() mutable {
// convert iobuf to key
_keys_mem_usage += b.size();
_keys_mem_usage += entry_size;

// No update to _mem_units here: we already took units at top
// of add_key before starting the write.

_midx.insert({std::move(b), v});
});
}
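To summarize the new control flow: add_key always takes units up front, then spills when either the shard-wide allowance signals pressure (checkpoint_hint) and this index already holds at least min_index_size, or the per-instance cap _max_mem is hit. A condensed restatement of that decision as a free function, illustrative only, with names following the diff:

```cpp
#include <cstddef>

// Sketch of the two-level spill decision in add_key (not code from the PR).
bool should_spill(
  size_t expected_size,   // idx_mem_usage() + _keys_mem_usage + entry_size
  size_t max_mem,         // per-instance cap, e.g. 512_KiB
  size_t min_index_size,  // std::min(32_KiB, max_mem)
  bool checkpoint_hint) { // shard-wide allowance exhausted
    // A starved shard-wide allowance only forces a spill once this index
    // holds a meaningful amount, so one index cannot be made to spill on
    // every single key by its neighbours.
    return (checkpoint_hint && expected_size > min_index_size)
           || expected_size >= max_mem;
}
```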
@@ -216,7 +240,7 @@ ss::future<> spill_key_index::drain_all_keys() {
},
[this] {
auto node = _midx.extract(_midx.begin());
_keys_mem_usage -= node.key().size();
release_entry_memory(node.key());
return ss::do_with(
node.key(), node.mapped(), [this](const bytes& k, value_type o) {
return spill(compacted_index::entry_type::key, k, o);
28 changes: 28 additions & 0 deletions src/v/storage/spill_key_index.h
@@ -15,6 +15,7 @@
#include "hashing/xx.h"
#include "model/fundamental.h"
#include "model/record_batch_types.h"
#include "ssx/semaphore.h"
#include "storage/compacted_index.h"
#include "storage/compacted_index_writer.h"
#include "storage/segment_appender.h"
@@ -90,6 +91,25 @@ class spill_key_index final : public compacted_index_writer::impl {
HashtableDebugAccess<underlying_t>;
return debug::AllocatedByteSize(_midx);
}

size_t entry_mem_usage(const bytes& k) const {
// One entry in a node hash map: key and value are allocated
// together, and the key is a basic_sstring whose internal buffer
// spills to an external allocation when the key is longer than
// bytes_inline_size.
auto is_external = k.size() > bytes_inline_size;
return (is_external ? sizeof(k) + k.size() : sizeof(k)) + value_sz;
}

void release_entry_memory(const bytes& k) {
auto entry_memory = entry_mem_usage(k);
_keys_mem_usage -= entry_memory;

// Handle the case of a double-release, in case this comes up
// during retries/exception handling.
auto release_units = std::min(entry_memory, _mem_units.count());
_mem_units.return_units(release_units);
}

ss::future<> drain_all_keys();
ss::future<> add_key(compaction_key, value_type);
ss::future<> spill(compacted_index::entry_type, bytes_view, value_type);
@@ -100,7 +120,15 @@
bool _truncate;
std::optional<segment_appender> _appender;
underlying_t _midx;

// Max memory we'll use for _midx, although we may spill earlier
// if hinted to by storage_resources
size_t _max_mem{512_KiB};

// Units handed out by storage_resources to track our consumption
// of the per-shard compaction index memory allowance.
ssx::semaphore_units _mem_units;

size_t _keys_mem_usage{0};
compacted_index::footer _footer;
crc::crc32c _crc;
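entry_mem_usage charges for the map node (key object plus value) and adds the key's heap allocation only when it exceeds the SSO buffer; release_entry_memory gives the same amount back, clamped so a double release cannot return more units than the writer holds. Worked numbers under assumed sizes: sizeof(bytes) == 32 on a typical 64-bit build and value_sz == 16 are illustrative assumptions, not taken from the PR.

```cpp
// Illustrative accounting, assuming sizeof(bytes) == 32 and value_sz == 16:
//
//   10-byte key  (inline):   sizeof(k) + value_sz            = 32 + 16       = 48
//   100-byte key (external): sizeof(k) + k.size() + value_sz = 32 + 100 + 16 = 148
//
// On release, the refund is clamped against the units actually held:
//   release = std::min(entry_memory, _mem_units.count());
// so a double release during retry/exception handling can never push the
// shard-wide semaphore above its capacity.
```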
35 changes: 28 additions & 7 deletions src/v/storage/storage_resources.cc
@@ -20,14 +20,17 @@ namespace storage {
storage_resources::storage_resources(
config::binding<size_t> falloc_step,
config::binding<uint64_t> target_replay_bytes,
config::binding<uint64_t> max_concurrent_replay)
config::binding<uint64_t> max_concurrent_replay,
config::binding<uint64_t> compaction_index_memory)
: _segment_fallocation_step(falloc_step)
, _target_replay_bytes(target_replay_bytes)
, _max_concurrent_replay(max_concurrent_replay)
, _compaction_index_mem_limit(compaction_index_memory)
, _append_chunk_size(config::shard_local_cfg().append_chunk_size())
, _offset_translator_dirty_bytes(_target_replay_bytes() / ss::smp::count)
, _configuration_manager_dirty_bytes(_target_replay_bytes() / ss::smp::count)
, _stm_dirty_bytes(_target_replay_bytes() / ss::smp::count)
, _compaction_index_bytes(_compaction_index_mem_limit())
, _inflight_recovery(
std::max(_max_concurrent_replay() / ss::smp::count, uint64_t{1}))
, _inflight_close_flush(
@@ -51,6 +54,10 @@ storage_resources::storage_resources(
_inflight_recovery.set_capacity(v);
_inflight_close_flush.set_capacity(v);
});

_compaction_index_mem_limit.watch([this] {
_compaction_index_bytes.set_capacity(_compaction_index_mem_limit());
});
}
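The constructor seeds _compaction_index_bytes from the bound property's current value, then installs a watch so later config changes resize the allowance without a restart. The same pattern in isolation, as a sketch: mini_resources is hypothetical, while the adjustable_allowance calls are the ones used above.

```cpp
// Sketch: a config binding driving a live-resizable allowance.
class mini_resources {
public:
    explicit mini_resources(config::binding<uint64_t> limit)
      : _limit(std::move(limit))
      , _allowance(_limit()) { // seed with the value at construction time
        _limit.watch([this] {
            // Re-apply capacity whenever the property changes at runtime.
            _allowance.set_capacity(_limit());
        });
    }

private:
    config::binding<uint64_t> _limit;
    adjustable_allowance _allowance;
};
```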

// Unit test convenience for tests that want to control the falloc step
@@ -59,17 +66,15 @@ storage_resources::storage_resources(config::binding<size_t> falloc_step)
: storage_resources(
std::move(falloc_step),
config::shard_local_cfg().storage_target_replay_bytes.bind(),
config::shard_local_cfg().storage_max_concurrent_replay.bind()

) {}
config::shard_local_cfg().storage_max_concurrent_replay.bind(),
config::shard_local_cfg().storage_compaction_index_memory.bind()) {}

storage_resources::storage_resources()
: storage_resources(
config::shard_local_cfg().segment_fallocation_step.bind(),
config::shard_local_cfg().storage_target_replay_bytes.bind(),
config::shard_local_cfg().storage_max_concurrent_replay.bind()

) {}
config::shard_local_cfg().storage_max_concurrent_replay.bind(),
config::shard_local_cfg().storage_compaction_index_memory.bind()) {}

void storage_resources::update_allowance(uint64_t total, uint64_t free) {
// TODO: also take as an input the disk consumption of the SI cache:
@@ -86,6 +91,11 @@ void storage_resources::update_allowance(uint64_t total, uint64_t free) {
_falloc_step = calc_falloc_step();
}

void storage_resources::update_partition_count(size_t partition_count) {
_partition_count = partition_count;
_falloc_step_dirty = true;
}

size_t storage_resources::calc_falloc_step() {
// Heuristic: use at most half the available disk space for pre-allocating
// space to write into.
@@ -189,4 +199,15 @@
return _stm_dirty_bytes.take(bytes);
}

adjustable_allowance::take_result
storage_resources::compaction_index_take_bytes(size_t bytes) {
vlog(
stlog.trace,
"compaction_index_take_bytes {} (current {})",
bytes,
_compaction_index_bytes.current());

return _compaction_index_bytes.take(bytes);
}

} // namespace storage
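Note that compaction_index_take_bytes never blocks: adjustable_allowance::take hands back whatever units are available and raises checkpoint_hint once the allowance is exhausted, leaving the actual back-pressure to the caller. The caller-side contract, as exercised by add_key above; this is a sketch of existing usage, not new API:

```cpp
// Sketch: consuming the shard-wide compaction-index allowance.
auto res = resources.compaction_index_take_bytes(entry_size);

if (mem_units.count() == 0) {
    mem_units = std::move(res.units);      // first grab: keep the units
} else {
    mem_units.adopt(std::move(res.units)); // later grabs: merge into one holder
}

if (res.checkpoint_hint) {
    // Shard-wide budget exhausted: spill soon instead of growing further.
}
```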
16 changes: 12 additions & 4 deletions src/v/storage/storage_resources.h
@@ -113,6 +113,7 @@ class storage_resources {
storage_resources(
config::binding<size_t>,
config::binding<uint64_t>,
config::binding<uint64_t>,
config::binding<uint64_t>);
storage_resources(const storage_resources&) = delete;

@@ -124,10 +125,7 @@
/**
* Call this when topics_table gets updated
*/
void update_partition_count(size_t partition_count) {
_partition_count = partition_count;
_falloc_step_dirty = true;
}
void update_partition_count(size_t partition_count);

uint64_t get_space_allowance() { return _space_allowance; }

@@ -142,6 +140,11 @@

adjustable_allowance::take_result stm_take_bytes(size_t bytes);

adjustable_allowance::take_result compaction_index_take_bytes(size_t bytes);
bool compaction_index_bytes_available() {
return _compaction_index_bytes.current() > 0;
}

ss::future<ssx::semaphore_units> get_recovery_units() {
return _inflight_recovery.get_units(1);
}
@@ -158,6 +161,7 @@
config::binding<size_t> _segment_fallocation_step;
config::binding<uint64_t> _target_replay_bytes;
config::binding<uint64_t> _max_concurrent_replay;
config::binding<uint64_t> _compaction_index_mem_limit;
size_t _append_chunk_size;

size_t _falloc_step{0};
@@ -180,6 +184,10 @@
// we ask them to start snapshotting their state machines?
adjustable_allowance _stm_dirty_bytes{0};

// How much memory may all compacted partitions on this shard
// use for their spill_key_index objects
adjustable_allowance _compaction_index_bytes{0};

// How many logs may be recovered (via log_manager::manage)
// concurrently?
adjustable_allowance _inflight_recovery{0};