Skip to content

Commit

Permalink
rm_stm: implement replicate pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Jun 23, 2022
1 parent 0791a5f commit 9b5491b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -816,14 +816,7 @@ ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
.finally([u = std::move(unit)] {});
}

return sync(_sync_timeout)
.then([this, opts, b = std::move(b)](bool is_synced) mutable {
if (!is_synced) {
return ss::make_ready_future<
result<raft::replicate_result>>(errc::not_leader);
}
return _c->replicate(std::move(b), opts);
})
return replicate_msg(std::move(b), opts, enqueued)
.finally([u = std::move(unit)] {});
});
}
Expand Down Expand Up @@ -1133,6 +1126,20 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
co_return r;
}

ss::future<result<raft::replicate_result>> rm_stm::replicate_msg(
model::record_batch_reader br,
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
co_return errc::not_leader;
}

auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts);
co_await std::move(ss.request_enqueued);
enqueued->set_value();
co_return co_await std::move(ss.replicate_finished);
}

model::offset rm_stm::last_stable_offset() {
auto first_tx_start = model::offset::max();

Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ class rm_stm final : public persisted_stm {
raft::replicate_options,
ss::lw_shared_ptr<available_promise<>>);

ss::future<result<raft::replicate_result>> replicate_msg(
model::record_batch_reader,
raft::replicate_options,
ss::lw_shared_ptr<available_promise<>>);

void compact_snapshot();

ss::future<bool> sync(model::timeout_clock::duration);
Expand Down

0 comments on commit 9b5491b

Please sign in to comment.