[v23.1.x] storage: maybe_yield when building a batch in kvstore::save_snapshot #11927

Merged
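
This backport makes storage::batch_consumer::consume_batch_end() return ss::future<stop_parser> so that kvstore can yield while building the snapshot batch from a large key-value map. A tight loop over a big map can exhaust the reactor's task quota and stall every other task on the shard; ss::coroutine::maybe_yield() inserts a preemption point. A minimal standalone sketch of the idiom (not Redpanda code; sum_values and the map are illustrative):

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <map>

namespace ss = seastar;

// Walk a potentially huge map without monopolizing the reactor.
ss::future<size_t> sum_values(const std::map<int, size_t>& db) {
    size_t total = 0;
    for (const auto& [key, value] : db) {
        total += value;
        // No-op while the task quota lasts; suspends the coroutine (and
        // lets other tasks run) once the quota is exhausted.
        co_await ss::coroutine::maybe_yield();
    }
    co_return total;
}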
4 changes: 3 additions & 1 deletion src/v/archival/tests/ntp_archiver_test.cc
@@ -798,7 +798,9 @@ class counting_batch_consumer : public storage::batch_consumer {
     void skip_batch_start(model::record_batch_header, size_t, size_t) override {
     }
     void consume_records(iobuf&&) override {}
-    stop_parser consume_batch_end() override { return stop_parser::no; }
+    ss::future<stop_parser> consume_batch_end() override {
+        co_return stop_parser::no;
+    }
     void print(std::ostream& o) const override {
         fmt::print(
           o,
8 changes: 4 additions & 4 deletions src/v/cloud_storage/remote_segment.cc
@@ -948,7 +948,7 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
     void consume_records(iobuf&& ib) override { _records = std::move(ib); }
 
     /// Produce batch if within memory limits
-    stop_parser consume_batch_end() override {
+    ss::future<stop_parser> consume_batch_end() override {
         auto batch = model::record_batch{
           _header, std::move(_records), model::record_batch::tag_ctor_ng{}};
 
@@ -964,14 +964,14 @@ size_t sz = _parent.produce(std::move(batch));
         size_t sz = _parent.produce(std::move(batch));
 
         if (_config.over_budget) {
-            return stop_parser::yes;
+            co_return stop_parser::yes;
         }
 
         if (sz > max_consume_size) {
-            return stop_parser::yes;
+            co_return stop_parser::yes;
         }
 
-        return stop_parser::no;
+        co_return stop_parser::no;
     }
 
     void print(std::ostream& o) const override {
4 changes: 2 additions & 2 deletions src/v/cloud_storage/remote_segment_index.cc
@@ -303,9 +303,9 @@ void remote_segment_index_builder::skip_batch_start(
 
 void remote_segment_index_builder::consume_records(iobuf&&) {}
 
-remote_segment_index_builder::stop_parser
+ss::future<remote_segment_index_builder::stop_parser>
 remote_segment_index_builder::consume_batch_end() {
-    return stop_parser::no;
+    co_return stop_parser::no;
 }
 
 void remote_segment_index_builder::print(std::ostream& o) const {
2 changes: 1 addition & 1 deletion src/v/cloud_storage/remote_segment_index.h
@@ -172,7 +172,7 @@ class remote_segment_index_builder : public storage::batch_consumer {
       size_t size_on_disk);
 
     virtual void consume_records(iobuf&&);
-    virtual stop_parser consume_batch_end();
+    virtual ss::future<stop_parser> consume_batch_end();
     virtual void print(std::ostream&) const;
 
 private:
4 changes: 3 additions & 1 deletion src/v/cloud_storage/tests/common_def.h
@@ -126,7 +126,9 @@ class recording_batch_consumer : public storage::batch_consumer {
         records.push_back(std::move(ib));
     }
 
-    stop_parser consume_batch_end() override { return stop_parser::no; }
+    ss::future<stop_parser> consume_batch_end() override {
+        co_return stop_parser::no;
+    }
 
     void print(std::ostream& o) const override {
         o << "counting_record_consumer";
3 changes: 2 additions & 1 deletion src/v/model/record.h
@@ -695,7 +695,8 @@ class record_batch {
         verify_iterable();
         iobuf_const_parser parser(_records);
         for (auto i = 0; i < _header.record_count; i++) {
-            co_await f(model::parse_one_record_copy_from_buffer(parser));
+            co_await ss::futurize_invoke(
+              f, model::parse_one_record_copy_from_buffer(parser));
         }
         if (unlikely(parser.bytes_left())) {
             throw std::out_of_range(fmt::format(
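
The record-iteration helper has to accept both the existing synchronous callbacks and the new future-returning ones; ss::futurize_invoke bridges the two by wrapping a synchronous result in a ready future and passing an asynchronous one through unchanged. A small sketch of that behaviour (visit_each is hypothetical, not the record.h API):

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

namespace ss = seastar;

// Accepts both `void(int)` and `ss::future<>(int)` callbacks.
template<typename Func>
ss::future<> visit_each(Func f) {
    for (int i = 0; i < 3; ++i) {
        // futurize_invoke calls f(i) and normalizes the result to a
        // future, so a plain callback and a coroutine look the same here.
        co_await ss::futurize_invoke(f, i);
    }
}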
28 changes: 20 additions & 8 deletions src/v/storage/kvstore.cc
@@ -26,6 +26,7 @@
 #include <seastar/core/coroutine.hh>
 #include <seastar/core/metrics.hh>
 #include <seastar/core/thread.hh>
+#include <seastar/coroutine/maybe_yield.hh>
 #include <seastar/util/defer.hh>
 #include <seastar/util/log.hh>
 
@@ -158,6 +159,8 @@ std::optional<iobuf> kvstore::get(key_space ks, bytes_view key) {
 
     // do not re-assign to string_view -> temporary
     auto kkey = make_spaced_key(ks, key);
+    // _db_mut lock is not required here; it's ok to observe a partial apply
+    // since _next_offset is not needed here.
     if (auto it = _db.find(kkey); it != _db.end()) {
         return it->second.copy();
     }
@@ -188,7 +191,8 @@ ss::future<> kvstore::put(key_space ks, bytes key, std::optional<iobuf> value) {
     });
 }
 
-void kvstore::apply_op(bytes key, std::optional<iobuf> value) {
+void kvstore::apply_op(
+  bytes key, std::optional<iobuf> value, ssx::semaphore_units const&) {
     auto it = _db.find(key);
     bool found = it != _db.end();
     if (value) {
@@ -247,12 +251,14 @@ ss::future<> kvstore::flush_and_apply_ops() {
      */
     return _segment->append(std::move(batch))
       .then([this](append_result) { return _segment->flush(); })
-      .then([this, last_offset, ops = std::move(ops)]() mutable {
+      .then([this]() { return _db_mut.get_units(); })
+      .then([this, last_offset, ops = std::move(ops)](auto units) mutable {
           for (auto& op : ops) {
-              apply_op(std::move(op.key), std::move(op.value));
+              apply_op(std::move(op.key), std::move(op.value), units);
              op.done.set_value();
          }
          _next_offset = last_offset + model::offset(1);
+         units.return_all();
      });
 }
 
@@ -336,11 +342,14 @@ ss::future<> kvstore::save_snapshot() {
     // package up the db into a batch
     storage::record_batch_builder builder(
       model::record_batch_type::kvstore, model::offset(0));
+    auto units = co_await _db_mut.get_units();
     for (auto& entry : _db) {
         builder.add_raw_kv(
           bytes_to_iobuf(entry.first),
           entry.second.share(0, entry.second.size_bytes()));
+        co_await ss::coroutine::maybe_yield();
     }
+    units.return_all();
     auto batch = std::move(builder).build();
 
     // serialize batch: size_prefix + batch
@@ -465,8 +474,9 @@ ss::future<> kvstore::load_snapshot_from_reader(snapshot_reader& reader) {
           batch.header().header_crc));
     }
 
+    auto lock = co_await _db_mut.get_units();
     _db.reserve(batch.header().record_count);
-    batch.for_each_record([this](model::record r) {
+    co_await batch.for_each_record_async([this](model::record r) {
         auto key = iobuf_to_bytes(r.release_key());
         _probe.add_cached_bytes(key.size() + r.value().size_bytes());
         auto res = _db.emplace(std::move(key), r.release_value());
@@ -596,18 +606,20 @@ void kvstore::replay_consumer::consume_records(iobuf&& records) {
     _records = std::move(records);
 }
 
-batch_consumer::stop_parser kvstore::replay_consumer::consume_batch_end() {
+ss::future<batch_consumer::stop_parser>
+kvstore::replay_consumer::consume_batch_end() {
     /*
      * build the batch and then apply all its records to the store
      */
     model::record_batch batch(
       _header, std::move(_records), model::record_batch::tag_ctor_ng{});
 
-    batch.for_each_record([this](model::record r) {
+    auto lock = co_await _store->_db_mut.get_units();
+    co_await batch.for_each_record_async([this, &lock](model::record r) {
         auto key = iobuf_to_bytes(r.release_key());
         auto value = reflection::from_iobuf<std::optional<iobuf>>(
           r.release_value());
-        _store->apply_op(std::move(key), std::move(value));
+        _store->apply_op(std::move(key), std::move(value), lock);
         _store->_next_offset += model::offset(1);
     });
 
@@ -617,7 +629,7 @@ batch_consumer::stop_parser kvstore::replay_consumer::consume_batch_end() {
       "Unexpected next offset {} expected {}",
       _store->_next_offset,
       next_batch_offset);
-    return stop_parser::no;
+    co_return stop_parser::no;
 }
 
 void kvstore::replay_consumer::print(std::ostream& os) const {
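
The ordering in flush_and_apply_ops is the crux of the locking change: flush to disk first, then take _db_mut, mutate _db and _next_offset, and release the units explicitly. Passing the units into apply_op by const reference makes it impossible to call it without holding the lock, while sparing apply_op a re-acquire of the non-reentrant mutex. A simplified sketch of the pattern using a plain Seastar semaphore (Redpanda's mutex wraps one; mini_store is hypothetical):

#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>

#include <utility>
#include <vector>

namespace ss = seastar;

struct mini_store {
    ss::semaphore db_mut{1}; // binary semaphore used as a mutex
    std::vector<std::pair<int, int>> db;

    // The units parameter is only a witness that the caller holds db_mut.
    void apply_op(int key, int value, const ss::semaphore_units<>&) {
        db.emplace_back(key, value);
    }

    ss::future<> apply_all(std::vector<std::pair<int, int>> ops) {
        auto units = co_await ss::get_units(db_mut, 1);
        for (auto& [k, v] : ops) {
            apply_op(k, v, units);
        }
        units.return_all(); // release explicitly, as the patch does
    }
};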
7 changes: 5 additions & 2 deletions src/v/storage/kvstore.h
@@ -150,11 +150,14 @@ class kvstore {
     ss::timer<> _timer;
     ssx::semaphore _sem{0, "s/kvstore"};
     ss::lw_shared_ptr<segment> _segment;
+    // Protect _db and _next_offset across asynchronous mutations.
+    mutex _db_mut;
     model::offset _next_offset;
     absl::node_hash_map<bytes, iobuf, bytes_type_hash, bytes_type_eq> _db;
 
     ss::future<> put(key_space ks, bytes key, std::optional<iobuf> value);
-    void apply_op(bytes key, std::optional<iobuf> value);
+    void apply_op(
+      bytes key, std::optional<iobuf> value, ssx::semaphore_units const&);
     ss::future<> flush_and_apply_ops();
     ss::future<> roll();
     ss::future<> save_snapshot();
@@ -188,7 +191,7 @@ class kvstore {
     void skip_batch_start(
       model::record_batch_header header, size_t, size_t) override;
     void consume_records(iobuf&&) override;
-    stop_parser consume_batch_end() override;
+    ss::future<stop_parser> consume_batch_end() override;
     void print(std::ostream&) const override;
 
 private:
10 changes: 5 additions & 5 deletions src/v/storage/log_reader.cc
@@ -93,7 +93,7 @@ void skipping_consumer::consume_records(iobuf&& records) {
     _records = std::move(records);
 }
 
-batch_consumer::stop_parser skipping_consumer::consume_batch_end() {
+ss::future<batch_consumer::stop_parser> skipping_consumer::consume_batch_end() {
     // Note: This is what keeps the train moving. the `_reader.*` transitively
     // updates the next batch to consume
     _reader.add_one(model::record_batch(
@@ -102,22 +102,22 @@
     if (
       _header.last_offset() >= _reader._seg.offsets().stable_offset
       || _header.last_offset() >= _reader._config.max_offset) {
-        return stop_parser::yes;
+        co_return stop_parser::yes;
    }
    /*
     * if the very next batch is known to be cached, then stop parsing. the next
     * read will with high probability experience a cache hit.
     */
    if (_next_cached_batch == (_header.last_offset() + model::offset(1))) {
-        return stop_parser::yes;
+        co_return stop_parser::yes;
    }
    if (
      _reader._config.bytes_consumed >= _reader._config.max_bytes
      || model::timeout_clock::now() >= _timeout) {
-        return stop_parser::yes;
+        co_return stop_parser::yes;
    }
    _header = {};
-    return stop_parser(_reader._state.is_full());
+    co_return stop_parser(_reader._state.is_full());
 }
 
 void skipping_consumer::print(std::ostream& os) const {
2 changes: 1 addition & 1 deletion src/v/storage/log_reader.h
@@ -74,7 +74,7 @@ class skipping_consumer final : public batch_consumer {
       size_t physical_base_offset,
       size_t bytes_on_disk) override;
     void consume_records(iobuf&&) override;
-    stop_parser consume_batch_end() override;
+    ss::future<stop_parser> consume_batch_end() override;
     void print(std::ostream&) const override;
 
 private:
6 changes: 3 additions & 3 deletions src/v/storage/log_replayer.cc
@@ -58,7 +58,7 @@ class checksumming_consumer final : public batch_consumer {
         crc_extend_iobuf(_crc, records);
     }
 
-    stop_parser consume_batch_end() override {
+    ss::future<stop_parser> consume_batch_end() override {
         if (is_valid_batch_crc()) {
             _cfg.last_offset = _header.last_offset();
             _cfg.truncate_file_pos = _file_pos_to_end_of_batch;
@@ -68,9 +68,9 @@ class checksumming_consumer final : public batch_consumer {
                                        - _header.size_bytes;
             _seg->index().maybe_track(_header, physical_base_offset);
             _header = {};
-            return stop_parser::no;
+            co_return stop_parser::no;
         }
-        return stop_parser::yes;
+        co_return stop_parser::yes;
     }
 
     bool is_valid_batch_crc() const {
8 changes: 5 additions & 3 deletions src/v/storage/parser.cc
@@ -247,12 +247,14 @@ ss::future<result<stop_parser>> continuous_batch_parser::consume_records() {
     auto sz = _header->size_bytes - model::packed_record_batch_header_size;
     return verify_read_iobuf(
              get_stream(), sz, "parser::consume_records", _recovery)
-      .then([this](result<iobuf> record) -> result<stop_parser> {
+      .then([this](result<iobuf> record) -> ss::future<result<stop_parser>> {
          if (!record) {
-              return record.error();
+              return ss::make_ready_future<result<stop_parser>>(record.error());
          }
          _consumer->consume_records(std::move(record.value()));
-          return result<stop_parser>(_consumer->consume_batch_end());
+          return _consumer->consume_batch_end().then([](stop_parser sp) {
+              return ss::make_ready_future<result<stop_parser>>(sp);
+          });
      });
 }
 
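
The lambda's return type changes from result<stop_parser> to ss::future<result<stop_parser>> because consume_batch_end() is now asynchronous; Seastar's .then() flattens a continuation that returns a future instead of nesting future<future<T>>. A tiny sketch of that property:

#include <seastar/core/future.hh>

namespace ss = seastar;

ss::future<int> async_step(int x) {
    return ss::make_ready_future<int>(x + 1);
}

ss::future<int> pipeline() {
    // The continuation returns ss::future<int>, yet pipeline() is still
    // ss::future<int>: .then() chains the inner future rather than
    // producing future<future<int>>.
    return ss::make_ready_future<int>(41).then(
      [](int v) -> ss::future<int> { return async_step(v); });
}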
2 changes: 1 addition & 1 deletion src/v/storage/parser.h
@@ -79,7 +79,7 @@ class batch_consumer {
       = 0;
 
     virtual void consume_records(iobuf&&) = 0;
-    virtual stop_parser consume_batch_end() = 0;
+    virtual ss::future<stop_parser> consume_batch_end() = 0;
 
     virtual void print(std::ostream&) const = 0;
 
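
With the pure virtual now returning a future, every override can be a coroutine (C++20 permits virtual coroutines), which is what lets implementations like kvstore::replay_consumer co_await on a lock inside consume_batch_end(). A reduced, hypothetical version of the interface to illustrate (the real batch_consumer has more hooks than shown):

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

#include <ostream>

namespace ss = seastar;

enum class stop_parser : bool { no = false, yes = true };

struct async_batch_consumer {
    virtual ~async_batch_consumer() = default;
    virtual ss::future<stop_parser> consume_batch_end() = 0;
    virtual void print(std::ostream&) const = 0;
};

struct noop_consumer final : async_batch_consumer {
    ss::future<stop_parser> consume_batch_end() override {
        // A coroutine override may co_await (locks, yields, I/O) before
        // deciding whether the parser should stop.
        co_return stop_parser::no;
    }
    void print(std::ostream& os) const override { os << "noop_consumer"; }
};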