diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 5e67ae5504d4..8f76d56c9092 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -816,14 +816,7 @@ ss::future> rm_stm::do_replicate( .finally([u = std::move(unit)] {}); } - return sync(_sync_timeout) - .then([this, opts, b = std::move(b)](bool is_synced) mutable { - if (!is_synced) { - return ss::make_ready_future< - result>(errc::not_leader); - } - return _c->replicate(std::move(b), opts); - }) + return replicate_msg(std::move(b), opts, enqueued) .finally([u = std::move(unit)] {}); }); } @@ -1133,6 +1126,20 @@ ss::future> rm_stm::replicate_seq( co_return r; } +ss::future> rm_stm::replicate_msg( + model::record_batch_reader br, + raft::replicate_options opts, + ss::lw_shared_ptr> enqueued) { + if (!co_await sync(_sync_timeout)) { + co_return errc::not_leader; + } + + auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts); + co_await std::move(ss.request_enqueued); + enqueued->set_value(); + co_return co_await std::move(ss.replicate_finished); +} + model::offset rm_stm::last_stable_offset() { auto first_tx_start = model::offset::max(); diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index b69530690f35..aee69f12ebdf 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -289,6 +289,11 @@ class rm_stm final : public persisted_stm { raft::replicate_options, ss::lw_shared_ptr>); + ss::future> replicate_msg( + model::record_batch_reader, + raft::replicate_options, + ss::lw_shared_ptr>); + void compact_snapshot(); ss::future sync(model::timeout_clock::duration);