From 4417b1907a4d429ccaa55ced62f62972b2822dfc Mon Sep 17 00:00:00 2001 From: ballard26 Date: Mon, 23 May 2022 14:51:35 -0400 Subject: [PATCH] storage/log_manager: avoid repeated finds during housekeeping --- src/v/storage/log_housekeeping_meta.h | 3 + src/v/storage/log_manager.cc | 101 ++++++++++++-------------- src/v/storage/log_manager.h | 11 ++- 3 files changed, 58 insertions(+), 57 deletions(-) diff --git a/src/v/storage/log_housekeeping_meta.h b/src/v/storage/log_housekeeping_meta.h index b2b36f85fb07..fb169a93fa18 100644 --- a/src/v/storage/log_housekeeping_meta.h +++ b/src/v/storage/log_housekeeping_meta.h @@ -12,6 +12,7 @@ #pragma once #include "storage/log.h" +#include "utils/intrusive_list_helpers.h" namespace storage { struct log_housekeeping_meta { @@ -25,6 +26,8 @@ struct log_housekeeping_meta { log handle; bitflags flags{bitflags::none}; ss::lowres_clock::time_point last_compaction; + + intrusive_list_hook link; /* list hook: lets log_manager thread this entry onto its _logs_list (see log_manager::do_manage) */ }; inline log_housekeeping_meta::bitflags operator|( diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index 3ff442cae71c..cf3ac8dcf1c0 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -88,18 +88,45 @@ ss::future<> log_manager::stop() { return _open_gate.close() .then([this] { return ss::parallel_for_each(_logs, [](logs_type::value_type& entry) { - return entry.second.handle.close(); + return entry.second->handle.close(); }); }) .then([this] { return _batch_cache.stop(); }); } -static inline logs_type::iterator find_next_non_compacted_log(logs_type& logs) { +/** + * `housekeeping_scan` scans over every current log in a single pass.
+ */ +ss::future<> +log_manager::housekeeping_scan(model::timestamp collection_threshold) { using bflags = log_housekeeping_meta::bitflags; - return std::find_if( - logs.begin(), logs.end(), [](const logs_type::value_type& l) { - return bflags::none == (l.second.flags & bflags::compacted); - }); + + if (_logs_list.empty()) { /* no logs: nothing to do, and front() below must not run on an empty list */ + co_return; + } + + for (auto& log_meta : _logs_list) { /* start of a pass: clear the compacted mark on every log */ + log_meta.flags &= ~bflags::compacted; + } + + while ((_logs_list.front().flags & bflags::compacted) == bflags::none) { /* stop once the head is already marked, i.e. the rotation has wrapped around to a log handled in this pass */ + auto& current_log = _logs_list.front(); + + _logs_list.pop_front(); + _logs_list.push_back(current_log); /* move the head to the tail before suspending, so the next unvisited log is at the front */ + + current_log.flags |= bflags::compacted; + current_log.last_compaction = ss::lowres_clock::now(); + co_await current_log.handle.compact(compaction_config( + collection_threshold, + _config.retention_bytes(), + _config.compaction_priority, + _abort_source)); + + if (_logs_list.empty()) { /* NOTE(review): presumably logs can be unlinked while compact() is suspended; re-check before the loop condition calls front() again */ + co_return; + } + } } ss::future<> log_manager::housekeeping() { @@ -112,50 +139,10 @@ ss::future<> log_manager::housekeeping() { model::timestamp::now().value() - _config.delete_retention()->count()); } - /** - * Note that this loop does a double find - which is not fast. This solution - * is the tradeoff to *not* lock the segment during log_manager::remove(ntp) - * and to *not* use an ordered container.
- * - * The issue with keeping an iterator and effectively doing a iterator++ - * is that during a concurrent log_manager::remove() we invalidate all the - * iterators for the absl::flat_hash_map; - an alternative here would be to - * use a lock - * - * Note that we also do not use an ordered container like an absl::tree-set - * because finds are frequent on this datastructure and we want to serve - * them as fast as we can since the majority of the time they will be in the - * hotpath / (request-response) - */ - using bflags = log_housekeeping_meta::bitflags; - return ss::do_until( - [this] { - auto it = find_next_non_compacted_log(_logs); - return it == _logs.end(); - }, - [this, collection_threshold] { - return ss::with_scheduling_group( - _config.compaction_sg, [this, collection_threshold] { - auto it = find_next_non_compacted_log(_logs); - if (it == _logs.end()) { - // must check again because between the stop - // condition and this continuation we might have - // removed the log - return ss::now(); - } - it->second.flags |= bflags::compacted; - it->second.last_compaction = ss::lowres_clock::now(); - return it->second.handle.compact(compaction_config( - collection_threshold, - _config.retention_bytes(), - _config.compaction_priority, - _abort_source)); - }); - }) - .finally([this] { - for (auto& h : _logs) { - h.second.flags &= ~bflags::compacted; - } + + return ss::with_scheduling_group( + _config.compaction_sg, [this, collection_threshold] { + return housekeeping_scan(collection_threshold); /* run the scan under _config.compaction_sg */ }); } @@ -229,7 +216,9 @@ ss::future log_manager::do_manage(ntp_config cfg) { if (_config.stype == log_config::storage_type::memory) { auto path = cfg.work_directory(); auto l = storage::make_memory_backed_log(std::move(cfg)); - _logs.emplace(l.config().ntp(), l); + auto [it, _] = _logs.emplace( + l.config().ntp(), std::make_unique(l)); + _logs_list.push_back(*it->second); /* NOTE(review): the inserted/not-inserted flag is discarded here, unlike the disk-backed path which vasserts on it; if the ntp were already in the map, *it->second would be the existing entry -- confirm that cannot happen */ // in-memory needs to write vote_for configuration return
ss::recursive_touch_directory(path).then([l] { return l; }); } @@ -248,7 +237,9 @@ ss::future log_manager::do_manage(ntp_config cfg) { .then([this, cfg = std::move(cfg)](segment_set segments) mutable { auto l = storage::make_disk_backed_log( std::move(cfg), *this, std::move(segments), _kvstore); - auto [_, success] = _logs.emplace(l.config().ntp(), l); + auto [it, success] = _logs.emplace( + l.config().ntp(), std::make_unique(l)); + _logs_list.push_back(*it->second); /* NOTE(review): linked before the vassert on `success` below; if the emplace did not insert, `it` names the entry that was already in the map -- confirm pushing it again is acceptable ahead of the assert */ vassert( success, "Could not keep track of:{} - concurrency issue", l); return l; @@ -263,7 +254,7 @@ ss::future<> log_manager::shutdown(model::ntp ntp) { if (handle.empty()) { return ss::make_ready_future<>(); } - storage::log lg = handle.mapped().handle; + storage::log lg = handle.mapped()->handle; return lg.close().finally([lg] {}); }); } @@ -276,7 +267,7 @@ ss::future<> log_manager::remove(model::ntp ntp) { return ss::make_ready_future<>(); } // 'ss::shared_ptr<>' make a copy - storage::log lg = handle.mapped().handle; + storage::log lg = handle.mapped()->handle; vlog(stlog.info, "Removing: {}", lg); // NOTE: it is ok to *not* externally synchronize the log here // because remove, takes a write lock on each individual segments @@ -340,7 +331,7 @@ int64_t log_manager::compaction_backlog() const { _logs.end(), int64_t(0), [](int64_t acc, const logs_type::value_type& p) { - return acc + p.second.handle.compaction_backlog(); + return acc + p.second->handle.compaction_backlog(); }); } diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 95ee208cb5c6..b1d9846dace1 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -23,6 +23,7 @@ #include "storage/types.h" #include "storage/version.h" #include "units.h" +#include "utils/intrusive_list_helpers.h" #include "utils/mutex.h" #include @@ -216,7 +217,7 @@ class log_manager { /// Returns the log for the specified ntp.
std::optional get(const model::ntp& ntp) { if (auto it = _logs.find(ntp); it != _logs.end()) { - return it->second.handle; + return it->second->handle; } return std::nullopt; } @@ -227,7 +228,10 @@ class log_manager { int64_t compaction_backlog() const; private: - using logs_type = absl::flat_hash_map; + using logs_type + = absl::flat_hash_map>; + using compaction_list_type + = intrusive_list; ss::future do_manage(ntp_config); @@ -243,11 +247,14 @@ class log_manager { ss::future<> dispatch_topic_dir_deletion(ss::sstring dir); ss::future<> recover_log_state(const ntp_config&); + ss::future<> housekeeping_scan(model::timestamp); /* one compaction pass over _logs_list; the timestamp is forwarded to compaction_config as the collection threshold */ + log_config _config; kvstore& _kvstore; simple_time_jitter _jitter; ss::timer _compaction_timer; logs_type _logs; + compaction_list_type _logs_list; /* links the entries held in _logs in housekeeping visit order; do_manage appends to it right after each emplace into _logs */ batch_cache _batch_cache; ss::gate _open_gate; ss::abort_source _abort_source;