Skip to content

Commit

Permalink
Merge pull request #5127 from jcsp/storage-clean-shutdown
Browse files Browse the repository at this point in the history
storage: explicitly mark clean segments in kvstore on clean shutdown
  • Loading branch information
jcsp committed Jul 5, 2022
2 parents 82c8fe9 + 5509f2d commit 630e791
Show file tree
Hide file tree
Showing 12 changed files with 207 additions and 108 deletions.
45 changes: 38 additions & 7 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,16 @@ ss::future<> disk_log_impl::remove() {
return _kvstore.remove(
kvstore::key_space::storage,
internal::start_offset_key(config().ntp()));
})
.then([this] {
return _kvstore.remove(
kvstore::key_space::storage,
internal::clean_segment_key(config().ntp()));
});
});
}
ss::future<> disk_log_impl::close() {

ss::future<std::optional<ss::sstring>> disk_log_impl::close() {
vassert(!_closed, "Invalid double closing of log - {}", *this);
vlog(stlog.debug, "closing log {}", *this);
_closed = true;
Expand All @@ -122,13 +128,38 @@ ss::future<> disk_log_impl::close() {
vlog(stlog.trace, "waiting for {} compaction to finish", config().ntp());
co_await _compaction_gate.close();
vlog(stlog.trace, "stopping {} readers cache", config().ntp());
co_await _readers_cache->stop().then([this] {
return ss::parallel_for_each(_segs, [](ss::lw_shared_ptr<segment>& h) {
return h->close().handle_exception([h](std::exception_ptr e) {
vlog(stlog.error, "Error closing segment:{} - {}", e, h);
});
});

// close() on the segments is not expected to fail, but it might
// encounter I/O errors (e.g. ENOSPC, EIO, out of file handles)
// when trying to flush. If that happens, all bets are off and
// we will not mark our latest segment as clean (even if that
// particular segment didn't report an I/O error)
bool errors = false;

co_await _readers_cache->stop().then([this, &errors] {
return ss::parallel_for_each(
_segs, [&errors](ss::lw_shared_ptr<segment>& h) {
return h->close().handle_exception(
[&errors, h](std::exception_ptr e) {
vlog(stlog.error, "Error closing segment:{} - {}", e, h);
errors = true;
});
});
});

if (_segs.size() && !errors) {
auto clean_seg = _segs.back()->filename();
vlog(
stlog.debug,
"closed {}, last clean segment is {}",
config().ntp(),
clean_seg);
co_return clean_seg;
}

// Avoid assuming there will always be a segment: it is legal to
// open a log + close it without writing anything.
co_return std::nullopt;
}

model::offset disk_log_impl::size_based_gc_max_offset(size_t max_size) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class disk_log_impl final : public log::impl {
disk_log_impl(const disk_log_impl&) = delete;
disk_log_impl& operator=(const disk_log_impl&) = delete;

ss::future<> close() final;
ss::future<std::optional<ss::sstring>> close() final;
ss::future<> remove() final;
ss::future<> flush() final;
ss::future<> truncate(truncate_config) final;
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ ss::future<> kvstore::recover() {
[] { return std::nullopt; },
_as,
config::shard_local_cfg().storage_read_buffer_size(),
config::shard_local_cfg().storage_read_readahead_count())
config::shard_local_cfg().storage_read_readahead_count(),
std::nullopt)
.get0();

replay_segments_in_thread(std::move(segments));
Expand Down
4 changes: 2 additions & 2 deletions src/v/storage/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class log final {
virtual log_appender make_appender(log_append_config) = 0;

// final operation. Invalid filesystem state after
virtual ss::future<> close() = 0;
virtual ss::future<std::optional<ss::sstring>> close() = 0;
// final operation. Invalid state after
virtual ss::future<> remove() = 0;

Expand Down Expand Up @@ -99,7 +99,7 @@ class log final {
public:
explicit log(ss::shared_ptr<impl> i)
: _impl(std::move(i)) {}
ss::future<> close() { return _impl->close(); }
ss::future<std::optional<ss::sstring>> close() { return _impl->close(); }
ss::future<> remove() { return _impl->remove(); }
ss::future<> flush() { return _impl->flush(); }

Expand Down
139 changes: 86 additions & 53 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <seastar/core/thread.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>

#include <fmt/format.h>

Expand Down Expand Up @@ -84,17 +85,36 @@ void log_manager::trigger_housekeeping() {
});
}

ss::future<> log_manager::clean_close(storage::log& log) {
auto clean_segment = co_await log.close();

if (clean_segment) {
vlog(
stlog.debug,
"writing clean record for: {} {}",
log.config().ntp(),
clean_segment.value());
co_await _kvstore.put(
kvstore::key_space::storage,
internal::clean_segment_key(log.config().ntp()),
serde::to_iobuf(internal::clean_segment_value{
.segment_name = std::filesystem::path(clean_segment.value())
.filename()
.string()}));
}
}

ss::future<> log_manager::stop() {
_compaction_timer.cancel();
_abort_source.request_abort();
return _open_gate.close()
.then([this] {
return ss::parallel_for_each(_logs, [](logs_type::value_type& entry) {
return entry.second->handle.close();
});
})
.then([this] { return _batch_cache.stop(); })
.then([this] { return ssx::async_clear(_logs)(); });

co_await _open_gate.close();
co_await ss::coroutine::parallel_for_each(
_logs, [this](logs_type::value_type& entry) {
return clean_close(entry.second->handle);
});
co_await _batch_cache.stop();
co_await ssx::async_clear(_logs)();
}

/**
Expand Down Expand Up @@ -189,28 +209,34 @@ log_manager::create_cache(with_cache ntp_cache_enabled) {
}

ss::future<log> log_manager::manage(ntp_config cfg) {
return ss::with_gate(_open_gate, [this, cfg = std::move(cfg)]() mutable {
return do_manage(std::move(cfg));
});
auto gate = _open_gate.hold();

co_return co_await do_manage(std::move(cfg));
}

ss::future<> log_manager::recover_log_state(const ntp_config& cfg) {
return ss::file_exists(cfg.work_directory())
.then(
[this, key = internal::start_offset_key(cfg.ntp())](bool dir_exists) {
if (dir_exists) {
return ss::now();
}
// directory was deleted, make sure we do not have any state in KV
// store.
return _kvstore.remove(kvstore::key_space::storage, key);
});
.then([this,
offset_key = internal::start_offset_key(cfg.ntp()),
segment_key = internal::clean_segment_key(cfg.ntp())](
bool dir_exists) {
if (dir_exists) {
return ss::now();
}
// directory was deleted, make sure we do not have any state in KV
// store.
return _kvstore.remove(kvstore::key_space::storage, offset_key)
.then([this, segment_key] {
return _kvstore.remove(
kvstore::key_space::storage, segment_key);
});
});
}

ss::future<log> log_manager::do_manage(ntp_config cfg) {
if (_config.base_dir.empty()) {
return ss::make_exception_future<log>(std::runtime_error(
"log_manager:: cannot have empty config.base_dir"));
throw std::runtime_error(
"log_manager:: cannot have empty config.base_dir");
}

vassert(
Expand All @@ -223,43 +249,50 @@ ss::future<log> log_manager::do_manage(ntp_config cfg) {
l.config().ntp(), std::make_unique<log_housekeeping_meta>(l));
_logs_list.push_back(*it->second);
// in-memory needs to write vote_for configuration
return ss::recursive_touch_directory(path).then([l] { return l; });
co_await ss::recursive_touch_directory(path);
co_return l;
}

return recover_log_state(cfg).then([this, cfg = std::move(cfg)]() mutable {
ss::sstring path = cfg.work_directory();
with_cache cache_enabled = cfg.cache_enabled();
return recover_segments(
std::filesystem::path(path),
_config.sanitize_fileops,
cfg.is_compacted(),
[this, cache_enabled] { return create_cache(cache_enabled); },
_abort_source,
config::shard_local_cfg().storage_read_buffer_size(),
config::shard_local_cfg().storage_read_readahead_count())
.then([this, cfg = std::move(cfg)](segment_set segments) mutable {
auto l = storage::make_disk_backed_log(
std::move(cfg), *this, std::move(segments), _kvstore);
auto [it, success] = _logs.emplace(
l.config().ntp(), std::make_unique<log_housekeeping_meta>(l));
_logs_list.push_back(*it->second);
vassert(
success, "Could not keep track of:{} - concurrency issue", l);
return l;
});
});
std::optional<ss::sstring> last_clean_segment;
auto clean_iobuf = _kvstore.get(
kvstore::key_space::storage, internal::clean_segment_key(cfg.ntp()));
if (clean_iobuf) {
last_clean_segment = serde::from_iobuf<internal::clean_segment_value>(
std::move(clean_iobuf.value()))
.segment_name;
}

co_await recover_log_state(cfg);

ss::sstring path = cfg.work_directory();
with_cache cache_enabled = cfg.cache_enabled();
auto segments = co_await recover_segments(
std::filesystem::path(path),
_config.sanitize_fileops,
cfg.is_compacted(),
[this, cache_enabled] { return create_cache(cache_enabled); },
_abort_source,
config::shard_local_cfg().storage_read_buffer_size(),
config::shard_local_cfg().storage_read_readahead_count(),
last_clean_segment);

auto l = storage::make_disk_backed_log(
std::move(cfg), *this, std::move(segments), _kvstore);
auto [it, success] = _logs.emplace(
l.config().ntp(), std::make_unique<log_housekeeping_meta>(l));
_logs_list.push_back(*it->second);
vassert(success, "Could not keep track of:{} - concurrency issue", l);
co_return l;
}

ss::future<> log_manager::shutdown(model::ntp ntp) {
vlog(stlog.debug, "Asked to shutdown: {}", ntp);
return ss::with_gate(_open_gate, [this, ntp = std::move(ntp)] {
auto handle = _logs.extract(ntp);
if (handle.empty()) {
return ss::make_ready_future<>();
}
storage::log lg = handle.mapped()->handle;
return lg.close().finally([lg] {});
});
auto gate = _open_gate.hold();
auto handle = _logs.extract(ntp);
if (handle.empty()) {
co_return;
}
co_await clean_close(handle.mapped()->handle);
}

ss::future<> log_manager::remove(model::ntp ntp) {
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class log_manager {
= intrusive_list<log_housekeeping_meta, &log_housekeeping_meta::link>;

ss::future<log> do_manage(ntp_config);
ss::future<> clean_close(storage::log&);

/**
* \brief delete old segments and trigger compacted segments
Expand Down
4 changes: 2 additions & 2 deletions src/v/storage/mem_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ struct mem_log_impl final : log::impl {
mem_log_impl& operator=(const mem_log_impl&) = delete;
mem_log_impl(mem_log_impl&&) noexcept = default;
mem_log_impl& operator=(mem_log_impl&&) noexcept = delete;
ss::future<> close() final {
ss::future<std::optional<ss::sstring>> close() final {
if (_eviction_monitor) {
_eviction_monitor->promise.set_exception(
std::runtime_error("log closed"));
}
return ss::make_ready_future<>();
return ss::make_ready_future<std::optional<ss ::sstring>>(std::nullopt);
}
ss::future<> remove() final { return ss::make_ready_future<>(); }
ss::future<> flush() final { return ss::make_ready_future<>(); }
Expand Down
14 changes: 6 additions & 8 deletions src/v/storage/segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,12 @@ ss::future<> segment::close() {
* pending background roll operation that requires the write lock.
*/
vlog(stlog.trace, "closing segment: {} ", *this);
return _gate.close().then([this] {
return write_lock().then([this](ss::rwlock::holder h) {
return do_flush()
.then([this] { return do_close(); })
.then([this] { return remove_tombstones(); })
.finally([h = std::move(h)] {});
});
});
co_await _gate.close();
auto locked = co_await write_lock();

co_await do_flush();
co_await do_close();
co_await remove_tombstones();
}

ss::future<> segment::remove_persistent_state() {
Expand Down
Loading

0 comments on commit 630e791

Please sign in to comment.