[v23.2.x] cloud: Add config options to trigger cache trim
When the cloud storage cache is completely full, puts to the cache
are blocked, which prevents readers from progressing. In some clusters
the trim operation can take a long time, up to hours; when this happens,
readers time out and generate errors.

This commit adds two configuration options that control when the trim
is triggered. By setting these below 100% of the cache's maximum size
and object count, cloud storage can begin a trim operation before the
cache is completely full, allowing readers to make progress while the
trim runs in the background.
- cloud_storage_cache_trim_threshold_percent_size
- cloud_storage_cache_trim_threshold_percent_objects

When a put occurs, if the cache is over these thresholds, a trim is
triggered in the background.

(cherry picked from commit 027632c)
jcipar committed Jun 26, 2024
1 parent 7ebdc0a commit 167c443
Showing 7 changed files with 191 additions and 14 deletions.
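Before the file-by-file diff, here is a minimal, self-contained sketch of the trigger logic this commit introduces. It is an illustration only: the struct, field names, and the start_background_trim callback are stand-ins rather than the actual cache_service API, which additionally guards the trim with a cleanup semaphore and runs it inside a gate.

```cpp
#include <cstdint>
#include <functional>
#include <optional>

// Sketch: if a percent threshold is configured and current usage (including
// space reserved for in-flight puts) exceeds that percent of the maximum,
// start a trim in the background targeting the threshold values.
struct trim_trigger_sketch {
    uint64_t max_bytes;
    uint64_t max_objects;
    std::optional<double> threshold_percent_size;
    std::optional<double> threshold_percent_objects;

    void maybe_trigger(
      uint64_t used_bytes,
      uint64_t used_objects,
      const std::function<void(uint64_t, uint64_t)>& start_background_trim) const {
        if (!threshold_percent_size && !threshold_percent_objects) {
            // Neither option is set: keep the old behavior of trimming only
            // when the cache is full.
            return;
        }
        const auto target_bytes = static_cast<uint64_t>(
          max_bytes * threshold_percent_size.value_or(100.0) / 100.0);
        const auto target_objects = static_cast<uint64_t>(
          max_objects * threshold_percent_objects.value_or(100.0) / 100.0);
        if (used_bytes > target_bytes || used_objects > target_objects) {
            start_background_trim(target_bytes, target_objects);
        }
    }
};
```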
74 changes: 62 additions & 12 deletions src/v/cloud_storage/cache_service.cc
@@ -32,6 +32,7 @@
#include <algorithm>
#include <exception>
#include <filesystem>
#include <optional>
#include <stdexcept>
#include <string_view>

@@ -134,10 +135,7 @@ void cache::update_max_bytes() {
_max_percent());

if (_current_cache_size > _max_bytes) {
ssx::spawn_with_gate(_gate, [this]() {
return ss::with_semaphore(
_cleanup_sm, 1, [this]() { return trim_throttled(); });
});
ssx::spawn_with_gate(_gate, [this]() { return trim_throttled(); });
}
}

@@ -263,7 +261,9 @@ std::optional<std::chrono::milliseconds> cache::get_trim_delay() const {
}
}

ss::future<> cache::trim_throttled() {
ss::future<> cache::trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
// If we trimmed very recently then do not do it immediately:
// this reduces load and improves chance of currently promoted
// segments finishing their read work before we demote their
@@ -279,7 +279,15 @@ ss::future<> cache::trim_throttled() {
co_await ss::sleep_abortable(*trim_delay, _as);
}

co_await trim();
co_await trim(size_limit_override, object_limit_override);
}

ss::future<> cache::trim_throttled(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled_unlocked(
size_limit_override, object_limit_override);
}

ss::future<> cache::trim_manually(
@@ -1033,10 +1041,7 @@ ss::future<> cache::put(
// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
{
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled();
}
co_await trim_throttled();

throw disk_full_error;
}
@@ -1213,9 +1218,55 @@ bool cache::may_exceed_limits(uint64_t bytes, size_t objects) {
&& !would_fit_in_cache;
}

void cache::maybe_background_trim() {
auto& trim_threshold_pct_objects
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_objects;
auto& trim_threshold_pct_size
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_size;
if (
!trim_threshold_pct_size.value().has_value()
&& !trim_threshold_pct_objects.value().has_value()) {
return;
}

uint64_t target_bytes = uint64_t(
_max_bytes * trim_threshold_pct_size.value().value_or(100.0) / 100.0);
uint32_t target_objects = uint32_t(
_max_objects() * trim_threshold_pct_objects.value().value_or(100.0)
/ 100.0);

bool bytes_over_limit = _current_cache_size + _reserved_cache_size
> target_bytes;
bool objects_over_limit = _current_cache_objects + _reserved_cache_objects
> target_objects;

if (bytes_over_limit || objects_over_limit) {
auto units = ss::try_get_units(_cleanup_sm, 1);
if (units.has_value()) {
vlog(cst_log.debug, "Spawning background trim");
ssx::spawn_with_gate(
_gate,
[this,
target_bytes,
target_objects,
u = std::move(units)]() mutable {
return trim_throttled_unlocked(target_bytes, target_objects)
.finally([u = std::move(u)] {});
});
} else {
vlog(
cst_log.debug, "Not spawning background trim: already started");
}
}
}

ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0");

maybe_background_trim();

if (may_reserve_space(bytes, objects)) {
// Fast path: space was available.
_reserved_cache_size += bytes;
@@ -1263,7 +1314,7 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
// After taking lock, there still isn't space: means someone
// else didn't take it and free space for us already, so we will
// do the trim.
co_await trim_throttled();
co_await trim_throttled_unlocked();
did_trim = true;
}

@@ -1411,5 +1462,4 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) {
co_await ss::recursive_touch_directory(cache_dir.string());
}
}

} // namespace cloud_storage
14 changes: 13 additions & 1 deletion src/v/cloud_storage/cache_service.h
@@ -24,8 +24,11 @@
#include <seastar/core/gate.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/thread.hh>

#include <filesystem>
#include <iterator>
#include <optional>
#include <set>
#include <string_view>

@@ -216,7 +219,16 @@ class cache : public ss::peering_sharded_service<cache> {
std::optional<std::chrono::milliseconds> get_trim_delay() const;

/// Invoke trim, waiting if not enough time has passed since the last trim
ss::future<> trim_throttled();
ss::future<> trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

// Take the cleanup semaphore, then call trim_throttled_unlocked
ss::future<> trim_throttled(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

void maybe_background_trim();

/// Whether an objects path makes it impervious to pinning, like
/// the access time tracker.
43 changes: 43 additions & 0 deletions src/v/cloud_storage/tests/cache_test.cc
@@ -13,6 +13,7 @@
#include "cache_test_fixture.h"
#include "cloud_storage/access_time_tracker.h"
#include "cloud_storage/cache_service.h"
#include "random/generators.h"
#include "test_utils/fixture.h"
#include "units.h"
#include "utils/file_io.h"
@@ -538,3 +539,45 @@ FIXTURE_TEST(test_log_segment_cleanup, cache_test_fixture) {
return !std::filesystem::exists(path);
}));
}

FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) {
std::string write_buf(1_KiB, ' ');
random_generators::fill_buffer_randomchars(
write_buf.data(), write_buf.size());
size_t num_objects = 100;
std::vector<std::filesystem::path> object_keys;
for (int i = 0; i < num_objects; i++) {
object_keys.emplace_back(fmt::format("test_{}.log.1", i));
std::ofstream segment{CACHE_DIR / object_keys.back()};
segment.write(
write_buf.data(), static_cast<std::streamsize>(write_buf.size()));
segment.flush();
}

// Account for all files in the cache (100 objects, 1 KiB each).
clean_up_at_start().get();
BOOST_REQUIRE_EQUAL(get_object_count(), 100);

for (const auto& key : object_keys) {
// Touch every object so they have access times assigned to them
auto item = sharded_cache.local().get(key).get();
item->body.close().get();
}
BOOST_REQUIRE_EQUAL(get_object_count(), 100);

set_trim_thresholds(100.0, 50.0, 100);

// Do a put which should trigger a background trim, leaving the
// object count at 40 (see the calculation below).
auto data_string = create_data_string('a', 1_KiB);
put_into_cache(data_string, KEY);
wait_for_trim();

// 40: we set a trigger at 50. However, the new object reserves space so the
// target is adjusted to 49. The low water mark is 80%, which gives an
// adjusted target of 39. We add one for the new object to get a value
// of 40. NB: This calculation is specific to the 23.2.x backport. The
// reservation works a bit differently in newer versions of the code, so we
// end up with 41 objects in those cases.
BOOST_REQUIRE_EQUAL(get_object_count(), 40);
}
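For readability, here is a hypothetical, standalone re-derivation of the 40-object expectation from the comment above; the constants mirror the set_trim_thresholds(100.0, 50.0, 100) call, and the 80% low-water mark is taken from that comment rather than from this diff.

```cpp
#include <cassert>
#include <cstdint>

int main() {
    const uint32_t max_objects = 100;         // set_trim_thresholds(..., 100)
    const double object_threshold_pct = 50.0; // set_trim_thresholds(100.0, 50.0, ...)
    // The in-flight put reserves one object, lowering the effective target.
    const auto target = static_cast<uint32_t>(
      max_objects * object_threshold_pct / 100.0) - 1;            // 49
    // The trim aims for an 80% low-water mark below the target.
    const auto after_trim = static_cast<uint32_t>(target * 0.8);  // 39
    // The newly put object is counted on top of the trimmed set.
    assert(after_trim + 1 == 40);
    return 0;
}
```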
45 changes: 44 additions & 1 deletion src/v/cloud_storage/tests/cache_test_fixture.h
@@ -12,19 +12,22 @@
#include "bytes/iobuf.h"
#include "cloud_storage/cache_service.h"
#include "config/property.h"
#include "seastarx.h"
#include "test_utils/scoped_config.h"
#include "test_utils/tmp_dir.h"
#include "units.h"

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>

#include <boost/filesystem/operations.hpp>

#include <chrono>
#include <cstdint>
#include <filesystem>
#include <optional>

using namespace std::chrono_literals;

@@ -119,6 +122,46 @@ class cache_test_fixture {
})
.get();
}

void set_trim_thresholds(
double size_limit_percent,
double object_limit_percent,
uint32_t max_objects) {
cfg.get("cloud_storage_cache_trim_threshold_percent_size")
.set_value(std::make_optional<double>(size_limit_percent));
cfg.get("cloud_storage_cache_trim_threshold_percent_objects")
.set_value(std::make_optional<double>(object_limit_percent));

sharded_cache
.invoke_on(
ss::shard_id{0},
[max_objects](cloud_storage::cache& c) {
c._max_objects = config::mock_binding<uint32_t>(max_objects);
})
.get();
}

void wait_for_trim() {
// Waits for the cleanup semaphore to be available. This ensures that
// there are no trim operations in progress.
sharded_cache
.invoke_on(
ss::shard_id{0},
[](cloud_storage::cache& c) {
auto units = ss::get_units(c._cleanup_sm, 1).get();
})
.get();
}

uint32_t get_object_count() {
return sharded_cache
.invoke_on(
ss::shard_id{0},
[](cloud_storage::cache& c) { return c._current_cache_objects; })
.get();
}

scoped_config cfg;
};

} // namespace cloud_storage
18 changes: 18 additions & 0 deletions src/v/config/configuration.cc
@@ -1941,6 +1941,24 @@ configuration::configuration()
"Number of chunks to prefetch ahead of every downloaded chunk",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
0)
, cloud_storage_cache_trim_threshold_percent_size(
*this,
"cloud_storage_cache_trim_threshold_percent_size",
"Trim is triggered when the cache reaches this percent of the maximum "
"cache size. If this is unset, the default behavior"
"is to start trim when the cache is about 100\% full.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1.0, .max = 100.0})
, cloud_storage_cache_trim_threshold_percent_objects(
*this,
"cloud_storage_cache_trim_threshold_percent_objects",
"Trim is triggered when the cache reaches this percent of the maximum "
"object count. If this is unset, the default behavior"
"is to start trim when the cache is about 100\% full.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1.0, .max = 100.0})
, superusers(
*this,
"superusers",
4 changes: 4 additions & 0 deletions src/v/config/configuration.h
@@ -368,6 +368,10 @@ struct configuration final : public config_store {
enum_property<model::cloud_storage_chunk_eviction_strategy>
cloud_storage_chunk_eviction_strategy;
property<uint16_t> cloud_storage_chunk_prefetch;
bounded_property<std::optional<double>, numeric_bounds>
cloud_storage_cache_trim_threshold_percent_size;
bounded_property<std::optional<double>, numeric_bounds>
cloud_storage_cache_trim_threshold_percent_objects;

one_or_many_property<ss::sstring> superusers;

7 changes: 7 additions & 0 deletions src/v/config/property.h
@@ -458,6 +458,8 @@ class binding : public binding_base<T> {
friend class mock_property<T>;
template<typename U>
friend inline binding<U> mock_binding(U&&);
template<typename U>
friend inline binding<U> mock_binding(U const&);
};

/**
@@ -474,6 +476,11 @@ inline binding<T> mock_binding(T&& value) {
return binding<T>(std::forward<T>(value));
}

template<typename T>
inline binding<T> mock_binding(T const& value) {
return binding<T>(T(value));
}
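A side note on why the lvalue overload is useful (my reading, illustrated with an assumed snippet rather than code from this commit): with only the forwarding-reference overload and an explicit template argument, a call such as the one in the cache test fixture passes an lvalue that cannot bind to uint32_t&&; the const-reference overload makes that call compile by copying the value.

```cpp
// Illustrative usage only; assumes Redpanda's config/property.h as changed here.
#include "config/property.h"

#include <cstdint>

void mock_binding_example() {
    uint32_t max_objects = 100; // an lvalue, as in the test fixture
    // Resolves to the new const& overload; uint32_t&& cannot bind to an lvalue.
    auto from_lvalue = config::mock_binding<uint32_t>(max_objects);
    // Temporaries still go through the forwarding-reference overload.
    auto from_rvalue = config::mock_binding<uint32_t>(uint32_t{200});
    (void)from_lvalue;
    (void)from_rvalue;
}
```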

/**
* A conversion property binding contains the result of application of
* a conversion function to property value. The result is update in-place
