From c413fe81972f31bbea5da32ff7c9999c1db3a9e4 Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Mon, 3 Jul 2023 11:10:41 +0100
Subject: [PATCH 1/3] model/record: `futurize_invoke` `f` in `for_each_record_async`

The function passed to `for_each_record_async` might not return a
future. This is the case when `for_each_record_async` is used primarily
for the purpose of yielding, rather than to accept an async function.

`futurize_invoke` the passed function so that the call site doesn't
have to be modified.

Signed-off-by: Ben Pope
(cherry picked from commit e24eb396cea4ca8fec6d43f50569b2bfe34ee006)
---
 src/v/model/record.h | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/v/model/record.h b/src/v/model/record.h
index 8279d238ae79..15551a1fedcc 100644
--- a/src/v/model/record.h
+++ b/src/v/model/record.h
@@ -695,7 +695,8 @@ class record_batch {
         verify_iterable();
         iobuf_const_parser parser(_records);
         for (auto i = 0; i < _header.record_count; i++) {
-            co_await f(model::parse_one_record_copy_from_buffer(parser));
+            co_await ss::futurize_invoke(
+              f, model::parse_one_record_copy_from_buffer(parser));
         }
         if (unlikely(parser.bytes_left())) {
             throw std::out_of_range(fmt::format(

From 91eb802aefe9be8bcde7deb93d0a527db3e99fe8 Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Fri, 30 Jun 2023 16:03:28 +0100
Subject: [PATCH 2/3] storage: make `batch_consumer::consume_batch_end` async.

This is necessary for the next commit.

Signed-off-by: Ben Pope
(cherry picked from commit 68d951b7ce0ba3520432c92a1a2adfefc5df9adf)
---
 src/v/archival/tests/ntp_archiver_test.cc   |  4 +++-
 src/v/cloud_storage/remote_segment.cc       |  8 ++++----
 src/v/cloud_storage/remote_segment_index.cc |  4 ++--
 src/v/cloud_storage/remote_segment_index.h  |  2 +-
 src/v/cloud_storage/tests/common_def.h      |  4 +++-
 src/v/storage/kvstore.cc                    |  5 +++--
 src/v/storage/kvstore.h                     |  2 +-
 src/v/storage/log_reader.cc                 | 10 +++++-----
 src/v/storage/log_reader.h                  |  2 +-
 src/v/storage/log_replayer.cc               |  6 +++---
 src/v/storage/parser.cc                     |  8 +++++---
 src/v/storage/parser.h                      |  2 +-
 12 files changed, 32 insertions(+), 25 deletions(-)

diff --git a/src/v/archival/tests/ntp_archiver_test.cc b/src/v/archival/tests/ntp_archiver_test.cc
index bc2e24ef0e62..ee00e1642e05 100644
--- a/src/v/archival/tests/ntp_archiver_test.cc
+++ b/src/v/archival/tests/ntp_archiver_test.cc
@@ -798,7 +798,9 @@ class counting_batch_consumer : public storage::batch_consumer {
     void skip_batch_start(model::record_batch_header, size_t, size_t) override {
     }
     void consume_records(iobuf&&) override {}
-    stop_parser consume_batch_end() override { return stop_parser::no; }
+    ss::future<stop_parser> consume_batch_end() override {
+        co_return stop_parser::no;
+    }
     void print(std::ostream& o) const override {
         fmt::print(
           o,
diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc
index 275cbc61200b..bd5844d56845 100644
--- a/src/v/cloud_storage/remote_segment.cc
+++ b/src/v/cloud_storage/remote_segment.cc
@@ -948,7 +948,7 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
     void consume_records(iobuf&& ib) override { _records = std::move(ib); }

     /// Produce batch if within memory limits
-    stop_parser consume_batch_end() override {
+    ss::future<stop_parser> consume_batch_end() override {
         auto batch = model::record_batch{
           _header, std::move(_records), model::record_batch::tag_ctor_ng{}};
@@ -964,14 +964,14 @@
         size_t sz = _parent.produce(std::move(batch));

         if (_config.over_budget) {
-            return stop_parser::yes;
+            co_return stop_parser::yes;
         }

         if (sz > max_consume_size) {
-            return stop_parser::yes;
+            co_return stop_parser::yes;
         }

-        return stop_parser::no;
+        co_return stop_parser::no;
     }

     void print(std::ostream& o) const override {
diff --git a/src/v/cloud_storage/remote_segment_index.cc b/src/v/cloud_storage/remote_segment_index.cc
index 78cdb489df72..a7b935c4a567 100644
--- a/src/v/cloud_storage/remote_segment_index.cc
+++ b/src/v/cloud_storage/remote_segment_index.cc
@@ -303,9 +303,9 @@ void remote_segment_index_builder::skip_batch_start(

 void remote_segment_index_builder::consume_records(iobuf&&) {}

-remote_segment_index_builder::stop_parser
+ss::future<remote_segment_index_builder::stop_parser>
 remote_segment_index_builder::consume_batch_end() {
-    return stop_parser::no;
+    co_return stop_parser::no;
 }

 void remote_segment_index_builder::print(std::ostream& o) const {
diff --git a/src/v/cloud_storage/remote_segment_index.h b/src/v/cloud_storage/remote_segment_index.h
index 97d68c977861..aa77f59f2409 100644
--- a/src/v/cloud_storage/remote_segment_index.h
+++ b/src/v/cloud_storage/remote_segment_index.h
@@ -172,7 +172,7 @@ class remote_segment_index_builder : public storage::batch_consumer {
       size_t size_on_disk);

     virtual void consume_records(iobuf&&);
-    virtual stop_parser consume_batch_end();
+    virtual ss::future<stop_parser> consume_batch_end();
     virtual void print(std::ostream&) const;

 private:
diff --git a/src/v/cloud_storage/tests/common_def.h b/src/v/cloud_storage/tests/common_def.h
index 58c233a9a77e..7d6ab67fd6ac 100644
--- a/src/v/cloud_storage/tests/common_def.h
+++ b/src/v/cloud_storage/tests/common_def.h
@@ -126,7 +126,9 @@ class recording_batch_consumer : public storage::batch_consumer {
         records.push_back(std::move(ib));
     }

-    stop_parser consume_batch_end() override { return stop_parser::no; }
+    ss::future<stop_parser> consume_batch_end() override {
+        co_return stop_parser::no;
+    }

     void print(std::ostream& o) const override {
         o << "counting_record_consumer";
diff --git a/src/v/storage/kvstore.cc b/src/v/storage/kvstore.cc
index 1d8ac6a4dc53..ca2f30d0031f 100644
--- a/src/v/storage/kvstore.cc
+++ b/src/v/storage/kvstore.cc
@@ -596,7 +596,8 @@ void kvstore::replay_consumer::consume_records(iobuf&& records) {
     _records = std::move(records);
 }

-batch_consumer::stop_parser kvstore::replay_consumer::consume_batch_end() {
+ss::future<batch_consumer::stop_parser>
+kvstore::replay_consumer::consume_batch_end() {
     /*
      * build the batch and then apply all its records to the store
      */
@@ -617,7 +618,7 @@ batch_consumer::stop_parser kvstore::replay_consumer::consume_batch_end() {
       "Unexpected next offset {} expected {}",
       _store->_next_offset,
       next_batch_offset);
-    return stop_parser::no;
+    co_return stop_parser::no;
 }

 void kvstore::replay_consumer::print(std::ostream& os) const {
diff --git a/src/v/storage/kvstore.h b/src/v/storage/kvstore.h
index f6b723bf5016..ab7a37aa3987 100644
--- a/src/v/storage/kvstore.h
+++ b/src/v/storage/kvstore.h
@@ -188,7 +188,7 @@ class kvstore {
         void skip_batch_start(
           model::record_batch_header header, size_t, size_t) override;
         void consume_records(iobuf&&) override;
-        stop_parser consume_batch_end() override;
+        ss::future<stop_parser> consume_batch_end() override;
         void print(std::ostream&) const override;

     private:
diff --git a/src/v/storage/log_reader.cc b/src/v/storage/log_reader.cc
index fc23fa81a623..77b2200a6d4e 100644
--- a/src/v/storage/log_reader.cc
+++ b/src/v/storage/log_reader.cc
@@ -93,7 +93,7 @@ void skipping_consumer::consume_records(iobuf&& records) {
     _records = std::move(records);
 }

-batch_consumer::stop_parser
+ss::future<batch_consumer::stop_parser>
 skipping_consumer::consume_batch_end() {
     // Note: This is what keeps the train moving. the `_reader.*` transitively
     // updates the next batch to consume
     _reader.add_one(model::record_batch(
@@ -102,22 +102,22 @@ skipping_consumer::consume_batch_end() {
       _header, share_records(), model::record_batch::tag_ctor_ng{}));
     if (
       _header.last_offset() >= _reader._seg.offsets().stable_offset
       || _header.last_offset() >= _reader._config.max_offset) {
-        return stop_parser::yes;
+        co_return stop_parser::yes;
     }
     /*
      * if the very next batch is known to be cached, then stop parsing. the next
      * read will with high probability experience a cache hit.
      */
     if (_next_cached_batch == (_header.last_offset() + model::offset(1))) {
-        return stop_parser::yes;
+        co_return stop_parser::yes;
     }
     if (
       _reader._config.bytes_consumed >= _reader._config.max_bytes
       || model::timeout_clock::now() >= _timeout) {
-        return stop_parser::yes;
+        co_return stop_parser::yes;
     }
     _header = {};
-    return stop_parser(_reader._state.is_full());
+    co_return stop_parser(_reader._state.is_full());
 }

 void skipping_consumer::print(std::ostream& os) const {
diff --git a/src/v/storage/log_reader.h b/src/v/storage/log_reader.h
index 7ffdac416644..b46512b89e45 100644
--- a/src/v/storage/log_reader.h
+++ b/src/v/storage/log_reader.h
@@ -74,7 +74,7 @@ class skipping_consumer final : public batch_consumer {
       size_t physical_base_offset,
       size_t bytes_on_disk) override;
     void consume_records(iobuf&&) override;
-    stop_parser consume_batch_end() override;
+    ss::future<stop_parser> consume_batch_end() override;
     void print(std::ostream&) const override;

 private:
diff --git a/src/v/storage/log_replayer.cc b/src/v/storage/log_replayer.cc
index f56d7d2d1aa8..ae40cdfef978 100644
--- a/src/v/storage/log_replayer.cc
+++ b/src/v/storage/log_replayer.cc
@@ -58,7 +58,7 @@ class checksumming_consumer final : public batch_consumer {
         crc_extend_iobuf(_crc, records);
     }

-    stop_parser consume_batch_end() override {
+    ss::future<stop_parser> consume_batch_end() override {
         if (is_valid_batch_crc()) {
             _cfg.last_offset = _header.last_offset();
             _cfg.truncate_file_pos = _file_pos_to_end_of_batch;
@@ -68,9 +68,9 @@ class checksumming_consumer final : public batch_consumer {
               - _header.size_bytes;
             _seg->index().maybe_track(_header, physical_base_offset);
             _header = {};
-            return stop_parser::no;
+            co_return stop_parser::no;
         }
-        return stop_parser::yes;
+        co_return stop_parser::yes;
     }

     bool is_valid_batch_crc() const {
diff --git a/src/v/storage/parser.cc b/src/v/storage/parser.cc
index d35116106dd6..91a47fb0d80c 100644
--- a/src/v/storage/parser.cc
+++ b/src/v/storage/parser.cc
@@ -247,12 +247,14 @@ ss::future<result<stop_parser>> continuous_batch_parser::consume_records() {
     auto sz = _header->size_bytes - model::packed_record_batch_header_size;
     return verify_read_iobuf(
              get_stream(), sz, "parser::consume_records", _recovery)
-      .then([this](result<iobuf> record) -> result<stop_parser> {
+      .then([this](result<iobuf> record) -> ss::future<result<stop_parser>> {
           if (!record) {
-              return record.error();
+              return ss::make_ready_future<result<stop_parser>>(record.error());
          }
           _consumer->consume_records(std::move(record.value()));
-          return result<stop_parser>(_consumer->consume_batch_end());
+          return _consumer->consume_batch_end().then([](stop_parser sp) {
+              return ss::make_ready_future<result<stop_parser>>(sp);
+          });
       });
 }
diff --git a/src/v/storage/parser.h b/src/v/storage/parser.h
index 0fb431b89fc4..308186ef8a38 100644
--- a/src/v/storage/parser.h
+++ b/src/v/storage/parser.h
@@ -79,7 +79,7 @@ class batch_consumer {
       = 0;

     virtual void consume_records(iobuf&&) = 0;
-    virtual stop_parser consume_batch_end() = 0;
+    virtual ss::future<stop_parser> consume_batch_end() = 0;
     virtual void print(std::ostream&) const = 0;

From 2373e54f08b7d45a059bd5bab7c9937a5cc0fe12 Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Fri, 30 Jun 2023 14:45:48 +0100
Subject: [PATCH 3/3] storage: maybe_yield in kvstore::save_snapshot & load_snapshot

54c0a3f4117091369ec634c5f97a8f926a632f45 made the serialization of the
batch async, but it's possible for the batch builder to also cause
reactor stalls, as seen on a live system running v23.1.12 (stalls of
about 150ms).

Add a mutex to protect _db and _next_offset.

* `maybe_yield` in the builder loop to avoid reactor stalls whilst saving
* Use `for_each_record_async` to avoid reactor stalls whilst loading
* Use `for_each_record_async` to avoid reactor stalls whilst replaying

Signed-off-by: Ben Pope
(cherry picked from commit 310fb33d7675c38ea77f91e067e9738385f91d09)
---
 src/v/storage/kvstore.cc | 23 +++++++++++++++++------
 src/v/storage/kvstore.h  |  5 ++++-
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/src/v/storage/kvstore.cc b/src/v/storage/kvstore.cc
index ca2f30d0031f..aecb8f5dc0c8 100644
--- a/src/v/storage/kvstore.cc
+++ b/src/v/storage/kvstore.cc
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <seastar/coroutine/maybe_yield.hh>
 #include
 #include

@@ -158,6 +159,8 @@ std::optional<iobuf> kvstore::get(key_space ks, bytes_view key) {
     // do not re-assign to string_view -> temporary
     auto kkey = make_spaced_key(ks, key);

+    // _db_mut lock is not required here; it's ok to observe a partial apply
+    // since _next_offset is not needed here.
     if (auto it = _db.find(kkey); it != _db.end()) {
         return it->second.copy();
     }
@@ -188,7 +191,8 @@ ss::future<> kvstore::put(key_space ks, bytes key, std::optional<iobuf> value) {
     });
 }

-void kvstore::apply_op(bytes key, std::optional<iobuf> value) {
+void kvstore::apply_op(
+  bytes key, std::optional<iobuf> value, ssx::semaphore_units const&) {
     auto it = _db.find(key);
     bool found = it != _db.end();
     if (value) {
@@ -247,12 +251,14 @@ ss::future<> kvstore::flush_and_apply_ops() {
      */
     return _segment->append(std::move(batch))
       .then([this](append_result) { return _segment->flush(); })
-      .then([this, last_offset, ops = std::move(ops)]() mutable {
+      .then([this]() { return _db_mut.get_units(); })
+      .then([this, last_offset, ops = std::move(ops)](auto units) mutable {
           for (auto& op : ops) {
-              apply_op(std::move(op.key), std::move(op.value));
+              apply_op(std::move(op.key), std::move(op.value), units);
              op.done.set_value();
           }
           _next_offset = last_offset + model::offset(1);
+          units.return_all();
       });
 }
@@ -336,11 +342,14 @@ ss::future<> kvstore::save_snapshot() {
     // package up the db into a batch
     storage::record_batch_builder builder(
       model::record_batch_type::kvstore, model::offset(0));
+    auto units = co_await _db_mut.get_units();
     for (auto& entry : _db) {
         builder.add_raw_kv(
           bytes_to_iobuf(entry.first),
           entry.second.share(0, entry.second.size_bytes()));
+        co_await ss::coroutine::maybe_yield();
     }
+    units.return_all();
     auto batch = std::move(builder).build();

     // serialize batch: size_prefix + batch
@@ -465,8 +474,9 @@ ss::future<> kvstore::load_snapshot_from_reader(snapshot_reader& reader) {
           batch.header().header_crc));
     }

+    auto lock = co_await _db_mut.get_units();
     _db.reserve(batch.header().record_count);
-    batch.for_each_record([this](model::record r) {
+    co_await batch.for_each_record_async([this](model::record r) {
         auto key = iobuf_to_bytes(r.release_key());
         _probe.add_cached_bytes(key.size() + r.value().size_bytes());
         auto res = _db.emplace(std::move(key), r.release_value());
@@ -604,11 +614,12 @@ kvstore::replay_consumer::consume_batch_end() {
     model::record_batch batch(
       _header, std::move(_records), model::record_batch::tag_ctor_ng{});
-    batch.for_each_record([this](model::record r) {
+    auto lock = co_await _store->_db_mut.get_units();
+    co_await batch.for_each_record_async([this, &lock](model::record r) {
         auto key = iobuf_to_bytes(r.release_key());
         auto value = reflection::from_iobuf<std::optional<iobuf>>(
           r.release_value());
-        _store->apply_op(std::move(key), std::move(value));
+        _store->apply_op(std::move(key), std::move(value), lock);
         _store->_next_offset += model::offset(1);
     });
diff --git a/src/v/storage/kvstore.h b/src/v/storage/kvstore.h
index ab7a37aa3987..dd45c6555291 100644
--- a/src/v/storage/kvstore.h
+++ b/src/v/storage/kvstore.h
@@ -150,11 +150,14 @@ class kvstore {
     ss::timer<> _timer;
     ssx::semaphore _sem{0, "s/kvstore"};
     ss::lw_shared_ptr<segment> _segment;
+    // Protect _db and _next_offset across asynchronous mutations.
+    mutex _db_mut;
     model::offset _next_offset;
     absl::node_hash_map<bytes, iobuf> _db;

     ss::future<> put(key_space ks, bytes key, std::optional<iobuf> value);
-    void apply_op(bytes key, std::optional<iobuf> value);
+    void apply_op(
+      bytes key, std::optional<iobuf> value, ssx::semaphore_units const&);
     ss::future<> flush_and_apply_ops();
     ss::future<> roll();
     ss::future<> save_snapshot();
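
For reviewers, the pattern the series relies on in a nutshell:
`ss::futurize_invoke` lets a coroutine accept both plain and
future-returning callbacks without changing call sites, and
`ss::coroutine::maybe_yield` gives the reactor a chance to run other
tasks between loop iterations. Below is a minimal standalone sketch
using only public Seastar APIs; `for_each_async` is an illustrative
stand-in for `record_batch::for_each_record_async`, not code from the
Redpanda tree.

#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <iostream>
#include <vector>

namespace ss = seastar;

// Func may return void or a future; ss::futurize_invoke normalizes both,
// so synchronous callers don't need to be modified at the call site.
template<typename Func>
ss::future<> for_each_async(std::vector<int> items, Func f) {
    for (auto item : items) {
        co_await ss::futurize_invoke(f, item);
        // Yield to the reactor between iterations to avoid stalls,
        // as the kvstore builder loop now does.
        co_await ss::coroutine::maybe_yield();
    }
}

int main(int argc, char** argv) {
    ss::app_template app;
    return app.run(argc, argv, []() -> ss::future<> {
        // A plain function: futurize_invoke wraps its result in a ready future.
        co_await for_each_async(
          {1, 2, 3}, [](int i) { std::cout << i << '\n'; });
        // An async function: its future is awaited as-is.
        co_await for_each_async({4, 5, 6}, [](int i) -> ss::future<> {
            std::cout << i << '\n';
            co_return;
        });
    });
}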