diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 4cddaac265fa..e58c0352da30 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -134,10 +135,7 @@ void cache::update_max_bytes() { _max_percent()); if (_current_cache_size > _max_bytes) { - ssx::spawn_with_gate(_gate, [this]() { - return ss::with_semaphore( - _cleanup_sm, 1, [this]() { return trim_throttled(); }); - }); + ssx::spawn_with_gate(_gate, [this]() { return trim_throttled(); }); } } @@ -263,7 +261,9 @@ std::optional cache::get_trim_delay() const { } } -ss::future<> cache::trim_throttled() { +ss::future<> cache::trim_throttled_unlocked( + std::optional size_limit_override, + std::optional object_limit_override) { // If we trimmed very recently then do not do it immediately: // this reduces load and improves chance of currently promoted // segments finishing their read work before we demote their @@ -279,7 +279,15 @@ ss::future<> cache::trim_throttled() { co_await ss::sleep_abortable(*trim_delay, _as); } - co_await trim(); + co_await trim(size_limit_override, object_limit_override); +} + +ss::future<> cache::trim_throttled( + std::optional size_limit_override, + std::optional object_limit_override) { + auto units = co_await ss::get_units(_cleanup_sm, 1); + co_await trim_throttled_unlocked( + size_limit_override, object_limit_override); } ss::future<> cache::trim_manually( @@ -1033,10 +1041,7 @@ ss::future<> cache::put( // Trim proactively: if many fibers hit this concurrently, // they'll contend for cleanup_sm and the losers will skip // trim due to throttling. 
- { - auto units = co_await ss::get_units(_cleanup_sm, 1); - co_await trim_throttled(); - } + co_await trim_throttled(); throw disk_full_error; } @@ -1213,9 +1218,55 @@ bool cache::may_exceed_limits(uint64_t bytes, size_t objects) { && !would_fit_in_cache; } +void cache::maybe_background_trim() { + auto& trim_threshold_pct_objects + = config::shard_local_cfg() + .cloud_storage_cache_trim_threshold_percent_objects; + auto& trim_threshold_pct_size + = config::shard_local_cfg() + .cloud_storage_cache_trim_threshold_percent_size; + if ( + !trim_threshold_pct_size.value().has_value() + && !trim_threshold_pct_objects.value().has_value()) { + return; + } + + uint64_t target_bytes = uint64_t( + _max_bytes * trim_threshold_pct_size.value().value_or(100.0) / 100.0); + uint32_t target_objects = uint32_t( + _max_objects() * trim_threshold_pct_objects.value().value_or(100.0) + / 100.0); + + bool bytes_over_limit = _current_cache_size + _reserved_cache_size + > target_bytes; + bool objects_over_limit = _current_cache_objects + _reserved_cache_objects + > target_objects; + + if (bytes_over_limit || objects_over_limit) { + auto units = ss::try_get_units(_cleanup_sm, 1); + if (units.has_value()) { + vlog(cst_log.debug, "Spawning background trim"); + ssx::spawn_with_gate( + _gate, + [this, + target_bytes, + target_objects, + u = std::move(units)]() mutable { + return trim_throttled_unlocked(target_bytes, target_objects) + .finally([u = std::move(u)] {}); + }); + } else { + vlog( + cst_log.debug, "Not spawning background trim: already started"); + } + } +} + ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0"); + maybe_background_trim(); + if (may_reserve_space(bytes, objects)) { // Fast path: space was available. 
_reserved_cache_size += bytes; @@ -1263,7 +1314,7 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { // After taking lock, there still isn't space: means someone // else didn't take it and free space for us already, so we will // do the trim. - co_await trim_throttled(); + co_await trim_throttled_unlocked(); did_trim = true; } @@ -1411,5 +1462,4 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) { co_await ss::recursive_touch_directory(cache_dir.string()); } } - } // namespace cloud_storage diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index 5ed72e2a35b0..467e78377a06 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -24,8 +24,11 @@ #include #include #include +#include #include +#include +#include #include #include @@ -216,7 +219,16 @@ class cache : public ss::peering_sharded_service { std::optional get_trim_delay() const; /// Invoke trim, waiting if not enough time passed since the last trim - ss::future<> trim_throttled(); + ss::future<> trim_throttled_unlocked( + std::optional size_limit_override = std::nullopt, + std::optional object_limit_override = std::nullopt); + + // Take the cleanup semaphore before calling trim_throttled_unlocked + ss::future<> trim_throttled( + std::optional size_limit_override = std::nullopt, + std::optional object_limit_override = std::nullopt); + + void maybe_background_trim(); /// Whether an objects path makes it impervious to pinning, like /// the access time tracker. 
diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index fa84f4eb6ee9..9e7c19d39e26 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -13,6 +13,7 @@ #include "cache_test_fixture.h" #include "cloud_storage/access_time_tracker.h" #include "cloud_storage/cache_service.h" +#include "random/generators.h" #include "test_utils/fixture.h" #include "units.h" #include "utils/file_io.h" @@ -538,3 +539,45 @@ FIXTURE_TEST(test_log_segment_cleanup, cache_test_fixture) { return !std::filesystem::exists(path); })); } + +FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) { + std::string write_buf(1_KiB, ' '); + random_generators::fill_buffer_randomchars( + write_buf.data(), write_buf.size()); + size_t num_objects = 100; + std::vector object_keys; + for (size_t i = 0; i < num_objects; i++) { + object_keys.emplace_back(fmt::format("test_{}.log.1", i)); + std::ofstream segment{CACHE_DIR / object_keys.back()}; + segment.write( + write_buf.data(), static_cast(write_buf.size())); + segment.flush(); + } + + // Account all files in the cache (100 KiB). + clean_up_at_start().get(); + BOOST_REQUIRE_EQUAL(get_object_count(), 100); + + for (const auto& key : object_keys) { + // Touch every object so they have access times assigned to them + auto item = sharded_cache.local().get(key).get(); + item->body.close().get(); + } + BOOST_REQUIRE_EQUAL(get_object_count(), 100); + + set_trim_thresholds(100.0, 50.0, 100); + + // Do a put which should trigger a background trim and reduce the + // object count to 39, after which the put itself increases it to 40. + auto data_string = create_data_string('a', 1_KiB); + put_into_cache(data_string, KEY); + wait_for_trim(); + + // 40: we set a trigger at 50. However, the new object reserves space so the + // target is adjusted to 49. The low water mark is 80%, which gives an + // adjusted target of 39. We add one for the new object to get a value + // of 40. 
NB: This calculation is specific to the 23.2.x backport. The + // reservation works a bit differently in newer versions of the code, so we + // end up with 41 objects in those cases. + BOOST_REQUIRE_EQUAL(get_object_count(), 40); +} diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index 6562cfe4abcc..c21b82f16206 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -12,19 +12,22 @@ #include "bytes/iobuf.h" #include "cloud_storage/cache_service.h" #include "config/property.h" -#include "seastarx.h" +#include "test_utils/scoped_config.h" #include "test_utils/tmp_dir.h" #include "units.h" #include #include +#include #include #include #include #include +#include #include +#include using namespace std::chrono_literals; @@ -119,6 +122,46 @@ class cache_test_fixture { }) .get(); } + + void set_trim_thresholds( + double size_limit_percent, + double object_limit_percent, + uint32_t max_objects) { + cfg.get("cloud_storage_cache_trim_threshold_percent_size") + .set_value(std::make_optional(size_limit_percent)); + cfg.get("cloud_storage_cache_trim_threshold_percent_objects") + .set_value(std::make_optional(object_limit_percent)); + + sharded_cache + .invoke_on( + ss::shard_id{0}, + [max_objects](cloud_storage::cache& c) { + c._max_objects = config::mock_binding(max_objects); + }) + .get(); + } + + void wait_for_trim() { + // Waits for the cleanup semaphore to be available. This ensures that + // there are no trim operations in progress. 
+ sharded_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_storage::cache& c) { + auto units = ss::get_units(c._cleanup_sm, 1).get(); + }) + .get(); + } + + uint32_t get_object_count() { + return sharded_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_storage::cache& c) { return c._current_cache_objects; }) + .get(); + } + + scoped_config cfg; }; } // namespace cloud_storage diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index ad60fbd49cc6..6b30ba341608 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1941,6 +1941,24 @@ configuration::configuration() "Number of chunks to prefetch ahead of every downloaded chunk", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 0) + , cloud_storage_cache_trim_threshold_percent_size( + *this, + "cloud_storage_cache_trim_threshold_percent_size", + "Trim is triggered when the cache reaches this percent of the maximum " + "cache size. If this is unset, the default behavior " + "is to start trim when the cache is about 100% full.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + std::nullopt, + {.min = 1.0, .max = 100.0}) + , cloud_storage_cache_trim_threshold_percent_objects( + *this, + "cloud_storage_cache_trim_threshold_percent_objects", + "Trim is triggered when the cache reaches this percent of the maximum " + "object count. 
If this is unset, the default behavior " + "is to start trim when the cache is about 100% full.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + std::nullopt, + {.min = 1.0, .max = 100.0}) , superusers( *this, "superusers", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 4e3f3fb564f4..7a7a77b7ab9d 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -368,6 +368,10 @@ struct configuration final : public config_store { enum_property cloud_storage_chunk_eviction_strategy; property cloud_storage_chunk_prefetch; + bounded_property, numeric_bounds> + cloud_storage_cache_trim_threshold_percent_size; + bounded_property, numeric_bounds> + cloud_storage_cache_trim_threshold_percent_objects; one_or_many_property superusers; diff --git a/src/v/config/property.h b/src/v/config/property.h index 3ef63b37cca2..e48484eb7fab 100644 --- a/src/v/config/property.h +++ b/src/v/config/property.h @@ -458,6 +458,8 @@ class binding : public binding_base { friend class mock_property; template friend inline binding mock_binding(U&&); + template + friend inline binding mock_binding(U const&); }; /** @@ -474,6 +476,11 @@ inline binding mock_binding(T&& value) { return binding(std::forward(value)); } +template +inline binding mock_binding(T const& value) { + return binding(T(value)); +} + /** * A conversion property binding contains the result of application of * a conversion function to property value. The result is update in-place