From 0791a5fefb37865cf6084670cc4220e00b81acf4 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 17 Jun 2022 19:28:22 -0700 Subject: [PATCH] rm_stm: add replicate_in_stages wrapper --- src/v/cluster/partition.cc | 13 +---------- src/v/cluster/rm_stm.cc | 45 +++++++++++++++++++++++++++++++++----- src/v/cluster/rm_stm.h | 15 ++++++++++++- 3 files changed, 55 insertions(+), 18 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 7411a7aaace6..8350845f7d95 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -181,18 +181,7 @@ raft::replicate_stages partition::replicate_in_stages( } if (_rm_stm) { - ss::promise<> p; - auto f = p.get_future(); - auto replicate_finished - = _rm_stm->replicate(bid, std::move(r), opts) - .then( - [p = std::move(p)](result res) mutable { - p.set_value(); - return res; - }); - return raft::replicate_stages( - std::move(f), std::move(replicate_finished)); - + return _rm_stm->replicate_in_stages(bid, std::move(r), opts); } else { return _raft->replicate_in_stages(std::move(r), opts); } diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 986a853a876b..5e67ae5504d4 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -760,12 +760,43 @@ ss::future rm_stm::do_abort_tx( co_return tx_errc::none; } +raft::replicate_stages rm_stm::replicate_in_stages( + model::batch_identity bid, + model::record_batch_reader r, + raft::replicate_options opts) { + auto enqueued = ss::make_lw_shared>(); + auto f = enqueued->get_future(); + auto replicate_finished + = do_replicate(bid, std::move(r), opts, enqueued).finally([enqueued] { + // we should avoid situations when replicate_finished is set while + // enqueued isn't because it leads to hanging produce requests and + // the resource leaks. 
Since staged replication is an optimization + // and setting enqueued only after replicate_finished is already + // set doesn't have semantic implications, adding this post + // replicate_finished as a safety measure in case enqueued isn't + // set explicitly + if (!enqueued->available()) { + enqueued->set_value(); + } + }); + return raft::replicate_stages(std::move(f), std::move(replicate_finished)); +} + ss::future> rm_stm::replicate( model::batch_identity bid, - model::record_batch_reader b, + model::record_batch_reader r, raft::replicate_options opts) { + auto enqueued = ss::make_lw_shared>(); + return do_replicate(bid, std::move(r), opts, enqueued); +} + +ss::future> rm_stm::do_replicate( + model::batch_identity bid, + model::record_batch_reader b, + raft::replicate_options opts, + ss::lw_shared_ptr> enqueued) { return _state_lock.hold_read_lock().then( - [this, bid, opts, b = std::move(b)]( + [this, enqueued, bid, opts, b = std::move(b)]( ss::basic_rwlock<>::holder unit) mutable { if (bid.is_transactional) { auto pid = bid.pid.get_id(); @@ -778,8 +809,9 @@ ss::future> rm_stm::replicate( request_id rid{.pid = bid.pid, .seq = bid.first_seq}; return with_request_lock( rid, - [this, bid, opts, b = std::move(b)]() mutable { - return replicate_seq(bid, std::move(b), opts); + [this, enqueued, bid, opts, b = std::move(b)]() mutable { + return replicate_seq( + bid, std::move(b), opts, enqueued); }) .finally([u = std::move(unit)] {}); } @@ -1064,8 +1096,11 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, - raft::replicate_options opts) { + raft::replicate_options opts, + [[maybe_unused]] ss::lw_shared_ptr> enqueued) { if (!co_await sync(_sync_timeout)) { + // it's ok not to set enqueued on early return because + // the safety check in replicate_in_stages sets it automatically co_return errc::not_leader; } auto synced_term = _insync_term; diff 
--git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 5f2d7e4461d2..b69530690f35 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -22,6 +22,7 @@ #include "raft/state_machine.h" #include "raft/types.h" #include "storage/snapshot.h" +#include "utils/available_promise.h" #include "utils/expiring_promise.h" #include "utils/mutex.h" @@ -168,6 +169,11 @@ class rm_stm final : public persisted_stm { ss::future> aborted_transactions(model::offset, model::offset); + raft::replicate_stages replicate_in_stages( + model::batch_identity, + model::record_batch_reader, + raft::replicate_options); + ss::future> replicate( model::batch_identity, model::record_batch_reader, @@ -268,13 +274,20 @@ class rm_stm final : public persisted_stm { void set_seq(model::batch_identity, model::offset); void reset_seq(model::batch_identity); + ss::future> do_replicate( + model::batch_identity, + model::record_batch_reader, + raft::replicate_options, + ss::lw_shared_ptr>); + ss::future> replicate_tx(model::batch_identity, model::record_batch_reader); ss::future> replicate_seq( model::batch_identity, model::record_batch_reader, - raft::replicate_options); + raft::replicate_options, + ss::lw_shared_ptr>); void compact_snapshot();