Skip to content

Commit

Permalink
storage: add compaction index memory semaphore to storage_resources
Browse files Browse the repository at this point in the history
This is an additional bound, on top of the existing _max_mem
in spill_key_index: it will now also avoid using more memory
in total per shard.

This commit uses a static total of 128MB, which enables up to 256
compacted partitions to use the same 512kiB per-partition allowance
that they were using before, then as the partition count gets higher
it starts throttling back, although each partition always gets to
use at least 32kib memory each, to avoid a pathological case where
they spill on every key add.
  • Loading branch information
jcsp committed Aug 5, 2022
1 parent 39c94a3 commit 823d672
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
16 changes: 16 additions & 0 deletions src/v/storage/storage_resources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ void storage_resources::update_allowance(uint64_t total, uint64_t free) {
_falloc_step = calc_falloc_step();
}

void storage_resources::update_partition_count(size_t partition_count) {
_partition_count = partition_count;
_falloc_step_dirty = true;
}

size_t storage_resources::calc_falloc_step() {
// Heuristic: use at most half the available disk space for per-allocating
// space to write into.
Expand Down Expand Up @@ -189,4 +194,15 @@ storage_resources::stm_take_bytes(size_t bytes) {
return _stm_dirty_bytes.take(bytes);
}

adjustable_allowance::take_result
storage_resources::compaction_index_take_bytes(size_t bytes) {
vlog(
stlog.trace,
"compaction_index_take_bytes {} (current {})",
bytes,
_compaction_index_bytes.current());

return _compaction_index_bytes.take(bytes);
}

} // namespace storage
14 changes: 10 additions & 4 deletions src/v/storage/storage_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ class storage_resources {
/**
* Call this when topics_table gets updated
*/
void update_partition_count(size_t partition_count) {
_partition_count = partition_count;
_falloc_step_dirty = true;
}
void update_partition_count(size_t partition_count);

uint64_t get_space_allowance() { return _space_allowance; }

Expand All @@ -142,6 +139,11 @@ class storage_resources {

adjustable_allowance::take_result stm_take_bytes(size_t bytes);

adjustable_allowance::take_result compaction_index_take_bytes(size_t bytes);
bool compaction_index_bytes_available() {
return _compaction_index_bytes.current() > 0;
}

ss::future<ssx::semaphore_units> get_recovery_units() {
return _inflight_recovery.get_units(1);
}
Expand Down Expand Up @@ -180,6 +182,10 @@ class storage_resources {
// we ask them to start snapshotting their state machines?
adjustable_allowance _stm_dirty_bytes{0};

// How much memory may all compacted partitions on this shard
// use for their spill_key_index objects
adjustable_allowance _compaction_index_bytes{128_MiB};

// How many logs may be recovered (via log_manager::manage)
// concurrently?
adjustable_allowance _inflight_recovery{0};
Expand Down

0 comments on commit 823d672

Please sign in to comment.