
Improve idempotency latency by introducing rm_stm pipelining #5157

Merged: 11 commits, Jun 24, 2022
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
@@ -59,6 +59,7 @@ enum class errc : int16_t {
error_collecting_health_report,
leadership_changed,
feature_disabled,
invalid_request,
};
struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "cluster::errc"; }
@@ -167,6 +168,8 @@ struct errc_category final : public std::error_category {
"to finish";
case errc::feature_disabled:
return "Requested feature is disabled";
case errc::invalid_request:
return "Invalid request";
}
return "cluster::errc::unknown";
}
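For context on the pattern being extended here: errc_category plugs cluster::errc into the standard <system_error> machinery so the enum can travel as a plain std::error_code. A minimal self-contained sketch of the same idiom (toy names, not Redpanda's actual wiring):

    #include <iostream>
    #include <string>
    #include <system_error>

    // Toy error enum and category mirroring the errc_category pattern above.
    enum class toy_errc : int16_t { success = 0, invalid_request = 1 };

    struct toy_errc_category final : std::error_category {
        const char* name() const noexcept final { return "toy::errc"; }
        std::string message(int c) const final {
            switch (static_cast<toy_errc>(c)) {
            case toy_errc::success:
                return "Success";
            case toy_errc::invalid_request:
                return "Invalid request";
            }
            return "toy::errc::unknown";
        }
    };

    std::error_code make_error_code(toy_errc e) {
        static toy_errc_category category;
        return {static_cast<int>(e), category};
    }

    int main() {
        auto ec = make_error_code(toy_errc::invalid_request);
        std::cout << ec.category().name() << ": " << ec.message() << "\n";
        // prints: toy::errc: Invalid request
    }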
13 changes: 1 addition & 12 deletions src/v/cluster/partition.cc
@@ -181,18 +181,7 @@ raft::replicate_stages partition::replicate_in_stages(
}

if (_rm_stm) {
return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
} else {
return _raft->replicate_in_stages(std::move(r), opts);
}
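Before this patch, this branch bridged rm_stm::replicate's single future into the two-stage replicate_stages API by setting the first-stage promise only when replication had fully finished (the 12 lines deleted here), so a producer could never overlap its next request with an inflight one; delegating to rm_stm::replicate_in_stages lets the first stage resolve at enqueue time. A self-contained model of the latency difference, using std::promise and std::thread in place of Seastar primitives (toy code, not Redpanda's API):

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <thread>

    // Toy model of raft::replicate_stages: one future fires when a request
    // has been enqueued in order, another when replication has finished.
    struct stages {
        std::future<void> enqueued;
        std::future<int> finished; // toy "replicate_result": a last offset
    };

    // Pre-patch shape: the first stage is signalled only when replication
    // completes, so the caller cannot pipeline its next request.
    stages replicate_without_pipelining() {
        auto enq = std::make_shared<std::promise<void>>();
        auto fin = std::make_shared<std::promise<int>>();
        std::thread([=] {
            std::this_thread::sleep_for(std::chrono::milliseconds(50)); // "replication"
            fin->set_value(42);
            enq->set_value(); // tied to completion: no overlap possible
        }).detach();
        return {enq->get_future(), fin->get_future()};
    }

    // Post-patch shape: the first stage fires right after ordering, so the
    // caller may dispatch the next request while this one replicates.
    stages replicate_with_pipelining() {
        auto enq = std::make_shared<std::promise<void>>();
        auto fin = std::make_shared<std::promise<int>>();
        std::thread([=] {
            enq->set_value(); // ordered; the caller may proceed now
            std::this_thread::sleep_for(std::chrono::milliseconds(50)); // "replication"
            fin->set_value(42);
        }).detach();
        return {enq->get_future(), fin->get_future()};
    }

    int main() {
        auto s = replicate_with_pipelining();
        s.enqueued.wait(); // returns almost immediately
        std::cout << "enqueued; offset " << s.finished.get() << "\n";
    }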
6 changes: 5 additions & 1 deletion src/v/cluster/partition.h
@@ -150,7 +150,11 @@ class partition {

ss::future<std::error_code>
transfer_leadership(std::optional<model::node_id> target) {
if (_rm_stm) {
return _rm_stm->transfer_leadership(target);
@mmaslankaprv (Member) commented on Jun 21, 2022:
I have a general question: can a livelock happen when a leader is elected because heartbeats were lost?

@rystsov (Contributor, Author) commented on Jun 21, 2022:

Are you talking about this situation?

  1. something happens
  2. since consensus::replication doesn't have a timeout, it doesn't return control flow in time
  3. the leadership balancer tries to do its job but gets blocked by the unfinished replication

As I understand it, the leadership balancer is an optimization, and if it doesn't work in some cases it isn't a problem. Imagine a node being isolated from the rest of the cluster: even if the leadership balancer is fully functional, it can't do anything. Just as in this case, eventually a new node becomes the leader and carries on.

} else {
return _raft->do_transfer_leadership(target);
}
}

ss::future<std::error_code>
286 changes: 252 additions & 34 deletions src/v/cluster/rm_stm.cc
@@ -760,12 +760,52 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
co_return tx_errc::none;
}

raft::replicate_stages rm_stm::replicate_in_stages(
model::batch_identity bid,
model::record_batch_reader r,
raft::replicate_options opts) {
auto enqueued = ss::make_lw_shared<available_promise<>>();
auto f = enqueued->get_future();
auto replicate_finished
= do_replicate(bid, std::move(r), opts, enqueued).finally([enqueued] {
// we should avoid situations when replicate_finished is set while
// enqueued isn't, because that leads to hanging produce requests and
// resource leaks. since staged replication is an optimization, and
// setting enqueued only after replicate_finished is already set has
// no semantic implications, we add this after replicate_finished as
// a safety measure for the case when enqueued isn't set explicitly
if (!enqueued->available()) {
enqueued->set_value();
}
});
return raft::replicate_stages(std::move(f), std::move(replicate_finished));
}
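The safety-measure comment above encodes a liveness invariant: the enqueued promise must eventually be set on every code path, otherwise a produce request waiting on the first stage hangs and leaks resources. A minimal sketch of the same net with std::promise (toy do_replicate that may throw before signalling; Seastar's available_promise exposes available(), std::promise does not, so the sketch checks the paired future instead):

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <stdexcept>

    using enqueued_t = std::shared_ptr<std::promise<void>>;

    // Toy replication step: on the happy path it signals `enqueued` itself;
    // on the failure path it bails out without touching the promise.
    int do_replicate(const enqueued_t& enqueued, bool fail_early) {
        if (fail_early) {
            throw std::runtime_error("not a leader");
        }
        enqueued->set_value(); // normal path: signal after ordering
        return 42;
    }

    int replicate_in_stages(bool fail_early) {
        auto enqueued = std::make_shared<std::promise<void>>();
        auto first_stage = enqueued->get_future();
        int result = -1;
        try {
            result = do_replicate(enqueued, fail_early);
        } catch (...) {
            result = -1;
        }
        // the safety net from the patch: if nobody set the promise, set it
        // here so a waiter on the first stage can never hang
        if (first_stage.wait_for(std::chrono::seconds(0))
            != std::future_status::ready) {
            enqueued->set_value();
        }
        first_stage.wait(); // completes on every path
        return result;
    }

    int main() {
        std::cout << replicate_in_stages(true) << " "
                  << replicate_in_stages(false) << "\n"; // prints: -1 42
    }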

ss::future<result<raft::replicate_result>> rm_stm::replicate(
model::batch_identity bid,
model::record_batch_reader r,
raft::replicate_options opts) {
auto enqueued = ss::make_lw_shared<available_promise<>>();
return do_replicate(bid, std::move(r), opts, enqueued);
}

ss::future<std::error_code>
rm_stm::transfer_leadership(std::optional<model::node_id> target) {
return _state_lock.hold_write_lock().then(
[this, target](ss::basic_rwlock<>::holder unit) {
return _c->do_transfer_leadership(target).finally(
[u = std::move(unit)] {});
});
}
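This is the piece that speaks to the livelock question in the review thread above: every replicate path holds _state_lock for reading, while transfer_leadership takes the write lock, so a transfer waits for inflight replications to drain and fences out new ones while leadership moves. The same discipline sketched with std::shared_mutex, with threads standing in for Seastar fibers (illustrative only):

    #include <iostream>
    #include <shared_mutex>
    #include <thread>
    #include <vector>

    // Toy model of the _state_lock discipline: replications take the lock
    // in shared (read) mode and run concurrently; a leadership transfer
    // takes it in exclusive (write) mode, waiting for readers to drain and
    // blocking new ones until it completes.
    std::shared_mutex state_lock;

    void replicate(int i) {
        std::shared_lock read(state_lock); // many replications in parallel
        std::cout << "replicating " << i << "\n";
    }

    void transfer_leadership() {
        std::unique_lock write(state_lock); // waits for readers to drain
        std::cout << "transferring leadership with no replication inflight\n";
    }

    int main() {
        std::vector<std::thread> ts;
        for (int i = 0; i < 4; i++) ts.emplace_back(replicate, i);
        ts.emplace_back(transfer_leadership);
        for (auto& t : ts) t.join();
    }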

ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
model::batch_identity bid,
model::record_batch_reader b,
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued) {
return _state_lock.hold_read_lock().then(
[this, enqueued, bid, opts, b = std::move(b)](
ss::basic_rwlock<>::holder unit) mutable {
if (bid.is_transactional) {
auto pid = bid.pid.get_id();
@@ -774,24 +814,12 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate(
return replicate_tx(bid, std::move(b));
})
.finally([u = std::move(unit)] {});
} else if (bid.has_idempotent()) {
return replicate_seq(bid, std::move(b), opts, enqueued)
.finally([u = std::move(unit)] {});
}

return replicate_msg(std::move(b), opts, enqueued)
.finally([u = std::move(unit)] {});
});
}
@@ -921,6 +949,14 @@ rm_stm::known_seq(model::batch_identity bid) const {
return std::nullopt;
}

std::optional<int32_t> rm_stm::tail_seq(model::producer_identity pid) const {
auto pid_seq = _log_state.seq_table.find(pid);
if (pid_seq == _log_state.seq_table.end()) {
return std::nullopt;
}
return pid_seq->second.seq;
}

void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) {
auto pid_seq = _log_state.seq_table.find(bid.pid);
if (pid_seq != _log_state.seq_table.end()) {
@@ -1064,40 +1100,222 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
model::batch_identity bid,
model::record_batch_reader br,
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
// it's ok not to set enqueued on early return because
// the safety check in replicate_in_stages sets it automatically
co_return errc::not_leader;
}
auto synced_term = _insync_term;

if (bid.first_seq > bid.last_seq) {
vlog(
clusterlog.warn,
"first_seq={} of the batch should be less or equal to last_seq={}",
bid.first_seq,
bid.last_seq);
co_return errc::invalid_request;
}
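// (illustration) a well-formed idempotent batch of five records
// carries e.g. first_seq=10 and last_seq=14; a batch claiming
// first_seq=15 and last_seq=11 cannot come from a correct client,
// hence errc::invalid_request above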

// getting session by client's pid
auto session = _inflight_requests
.lazy_emplace(
bid.pid,
[&](const auto& ctor) {
ctor(
bid.pid, ss::make_lw_shared<inflight_requests>());
})
->second;

// critical section: rm_stm evaluates the request's order and
// whether or not it's a duplicate
// ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL SUCCEED
// if any of the requests fail we expect the leader to step down
auto u = co_await session->lock.get_units();
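// (note) this mutex serializes only the per-producer bookkeeping and
// the enqueue step below; it is released (u.return_all()) before
// replicate_finished is awaited, so several requests from the same
// producer still replicate concurrently; that is the pipelining this
// PR introduces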

if (!_c->is_leader() || _c->term() != synced_term) {
co_return errc::not_leader;
}

if (session->term < synced_term) {
// we don't care about old inflight requests because the sync
// guarantee (all replicated records of the last term are
// applied) makes sure that the past inflight requests got
// into _log_state.seq_table
session->forget();
session->term = synced_term;
}

if (session->term > synced_term) {
co_return errc::not_leader;
}

// checking if the request (identified by seq) is already resolved
// checking among the pending requests
auto cached_r = session->known_seq(bid.last_seq);
if (cached_r) {
co_return cached_r.value();
}
// checking among the responded requests
auto cached_offset = known_seq(bid);
if (cached_offset) {
co_return raft::replicate_result{.last_offset = cached_offset.value()};
}

// checking if the request is already being processed
for (auto& inflight : session->cache) {
if (inflight->last_seq == bid.last_seq && inflight->is_processing) {
// found an inflight request, parking the current request
// until the former is resolved
auto promise = ss::make_lw_shared<
available_promise<result<raft::replicate_result>>>();
inflight->parked.push_back(promise);
u.return_all();
co_return co_await promise->get_future();
}
}
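// (illustration) if the batch with last_seq=14 is still replicating
// when the client retries it, the retry finds the cached entry with
// is_processing=true, parks itself on a promise, releases the lock
// and later receives exactly the original's result, so the batch is
// never replicated twice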

// apparently we are processing an unseen request
// checking that it isn't out of order with the already processed
// or inflight requests
if (session->cache.size() == 0) {
// no inflight requests, checking the processed ones
auto tail = tail_seq(bid.pid);
if (tail) {
if (!is_sequence(tail.value(), bid.first_seq)) {
// there is a gap between the request's seq number
// and the seq number of the latest processed request
co_return errc::sequence_out_of_order;
}
} else {
if (bid.first_seq != 0) {
// first request in a session should have seq=0
co_return errc::sequence_out_of_order;
}
}
} else {
if (!is_sequence(session->tail_seq, bid.first_seq)) {
// there is a gap between the request's seq number
// and the seq number of the latest inflight request
co_return errc::sequence_out_of_order;
}
}
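// (illustration) with tail_seq=4 a request starting at first_seq=5
// is accepted; first_seq=7 would mean requests 5..6 were lost on the
// way, so the client gets sequence_out_of_order; a fresh session has
// to start with first_seq=0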

// updating the last observed seq; since we hold the mutex
// it's guaranteed to be the latest
session->tail_seq = bid.last_seq;

auto request = ss::make_lw_shared<inflight_request>();
request->last_seq = bid.last_seq;
request->is_processing = true;
session->cache.push_back(request);

// request comes in the right order, it's ok to replicate
ss::lw_shared_ptr<raft::replicate_stages> ss;
auto has_failed = false;
try {
ss = _c->replicate_in_stages(synced_term, std::move(br), opts);
co_await std::move(ss->request_enqueued);
} catch (...) {
vlog(
clusterlog.warn,
"replication failed with {}",
std::current_exception());
has_failed = true;
}

result<raft::replicate_result> r = errc::success;

if (has_failed) {
if (_c->is_leader() && _c->term() == synced_term) {
// we may not care about the requests waiting on the lock:
// as soon as we release the lock the leader or term will
// have changed, so the pending fibers won't pass the initial
// checks and will be rejected with not_leader
co_await _c->step_down();
}
u.return_all();
r = errc::replication_error;
} else {
try {
u.return_all();
enqueued->set_value();
r = co_await std::move(ss->replicate_finished);
} catch (...) {
vlog(
clusterlog.warn,
"replication failed with {}",
std::current_exception());
r = errc::replication_error;
}
}

// we don't need session->lock because we never interleave
// access to is_processing and offset with sync point (await)
request->is_processing = false;
request->r = r;
for (auto& pending : request->parked) {
pending->set_value(r);
}
request->parked.clear();

if (!r) {
// if r failed at the consensus level (not because of has_failed)
// the consensus should guarantee that all follow-up replication
// requests fail too, but we step down just in case to minimize
// the risk
if (_c->is_leader() && _c->term() == synced_term) {
co_await _c->step_down();
}
co_return r;
}

// requests get into session->cache in seq order, so when we iterate
// from the beginning until we meet the first still-inflight request
// we can be sure that we update seq_table in order, preserving
// monotonicity and the absence of gaps
while (!session->cache.empty() && !session->cache.front()->is_processing) {
auto front = session->cache.front();
if (!front->r) {
// we just encountered a failed request, but a failed request
// causes a step down, so we may stop updating seq_table here;
// the next leader's apply will do it for us
break;
}
auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
if (inserted) {
seq_it->second.pid = bid.pid;
seq_it->second.seq = front->last_seq;
seq_it->second.last_offset = front->r.value().last_offset;
} else {
seq_it->second.update(
front->last_seq, front->r.value().last_offset);
}
session->cache.pop_front();
}
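// (illustration) with cache = [seq<=4 done, seq<=9 inflight,
// seq<=14 done] only the first entry is folded into seq_table; the
// third must wait for the second to resolve, so seq_table never
// observes a gap between 4 and 14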

if (session->cache.empty() && session->lock.ready()) {
_inflight_requests.erase(bid.pid);
}

co_return r;
}
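Taken together, replicate_seq maintains a small per-producer state machine: admit requests only in seq order, park duplicate retries on the original request's promise, and fold finished requests into seq_table strictly front-to-back. A dependency-free, single-threaded model of that cache discipline (hypothetical simplified types; the real code keeps one such session per producer id under session->lock):

    #include <deque>
    #include <iostream>
    #include <memory>
    #include <optional>

    // Single-threaded model of one producer's session: inflight requests
    // are cached in seq order, and finished ones drain front-to-back so
    // the durable seq table never observes a gap.
    struct inflight {
        int last_seq;
        bool is_processing = true;
        std::optional<long> offset; // replication result, once resolved
    };

    struct session {
        std::deque<std::shared_ptr<inflight>> cache;
        int tail_seq = -1;       // last admitted seq
        long committed_seq = -1; // models _log_state.seq_table for this pid
    };

    // admit a request; false models errc::sequence_out_of_order
    // (no int32 wraparound here, unlike the real is_sequence)
    bool begin_request(session& s, int first_seq, int last_seq) {
        if (first_seq != s.tail_seq + 1) {
            return false;
        }
        s.tail_seq = last_seq;
        s.cache.push_back(std::make_shared<inflight>(inflight{last_seq}));
        return true;
    }

    void finish_request(session& s, int last_seq, long offset) {
        for (auto& r : s.cache) {
            if (r->last_seq == last_seq) {
                r->is_processing = false;
                r->offset = offset;
            }
        }
        // drain in order, exactly like the while loop in replicate_seq
        while (!s.cache.empty() && !s.cache.front()->is_processing) {
            s.committed_seq = s.cache.front()->last_seq;
            s.cache.pop_front();
        }
    }

    int main() {
        session s;
        begin_request(s, 0, 4);    // request A
        begin_request(s, 5, 9);    // request B, pipelined behind A
        finish_request(s, 9, 109); // B resolves first...
        std::cout << s.committed_seq << "\n"; // -1: nothing drained yet
        finish_request(s, 4, 104); // ...then A; both drain in order
        std::cout << s.committed_seq << "\n"; // 9
    }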

ss::future<result<raft::replicate_result>> rm_stm::replicate_msg(
model::record_batch_reader br,
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
co_return errc::not_leader;
}

auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts);
co_await std::move(ss.request_enqueued);
enqueued->set_value();
co_return co_await std::move(ss.replicate_finished);
}

model::offset rm_stm::last_stable_offset() {
auto first_tx_start = model::offset::max();
