From 167c4430530be25ca643fb7b6c463416c1c73de6 Mon Sep 17 00:00:00 2001 From: Jim Cipar Date: Fri, 31 May 2024 17:49:33 -0400 Subject: [PATCH] [v23.2.x] cloud: Add config options to trigger cache trim When the cloud storage cache is completely filled, puts to the cache are blocked. This prevents readers from progressing. In some clusters the trim operation can take a long time, up to hours. When this happens, readers will time out, generating errors. This commit adds two configuration options that control when the trim is triggered. By setting these lower than the caches maximum size, cloud storage can begin a trim operation before the cache is completely filled, allowing readers to progress as the trim is going on in the background. - cloud_storage_cache_trim_threshold_size - cloud_storage_cache_trim_threshold_objects When a put occurs, if the cache is over these limits it triggers a trim in the background. (cherry picked from commit 027632c42438a845bfa47904eabe5f9f1797fc24) --- src/v/cloud_storage/cache_service.cc | 74 ++++++++++++++++--- src/v/cloud_storage/cache_service.h | 14 +++- src/v/cloud_storage/tests/cache_test.cc | 43 +++++++++++ .../cloud_storage/tests/cache_test_fixture.h | 45 ++++++++++- src/v/config/configuration.cc | 18 +++++ src/v/config/configuration.h | 4 + src/v/config/property.h | 7 ++ 7 files changed, 191 insertions(+), 14 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 4cddaac265fa..e58c0352da30 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -134,10 +135,7 @@ void cache::update_max_bytes() { _max_percent()); if (_current_cache_size > _max_bytes) { - ssx::spawn_with_gate(_gate, [this]() { - return ss::with_semaphore( - _cleanup_sm, 1, [this]() { return trim_throttled(); }); - }); + ssx::spawn_with_gate(_gate, [this]() { return trim_throttled(); }); } } @@ -263,7 +261,9 @@ std::optional cache::get_trim_delay() const { } } -ss::future<> cache::trim_throttled() { +ss::future<> cache::trim_throttled_unlocked( + std::optional size_limit_override, + std::optional object_limit_override) { // If we trimmed very recently then do not do it immediately: // this reduces load and improves chance of currently promoted // segments finishing their read work before we demote their @@ -279,7 +279,15 @@ ss::future<> cache::trim_throttled() { co_await ss::sleep_abortable(*trim_delay, _as); } - co_await trim(); + co_await trim(size_limit_override, object_limit_override); +} + +ss::future<> cache::trim_throttled( + std::optional size_limit_override, + std::optional object_limit_override) { + auto units = co_await ss::get_units(_cleanup_sm, 1); + co_await trim_throttled_unlocked( + size_limit_override, object_limit_override); } ss::future<> cache::trim_manually( @@ -1033,10 +1041,7 @@ ss::future<> cache::put( // Trim proactively: if many fibers hit this concurrently, // they'll contend for cleanup_sm and the losers will skip // trim due to throttling. - { - auto units = co_await ss::get_units(_cleanup_sm, 1); - co_await trim_throttled(); - } + co_await trim_throttled(); throw disk_full_error; } @@ -1213,9 +1218,55 @@ bool cache::may_exceed_limits(uint64_t bytes, size_t objects) { && !would_fit_in_cache; } +void cache::maybe_background_trim() { + auto& trim_threshold_pct_objects + = config::shard_local_cfg() + .cloud_storage_cache_trim_threshold_percent_objects; + auto& trim_threshold_pct_size + = config::shard_local_cfg() + .cloud_storage_cache_trim_threshold_percent_size; + if ( + !trim_threshold_pct_size.value().has_value() + && !trim_threshold_pct_objects.value().has_value()) { + return; + } + + uint64_t target_bytes = uint64_t( + _max_bytes * trim_threshold_pct_size.value().value_or(100.0) / 100.0); + uint32_t target_objects = uint32_t( + _max_objects() * trim_threshold_pct_objects.value().value_or(100.0) + / 100.0); + + bool bytes_over_limit = _current_cache_size + _reserved_cache_size + > target_bytes; + bool objects_over_limit = _current_cache_objects + _reserved_cache_objects + > target_objects; + + if (bytes_over_limit || objects_over_limit) { + auto units = ss::try_get_units(_cleanup_sm, 1); + if (units.has_value()) { + vlog(cst_log.debug, "Spawning background trim"); + ssx::spawn_with_gate( + _gate, + [this, + target_bytes, + target_objects, + u = std::move(units)]() mutable { + return trim_throttled_unlocked(target_bytes, target_objects) + .finally([u = std::move(u)] {}); + }); + } else { + vlog( + cst_log.debug, "Not spawning background trim: already started"); + } + } +} + ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0"); + maybe_background_trim(); + if (may_reserve_space(bytes, objects)) { // Fast path: space was available. _reserved_cache_size += bytes; @@ -1263,7 +1314,7 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { // After taking lock, there still isn't space: means someone // else didn't take it and free space for us already, so we will // do the trim. - co_await trim_throttled(); + co_await trim_throttled_unlocked(); did_trim = true; } @@ -1411,5 +1462,4 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) { co_await ss::recursive_touch_directory(cache_dir.string()); } } - } // namespace cloud_storage diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index 5ed72e2a35b0..467e78377a06 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -24,8 +24,11 @@ #include #include #include +#include #include +#include +#include #include #include @@ -216,7 +219,16 @@ class cache : public ss::peering_sharded_service { std::optional get_trim_delay() const; /// Invoke trim, waiting if not enough time passed since the last trim - ss::future<> trim_throttled(); + ss::future<> trim_throttled_unlocked( + std::optional size_limit_override = std::nullopt, + std::optional object_limit_override = std::nullopt); + + // Take the cleanup semaphore before calling trim_throttled + ss::future<> trim_throttled( + std::optional size_limit_override = std::nullopt, + std::optional object_limit_override = std::nullopt); + + void maybe_background_trim(); /// Whether an objects path makes it impervious to pinning, like /// the access time tracker. diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index fa84f4eb6ee9..9e7c19d39e26 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -13,6 +13,7 @@ #include "cache_test_fixture.h" #include "cloud_storage/access_time_tracker.h" #include "cloud_storage/cache_service.h" +#include "random/generators.h" #include "test_utils/fixture.h" #include "units.h" #include "utils/file_io.h" @@ -538,3 +539,45 @@ FIXTURE_TEST(test_log_segment_cleanup, cache_test_fixture) { return !std::filesystem::exists(path); })); } + +FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) { + std::string write_buf(1_KiB, ' '); + random_generators::fill_buffer_randomchars( + write_buf.data(), write_buf.size()); + size_t num_objects = 100; + std::vector object_keys; + for (int i = 0; i < num_objects; i++) { + object_keys.emplace_back(fmt::format("test_{}.log.1", i)); + std::ofstream segment{CACHE_DIR / object_keys.back()}; + segment.write( + write_buf.data(), static_cast(write_buf.size())); + segment.flush(); + } + + // Account all files in the cache (100 MiB). + clean_up_at_start().get(); + BOOST_REQUIRE_EQUAL(get_object_count(), 100); + + for (const auto& key : object_keys) { + // Touch every object so they have access times assigned to them + auto item = sharded_cache.local().get(key).get(); + item->body.close().get(); + } + BOOST_REQUIRE_EQUAL(get_object_count(), 100); + + set_trim_thresholds(100.0, 50.0, 100); + + // Do a put which should trigger a background trim and reduce the + // object count to 40, followed by a put that will increase it to 41. + auto data_string = create_data_string('a', 1_KiB); + put_into_cache(data_string, KEY); + wait_for_trim(); + + // 40: we set a trigger at 50. However, the new object reserves space so the + // target is adjusted to 49. The low water mark is 80%, which gives an + // adjusted target of 39. We add one for the new object to get a value + // of 40. NB: This calculation is specific to the 23.2.x backport. The + // reservation works a bit differently in newer versions of the code, so we + // end up with 41 objects in those cases. + BOOST_REQUIRE_EQUAL(get_object_count(), 40); +} diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index 6562cfe4abcc..c21b82f16206 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -12,19 +12,22 @@ #include "bytes/iobuf.h" #include "cloud_storage/cache_service.h" #include "config/property.h" -#include "seastarx.h" +#include "test_utils/scoped_config.h" #include "test_utils/tmp_dir.h" #include "units.h" #include #include +#include #include #include #include #include +#include #include +#include using namespace std::chrono_literals; @@ -119,6 +122,46 @@ class cache_test_fixture { }) .get(); } + + void set_trim_thresholds( + double size_limit_percent, + double object_limit_percent, + uint32_t max_objects) { + cfg.get("cloud_storage_cache_trim_threshold_percent_size") + .set_value(std::make_optional(size_limit_percent)); + cfg.get("cloud_storage_cache_trim_threshold_percent_objects") + .set_value(std::make_optional(object_limit_percent)); + + sharded_cache + .invoke_on( + ss::shard_id{0}, + [max_objects](cloud_storage::cache& c) { + c._max_objects = config::mock_binding(max_objects); + }) + .get(); + } + + void wait_for_trim() { + // Waits for the cleanup semaphore to be available. This ensures that + // there are no trim operations in progress. + sharded_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_storage::cache& c) { + auto units = ss::get_units(c._cleanup_sm, 1).get(); + }) + .get(); + } + + uint32_t get_object_count() { + return sharded_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_storage::cache& c) { return c._current_cache_objects; }) + .get(); + } + + scoped_config cfg; }; } // namespace cloud_storage diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index ad60fbd49cc6..6b30ba341608 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1941,6 +1941,24 @@ configuration::configuration() "Number of chunks to prefetch ahead of every downloaded chunk", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 0) + , cloud_storage_cache_trim_threshold_percent_size( + *this, + "cloud_storage_cache_trim_threshold_percent_size", + "Trim is triggered when the cache reaches this percent of the maximum " + "cache size. If this is unset, the default behavior" + "is to start trim when the cache is about 100\% full.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + std::nullopt, + {.min = 1.0, .max = 100.0}) + , cloud_storage_cache_trim_threshold_percent_objects( + *this, + "cloud_storage_cache_trim_threshold_percent_objects", + "Trim is triggered when the cache reaches this percent of the maximum " + "object count. If this is unset, the default behavior" + "is to start trim when the cache is about 100\% full.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + std::nullopt, + {.min = 1.0, .max = 100.0}) , superusers( *this, "superusers", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 4e3f3fb564f4..7a7a77b7ab9d 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -368,6 +368,10 @@ struct configuration final : public config_store { enum_property cloud_storage_chunk_eviction_strategy; property cloud_storage_chunk_prefetch; + bounded_property, numeric_bounds> + cloud_storage_cache_trim_threshold_percent_size; + bounded_property, numeric_bounds> + cloud_storage_cache_trim_threshold_percent_objects; one_or_many_property superusers; diff --git a/src/v/config/property.h b/src/v/config/property.h index 3ef63b37cca2..e48484eb7fab 100644 --- a/src/v/config/property.h +++ b/src/v/config/property.h @@ -458,6 +458,8 @@ class binding : public binding_base { friend class mock_property; template friend inline binding mock_binding(U&&); + template + friend inline binding mock_binding(U const&); }; /** @@ -474,6 +476,11 @@ inline binding mock_binding(T&& value) { return binding(std::forward(value)); } +template +inline binding mock_binding(T const& value) { + return binding(T(value)); +} + /** * A conversion property binding contains the result of application of * a conversion function to property value. The result is update in-place