Merge pull request #4891 from ballard26/storage-find-optimization
Attempt to avoid repeated finds during housekeeping
dotnwat committed Jun 3, 2022
2 parents 8f8731b + 4417b19 · commit a194d3e
Showing 3 changed files with 58 additions and 57 deletions.
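The optimization replaces repeated linear searches over the log map with a single pass over an intrusive list threaded through the same entries. Below is a minimal standalone sketch of that pattern, written against boost::intrusive directly; the repository's utils/intrusive_list_helpers.h is not shown in this diff, so the exact wrapper it provides is an assumption here.

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

#include <boost/intrusive/list.hpp>

namespace bi = boost::intrusive;

struct meta : bi::list_base_hook<> {
    explicit meta(std::string n)
      : name(std::move(n)) {}
    std::string name;
    bool compacted = false;
};

int main() {
    // The map owns the entries; unique_ptr keeps each one at a stable
    // address so the embedded list hook stays valid across rehashes.
    std::unordered_map<std::string, std::unique_ptr<meta>> logs;
    bi::list<meta> scan_list;

    for (const char* name : {"a", "b", "c"}) {
        auto it = logs.emplace(name, std::make_unique<meta>(name)).first;
        scan_list.push_back(*it->second);
    }

    // One housekeeping pass: rotate the front to the back and mark it,
    // until the front has already been visited. No map lookups needed.
    while (!scan_list.empty() && !scan_list.front().compacted) {
        meta& current = scan_list.front();
        scan_list.pop_front();
        scan_list.push_back(current);
        current.compacted = true;
        std::cout << "compacting " << current.name << '\n';
    }

    scan_list.clear(); // unlink before the map destroys the entries
}
```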
src/v/storage/log_housekeeping_meta.h (3 additions, 0 deletions)
@@ -12,6 +12,7 @@
 #pragma once
 
 #include "storage/log.h"
+#include "utils/intrusive_list_helpers.h"
 
 namespace storage {
 struct log_housekeeping_meta {
@@ -25,6 +26,8 @@ struct log_housekeeping_meta {
     log handle;
     bitflags flags{bitflags::none};
     ss::lowres_clock::time_point last_compaction;
+
+    intrusive_list_hook link;
 };
 
 inline log_housekeeping_meta::bitflags operator|(
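The only change to the struct is the embedded link member. An intrusive list stores its prev/next pointers inside the element itself, so membership costs no allocation and unlinking needs no search. A hand-rolled illustration of that idea (deliberately simplified, not the project's implementation):

```cpp
#include <cassert>

struct hook {
    hook* prev = nullptr;
    hook* next = nullptr;
    void unlink() {
        if (prev) { prev->next = next; }
        if (next) { next->prev = prev; }
        prev = next = nullptr;
    }
};

struct item {
    int id;
    hook link; // plays the role of intrusive_list_hook in the diff above
};

int main() {
    item a{1, {}}, b{2, {}}, c{3, {}};
    a.link.next = &b.link; b.link.prev = &a.link; // a <-> b
    b.link.next = &c.link; c.link.prev = &b.link; // b <-> c
    b.link.unlink(); // constant time: no traversal from a list head
    assert(a.link.next == &c.link && c.link.prev == &a.link);
}
```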
src/v/storage/log_manager.cc (46 additions, 55 deletions)
@@ -88,18 +88,45 @@ ss::future<> log_manager::stop() {
     return _open_gate.close()
       .then([this] {
           return ss::parallel_for_each(_logs, [](logs_type::value_type& entry) {
-              return entry.second.handle.close();
+              return entry.second->handle.close();
          });
       })
       .then([this] { return _batch_cache.stop(); });
 }
 
-static inline logs_type::iterator find_next_non_compacted_log(logs_type& logs) {
+/**
+ * `housekeeping_scan` scans over every current log in a single pass.
+ */
+ss::future<>
+log_manager::housekeeping_scan(model::timestamp collection_threshold) {
     using bflags = log_housekeeping_meta::bitflags;
-    return std::find_if(
-      logs.begin(), logs.end(), [](const logs_type::value_type& l) {
-          return bflags::none == (l.second.flags & bflags::compacted);
-      });
+
+    if (_logs_list.empty()) {
+        co_return;
+    }
+
+    for (auto& log_meta : _logs_list) {
+        log_meta.flags &= ~bflags::compacted;
+    }
+
+    while ((_logs_list.front().flags & bflags::compacted) == bflags::none) {
+        auto& current_log = _logs_list.front();
+
+        _logs_list.pop_front();
+        _logs_list.push_back(current_log);
+
+        current_log.flags |= bflags::compacted;
+        current_log.last_compaction = ss::lowres_clock::now();
+        co_await current_log.handle.compact(compaction_config(
+          collection_threshold,
+          _config.retention_bytes(),
+          _config.compaction_priority,
+          _abort_source));
+
+        if (_logs_list.empty()) {
+            co_return;
+        }
+    }
 }
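Note how housekeeping_scan rotates the front entry to the back and marks it before awaiting compact(), so the pass ends exactly when the front is already marked, and entries removed while the coroutine is suspended simply drop out of the intrusive list. A toy model of that invariant, with std::list::splice standing in for the intrusive pop_front/push_back:

```cpp
#include <iostream>
#include <list>
#include <string>

struct entry {
    std::string name;
    bool compacted = false;
};

int main() {
    std::list<entry> logs{{"a"}, {"b"}, {"c"}};

    while (!logs.empty() && !logs.front().compacted) {
        logs.splice(logs.end(), logs, logs.begin()); // rotate front -> back
        entry& current = logs.back();
        current.compacted = true;                    // mark before "suspending"
        std::cout << "compacted " << current.name << '\n';

        if (current.name == "a") {
            // Simulate a concurrent removal while compact() is suspended:
            // unlinking "b" does not disturb the scan position.
            logs.remove_if([](const entry& e) { return e.name == "b"; });
        }
    }
    // Prints: a, then c. A second pass would exit immediately until the
    // compacted flags are cleared again, mirroring housekeeping_scan.
}
```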
@@ -112,50 +139,10 @@ ss::future<> log_manager::housekeeping() {
           model::timestamp::now().value()
           - _config.delete_retention()->count());
     }
-    /**
-     * Note that this loop does a double find - which is not fast. This solution
-     * is the tradeoff to *not* lock the segment during log_manager::remove(ntp)
-     * and to *not* use an ordered container.
-     *
-     * The issue with keeping an iterator and effectively doing a iterator++
-     * is that during a concurrent log_manager::remove() we invalidate all the
-     * iterators for the absl::flat_hash_map; - an alternative here would be to
-     * use a lock
-     *
-     * Note that we also do not use an ordered container like an absl::tree-set
-     * because finds are frequent on this datastructure and we want to serve
-     * them as fast as we can since the majority of the time they will be in the
-     * hotpath / (request-response)
-     */
-    using bflags = log_housekeeping_meta::bitflags;
-    return ss::do_until(
-      [this] {
-          auto it = find_next_non_compacted_log(_logs);
-          return it == _logs.end();
-      },
-      [this, collection_threshold] {
-          return ss::with_scheduling_group(
-            _config.compaction_sg, [this, collection_threshold] {
-                auto it = find_next_non_compacted_log(_logs);
-                if (it == _logs.end()) {
-                    // must check again because between the stop
-                    // condition and this continuation we might have
-                    // removed the log
-                    return ss::now();
-                }
-                it->second.flags |= bflags::compacted;
-                it->second.last_compaction = ss::lowres_clock::now();
-                return it->second.handle.compact(compaction_config(
-                  collection_threshold,
-                  _config.retention_bytes(),
-                  _config.compaction_priority,
-                  _abort_source));
-            });
-      })
-      .finally([this] {
-          for (auto& h : _logs) {
-              h.second.flags &= ~bflags::compacted;
-          }
-      });
+
+    return ss::with_scheduling_group(
+      _config.compaction_sg, [this, collection_threshold] {
+          return housekeeping_scan(collection_threshold);
+      });
 }
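The deleted comment records why the old loop re-ran find_next_non_compacted_log in both the stop condition and the body: holding an iterator across suspension points is unsafe because a concurrent log_manager::remove() invalidates every absl::flat_hash_map iterator. The price was quadratic work per pass. A counting model of that cost (plain std::unordered_map standing in for the real types):

```cpp
#include <iostream>
#include <unordered_map>

int main() {
    constexpr int n = 1000;
    std::unordered_map<int, bool> compacted; // ntp -> "already compacted"
    for (int i = 0; i < n; ++i) {
        compacted.emplace(i, false);
    }

    long long probes = 0;
    // Linear scan for the next non-compacted log, like the deleted
    // find_next_non_compacted_log().
    auto find_next = [&]() -> bool* {
        for (auto& kv : compacted) {
            ++probes;
            if (!kv.second) {
                return &kv.second;
            }
        }
        return nullptr;
    };

    // do_until shape: one find for the stop condition, one find in the body.
    while (find_next() != nullptr) {
        *find_next() = true; // "compact" the log we just found again
    }
    std::cout << probes << " probes for " << n << " logs\n"; // grows ~n^2
    // The intrusive-list scan visits each log exactly once instead.
}
```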

@@ -229,7 +216,9 @@ ss::future<log> log_manager::do_manage(ntp_config cfg) {
     if (_config.stype == log_config::storage_type::memory) {
         auto path = cfg.work_directory();
         auto l = storage::make_memory_backed_log(std::move(cfg));
-        _logs.emplace(l.config().ntp(), l);
+        auto [it, _] = _logs.emplace(
+          l.config().ntp(), std::make_unique<log_housekeeping_meta>(l));
+        _logs_list.push_back(*it->second);
         // in-memory needs to write vote_for configuration
         return ss::recursive_touch_directory(path).then([l] { return l; });
     }
@@ -248,7 +237,9 @@
       .then([this, cfg = std::move(cfg)](segment_set segments) mutable {
           auto l = storage::make_disk_backed_log(
             std::move(cfg), *this, std::move(segments), _kvstore);
-          auto [_, success] = _logs.emplace(l.config().ntp(), l);
+          auto [it, success] = _logs.emplace(
+            l.config().ntp(), std::make_unique<log_housekeeping_meta>(l));
+          _logs_list.push_back(*it->second);
           vassert(
             success, "Could not keep track of:{} - concurrency issue", l);
           return l;
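Both emplace call sites now box the metadata in a std::unique_ptr before hooking it into _logs_list. This matters because absl::flat_hash_map relocates its values on rehash, which would dangle any intrusive hook stored inline; a heap-allocated value keeps its address. A small demonstration (illustrative only: whether the inline value actually moves is unspecified, though it does in practice):

```cpp
#include <cstdint>
#include <iostream>
#include <memory>

#include <absl/container/flat_hash_map.h>

struct meta {
    int id;
};

int main() {
    // Values stored inline in a flat_hash_map move when the table rehashes,
    // so an intrusive hook inside them would dangle...
    absl::flat_hash_map<int, meta> flat;
    flat.emplace(0, meta{0});
    auto before = reinterpret_cast<std::uintptr_t>(&flat.find(0)->second);
    for (int i = 1; i < 4096; ++i) {
        flat.emplace(i, meta{i}); // forces several rehashes
    }
    auto after = reinterpret_cast<std::uintptr_t>(&flat.find(0)->second);
    std::cout << std::boolalpha << "inline value moved: " << (before != after)
              << '\n'; // true in practice

    // ...while a heap-boxed value keeps its address across rehashes, which
    // is what unique_ptr<log_housekeeping_meta> buys in this commit.
    absl::flat_hash_map<int, std::unique_ptr<meta>> boxed;
    boxed.emplace(0, std::make_unique<meta>(meta{0}));
    const meta* stable = boxed.find(0)->second.get();
    for (int i = 1; i < 4096; ++i) {
        boxed.emplace(i, std::make_unique<meta>(meta{i}));
    }
    std::cout << "boxed value moved: "
              << (stable != boxed.find(0)->second.get()) << '\n'; // false
}
```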
@@ -263,7 +254,7 @@ ss::future<> log_manager::shutdown(model::ntp ntp) {
         if (handle.empty()) {
             return ss::make_ready_future<>();
         }
-        storage::log lg = handle.mapped().handle;
+        storage::log lg = handle.mapped()->handle;
         return lg.close().finally([lg] {});
     });
 }
}
@@ -276,7 +267,7 @@ ss::future<> log_manager::remove(model::ntp ntp) {
            return ss::make_ready_future<>();
        }
        // 'ss::shared_ptr<>' make a copy
-       storage::log lg = handle.mapped().handle;
+       storage::log lg = handle.mapped()->handle;
        vlog(stlog.info, "Removing: {}", lg);
        // NOTE: it is ok to *not* externally synchronize the log here
        // because remove, takes a write lock on each individual segments
@@ -340,7 +331,7 @@ int64_t log_manager::compaction_backlog() const {
      _logs.end(),
      int64_t(0),
      [](int64_t acc, const logs_type::value_type& p) {
-         return acc + p.second.handle.compaction_backlog();
+         return acc + p.second->handle.compaction_backlog();
      });
 }

src/v/storage/log_manager.h (9 additions, 2 deletions)
@@ -23,6 +23,7 @@
 #include "storage/types.h"
 #include "storage/version.h"
 #include "units.h"
+#include "utils/intrusive_list_helpers.h"
 #include "utils/mutex.h"
 
 #include <seastar/core/abort_source.hh>
@@ -216,7 +217,7 @@ class log_manager {
     /// Returns the log for the specified ntp.
     std::optional<log> get(const model::ntp& ntp) {
         if (auto it = _logs.find(ntp); it != _logs.end()) {
-            return it->second.handle;
+            return it->second->handle;
         }
         return std::nullopt;
     }
@@ -227,7 +228,10 @@
     int64_t compaction_backlog() const;
 
 private:
-    using logs_type = absl::flat_hash_map<model::ntp, log_housekeeping_meta>;
+    using logs_type
+      = absl::flat_hash_map<model::ntp, std::unique_ptr<log_housekeeping_meta>>;
+    using compaction_list_type
+      = intrusive_list<log_housekeeping_meta, &log_housekeeping_meta::link>;
 
     ss::future<log> do_manage(ntp_config);
@@ -243,11 +247,14 @@
     ss::future<> dispatch_topic_dir_deletion(ss::sstring dir);
     ss::future<> recover_log_state(const ntp_config&);
 
+    ss::future<> housekeeping_scan(model::timestamp);
+
     log_config _config;
     kvstore& _kvstore;
     simple_time_jitter<ss::lowres_clock> _jitter;
     ss::timer<ss::lowres_clock> _compaction_timer;
     logs_type _logs;
+    compaction_list_type _logs_list;
     batch_cache _batch_cache;
     ss::gate _open_gate;
     ss::abort_source _abort_source;