Skip to content

Commit

Permalink
rm_stm: add replicate_in_stages wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Jun 23, 2022
1 parent 9a693fc commit 0791a5f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 18 deletions.
13 changes: 1 addition & 12 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,7 @@ raft::replicate_stages partition::replicate_in_stages(
}

if (_rm_stm) {
ss::promise<> p;
auto f = p.get_future();
auto replicate_finished
= _rm_stm->replicate(bid, std::move(r), opts)
.then(
[p = std::move(p)](result<raft::replicate_result> res) mutable {
p.set_value();
return res;
});
return raft::replicate_stages(
std::move(f), std::move(replicate_finished));

return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
} else {
return _raft->replicate_in_stages(std::move(r), opts);
}
Expand Down
45 changes: 40 additions & 5 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,43 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
co_return tx_errc::none;
}

raft::replicate_stages rm_stm::replicate_in_stages(
model::batch_identity bid,
model::record_batch_reader r,
raft::replicate_options opts) {
auto enqueued = ss::make_lw_shared<available_promise<>>();
auto f = enqueued->get_future();
auto replicate_finished
= do_replicate(bid, std::move(r), opts, enqueued).finally([enqueued] {
// we should avoid situations when replicate_finished is set while
// enqueued isn't because it leads to hanging produce requests and
// the resource leaks. since staged replication is an optimization
// and setting enqueued only after replicate_finished is already
// set doesn't have sematic implications adding this post
// replicate_finished as a safety measure in case enqueued isn't
// set explicitly
if (!enqueued->available()) {
enqueued->set_value();
}
});
return raft::replicate_stages(std::move(f), std::move(replicate_finished));
}

ss::future<result<raft::replicate_result>> rm_stm::replicate(
model::batch_identity bid,
model::record_batch_reader b,
model::record_batch_reader r,
raft::replicate_options opts) {
auto enqueued = ss::make_lw_shared<available_promise<>>();
return do_replicate(bid, std::move(r), opts, enqueued);
}

ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
model::batch_identity bid,
model::record_batch_reader b,
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued) {
return _state_lock.hold_read_lock().then(
[this, bid, opts, b = std::move(b)](
[this, enqueued, bid, opts, b = std::move(b)](
ss::basic_rwlock<>::holder unit) mutable {
if (bid.is_transactional) {
auto pid = bid.pid.get_id();
Expand All @@ -778,8 +809,9 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate(
request_id rid{.pid = bid.pid, .seq = bid.first_seq};
return with_request_lock(
rid,
[this, bid, opts, b = std::move(b)]() mutable {
return replicate_seq(bid, std::move(b), opts);
[this, enqueued, bid, opts, b = std::move(b)]() mutable {
return replicate_seq(
bid, std::move(b), opts, enqueued);
})
.finally([u = std::move(unit)] {});
}
Expand Down Expand Up @@ -1064,8 +1096,11 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
model::batch_identity bid,
model::record_batch_reader br,
raft::replicate_options opts) {
raft::replicate_options opts,
[[maybe_unused]] ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
// it's ok not to set enqueued on early return because
// the safety check in replicate_in_stages sets it automatically
co_return errc::not_leader;
}
auto synced_term = _insync_term;
Expand Down
15 changes: 14 additions & 1 deletion src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "raft/state_machine.h"
#include "raft/types.h"
#include "storage/snapshot.h"
#include "utils/available_promise.h"
#include "utils/expiring_promise.h"
#include "utils/mutex.h"

Expand Down Expand Up @@ -168,6 +169,11 @@ class rm_stm final : public persisted_stm {
ss::future<std::vector<rm_stm::tx_range>>
aborted_transactions(model::offset, model::offset);

raft::replicate_stages replicate_in_stages(
model::batch_identity,
model::record_batch_reader,
raft::replicate_options);

ss::future<result<raft::replicate_result>> replicate(
model::batch_identity,
model::record_batch_reader,
Expand Down Expand Up @@ -268,13 +274,20 @@ class rm_stm final : public persisted_stm {
void set_seq(model::batch_identity, model::offset);
void reset_seq(model::batch_identity);

ss::future<result<raft::replicate_result>> do_replicate(
model::batch_identity,
model::record_batch_reader,
raft::replicate_options,
ss::lw_shared_ptr<available_promise<>>);

ss::future<result<raft::replicate_result>>
replicate_tx(model::batch_identity, model::record_batch_reader);

ss::future<result<raft::replicate_result>> replicate_seq(
model::batch_identity,
model::record_batch_reader,
raft::replicate_options);
raft::replicate_options,
ss::lw_shared_ptr<available_promise<>>);

void compact_snapshot();

Expand Down

0 comments on commit 0791a5f

Please sign in to comment.