Merge pull request #5157 from rystsov/idempotency-latency
Improve idempotency latency by introducing rm_stm pipelining
rystsov committed Jun 24, 2022
2 parents 0d9e424 + b4a5fb2 commit 884086e
Showing 17 changed files with 475 additions and 137 deletions.
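
A note on the idea before the hunks: replication is split into two observable stages so that a producer's next batch can start as soon as the previous batch's position in the log is fixed, instead of after a full replication round trip. The sketch below is a toy model of that contract, with std::future and std::thread standing in for Seastar primitives; every name in it is illustrative, not Redpanda's API.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// stage 1 resolves when the batch's position in the log is fixed;
// stage 2 resolves when replication actually completes
struct replicate_stages {
    std::future<void> request_enqueued;
    std::future<long> replicate_finished; // final offset
};

replicate_stages replicate_in_stages(long offset) {
    std::promise<void> enqueued;
    std::promise<long> finished;
    replicate_stages st{enqueued.get_future(), finished.get_future()};
    std::thread(
      [offset, e = std::move(enqueued), f = std::move(finished)]() mutable {
          e.set_value(); // ordering decided: the caller may release its lock
          // pretend the round trip to the followers takes a while
          std::this_thread::sleep_for(std::chrono::milliseconds(10));
          f.set_value(offset);
      }).detach();
    return st;
}

int main() {
    auto s1 = replicate_in_stages(1);
    s1.request_enqueued.get();        // wait only for ordering...
    auto s2 = replicate_in_stages(2); // ...then pipeline the next batch
    std::cout << s1.replicate_finished.get() << " "
              << s2.replicate_finished.get() << "\n";
}
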
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
@@ -59,6 +59,7 @@ enum class errc : int16_t {
error_collecting_health_report,
leadership_changed,
feature_disabled,
invalid_request,
};
struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "cluster::errc"; }
@@ -167,6 +168,8 @@ struct errc_category final : public std::error_category {
"to finish";
case errc::feature_disabled:
return "Requested feature is disabled";
case errc::invalid_request:
return "Invalid request";
}
return "cluster::errc::unknown";
}
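
For context on how the new value surfaces to callers: cluster::errc plugs into std::error_code through the errc_category above, so replicate_seq (later in this diff) can co_return errc::invalid_request and callers get the human-readable message. A stripped-down, self-contained analog of that wiring; the my_ prefix marks the names as illustrative:

#include <iostream>
#include <string>
#include <system_error>

// stripped-down analog of cluster::errc and errc_category; the real
// definitions live in errc.h above
enum class my_errc : int16_t { success = 0, invalid_request = 1 };

struct my_errc_category final : std::error_category {
    const char* name() const noexcept final { return "cluster::errc"; }
    std::string message(int c) const final {
        switch (static_cast<my_errc>(c)) {
        case my_errc::success:
            return "Success";
        case my_errc::invalid_request:
            return "Invalid request";
        }
        return "cluster::errc::unknown";
    }
};

std::error_code make_error_code(my_errc e) {
    static my_errc_category category;
    return {static_cast<int>(e), category};
}

int main() {
    std::error_code ec = make_error_code(my_errc::invalid_request);
    std::cout << ec.category().name() << ": " << ec.message() << "\n";
}
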
13 changes: 1 addition & 12 deletions src/v/cluster/partition.cc
@@ -181,18 +181,7 @@ raft::replicate_stages partition::replicate_in_stages(
}

if (_rm_stm) {
- ss::promise<> p;
- auto f = p.get_future();
- auto replicate_finished
- = _rm_stm->replicate(bid, std::move(r), opts)
- .then(
- [p = std::move(p)](result<raft::replicate_result> res) mutable {
- p.set_value();
- return res;
- });
- return raft::replicate_stages(
- std::move(f), std::move(replicate_finished));
+ return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
} else {
return _raft->replicate_in_stages(std::move(r), opts);
}
6 changes: 5 additions & 1 deletion src/v/cluster/partition.h
@@ -150,7 +150,11 @@ class partition {

ss::future<std::error_code>
transfer_leadership(std::optional<model::node_id> target) {
- return _raft->do_transfer_leadership(target);
+ if (_rm_stm) {
+ return _rm_stm->transfer_leadership(target);
+ } else {
+ return _raft->do_transfer_leadership(target);
+ }
}

ss::future<std::error_code>
286 changes: 252 additions & 34 deletions src/v/cluster/rm_stm.cc
@@ -760,12 +760,52 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
co_return tx_errc::none;
}

raft::replicate_stages rm_stm::replicate_in_stages(
model::batch_identity bid,
model::record_batch_reader r,
raft::replicate_options opts) {
auto enqueued = ss::make_lw_shared<available_promise<>>();
auto f = enqueued->get_future();
auto replicate_finished
= do_replicate(bid, std::move(r), opts, enqueued).finally([enqueued] {
// we should avoid situations when replicate_finished is set while
// enqueued isn't, because that leads to hanging produce requests
// and resource leaks. since staged replication is an optimization
// and setting enqueued only after replicate_finished is already
// set has no semantic implications, we add this step after
// replicate_finished as a safety measure in case enqueued isn't
// set explicitly
if (!enqueued->available()) {
enqueued->set_value();
}
});
return raft::replicate_stages(std::move(f), std::move(replicate_finished));
}
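
The finally block above is a liveness guard: the first stage must resolve even if do_replicate returns early or throws without touching the promise. A minimal sketch of the pattern, with a tiny stand-in for seastar's available_promise (assumed semantics: a promise that can report whether it was already fulfilled):

#include <future>
#include <iostream>

// minimal stand-in for seastar's available_promise<>
class available_promise {
    std::promise<void> _p;
    bool _available = false;
public:
    std::future<void> get_future() { return _p.get_future(); }
    bool available() const { return _available; }
    void set_value() { _available = true; _p.set_value(); }
};

int main() {
    available_promise enqueued;
    auto f = enqueued.get_future();
    // ... do_replicate may or may not have set the promise by now ...
    // the finally-style safety net: fulfill it if nobody else did,
    // so the first stage can never hang
    if (!enqueued.available()) {
        enqueued.set_value();
    }
    f.get();
    std::cout << "first stage resolved\n";
}
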

ss::future<result<raft::replicate_result>> rm_stm::replicate(
model::batch_identity bid,
- model::record_batch_reader b,
+ model::record_batch_reader r,
raft::replicate_options opts) {
auto enqueued = ss::make_lw_shared<available_promise<>>();
return do_replicate(bid, std::move(r), opts, enqueued);
}

ss::future<std::error_code>
rm_stm::transfer_leadership(std::optional<model::node_id> target) {
return _state_lock.hold_write_lock().then(
[this, target](ss::basic_rwlock<>::holder unit) {
return _c->do_transfer_leadership(target).finally(
[u = std::move(unit)] {});
});
}
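
transfer_leadership takes the write lock on _state_lock while every replicate path (see do_replicate below) holds the read lock, so a transfer first waits out inflight replication and then blocks new requests for its duration. A minimal analog, with std::shared_mutex standing in for ss::basic_rwlock:

#include <iostream>
#include <mutex>
#include <shared_mutex>

std::shared_mutex state_lock; // stands in for rm_stm's _state_lock

void replicate() {
    // many produce requests may hold the read lock concurrently
    std::shared_lock read(state_lock);
    std::cout << "replicating under the read lock\n";
}

void transfer_leadership() {
    // exclusive: waits for inflight replicate calls to drain and
    // blocks new ones until the transfer is done
    std::unique_lock write(state_lock);
    std::cout << "transferring leadership under the write lock\n";
}

int main() {
    replicate();
    transfer_leadership();
}
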

ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
model::batch_identity bid,
model::record_batch_reader b,
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued) {
return _state_lock.hold_read_lock().then(
- [this, bid, opts, b = std::move(b)](
+ [this, enqueued, bid, opts, b = std::move(b)](
ss::basic_rwlock<>::holder unit) mutable {
if (bid.is_transactional) {
auto pid = bid.pid.get_id();
@@ -774,24 +814,12 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate(
return replicate_tx(bid, std::move(b));
})
.finally([u = std::move(unit)] {});
- } else if (bid.has_idempotent() && bid.first_seq <= bid.last_seq) {
- request_id rid{.pid = bid.pid, .seq = bid.first_seq};
- return with_request_lock(
- rid,
- [this, bid, opts, b = std::move(b)]() mutable {
- return replicate_seq(bid, std::move(b), opts);
- })
- .finally([u = std::move(unit)] {});
+ } else if (bid.has_idempotent()) {
+ return replicate_seq(bid, std::move(b), opts, enqueued)
+ .finally([u = std::move(unit)] {});
}

- return sync(_sync_timeout)
- .then([this, opts, b = std::move(b)](bool is_synced) mutable {
- if (!is_synced) {
- return ss::make_ready_future<
- result<raft::replicate_result>>(errc::not_leader);
- }
- return _c->replicate(std::move(b), opts);
- })
+ return replicate_msg(std::move(b), opts, enqueued)
.finally([u = std::move(unit)] {});
});
}
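
The replicate_seq changes below lean on per-producer session state (_inflight_requests) declared in rm_stm.h, one of the 17 changed files not expanded on this page. A rough reconstruction of its shape from the usage below; field names follow the code but are assumptions, and std types stand in for the Seastar ones:

#include <cstdint>
#include <deque>
#include <future>
#include <memory>
#include <vector>

// per inflight (or recently resolved) produce request
struct inflight_request {
    int32_t last_seq{-1};
    bool is_processing{true};
    bool ok{false}; // stands in for result<raft::replicate_result> r
    // duplicates of this request parked until it resolves
    std::vector<std::shared_ptr<std::promise<bool>>> parked;
};

// per producer id, looked up through _inflight_requests
struct inflight_requests {
    int64_t term{-1};     // stands in for model::term_id
    int32_t tail_seq{-1}; // seq of the latest inflight request
    std::deque<std::shared_ptr<inflight_request>> cache; // seq order
    void forget() {
        cache.clear();
        tail_seq = -1;
    }
};
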
@@ -921,6 +949,14 @@ rm_stm::known_seq(model::batch_identity bid) const {
return std::nullopt;
}

std::optional<int32_t> rm_stm::tail_seq(model::producer_identity pid) const {
auto pid_seq = _log_state.seq_table.find(pid);
if (pid_seq == _log_state.seq_table.end()) {
return std::nullopt;
}
return pid_seq->second.seq;
}
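
The out-of-order checks below call is_sequence, whose definition is not visible in these hunks. A sketch consistent with how it's used here, assuming next_seq must directly follow last_seq and that Kafka's int32 sequence numbers wrap around to 0 after INT32_MAX:

#include <cstdint>
#include <limits>

static bool is_sequence(int32_t last_seq, int32_t next_seq) {
    // wraparound: after INT32_MAX the next sequence number is 0
    if (last_seq == std::numeric_limits<int32_t>::max()) {
        return next_seq == 0;
    }
    return next_seq == last_seq + 1;
}
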

void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) {
auto pid_seq = _log_state.seq_table.find(bid.pid);
if (pid_seq != _log_state.seq_table.end()) {
@@ -1064,40 +1100,222 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
model::batch_identity bid,
model::record_batch_reader br,
- raft::replicate_options opts) {
+ raft::replicate_options opts,
+ ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
// it's ok not to set enqueued on early return because
// the safety check in replicate_in_stages sets it automatically
co_return errc::not_leader;
}
auto synced_term = _insync_term;

if (bid.first_seq > bid.last_seq) {
vlog(
clusterlog.warn,
"first_seq={} of the batch should be less or equal to last_seq={}",
bid.first_seq,
bid.last_seq);
co_return errc::invalid_request;
}

// getting session by client's pid
auto session = _inflight_requests
.lazy_emplace(
bid.pid,
[&](const auto& ctor) {
ctor(
bid.pid, ss::make_lw_shared<inflight_requests>());
})
->second;

// critical section: rm_stm evaluates the request's order and
// whether or not it's a duplicate
// ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL SUCCEED
// if any of the requests fail we expect the leader to step down
auto u = co_await session->lock.get_units();

if (!_c->is_leader() || _c->term() != synced_term) {
co_return errc::not_leader;
}

if (session->term < synced_term) {
// we don't care about old inflight requests because the sync
// guarantee (all replicated records of the last term are
// applied) makes sure that the past inflight records got
// into _log_state->seq_table
session->forget();
session->term = synced_term;
}

if (session->term > synced_term) {
co_return errc::not_leader;
}

// checking if the request (identified by seq) is already resolved
// checking among the pending requests
auto cached_r = session->known_seq(bid.last_seq);
if (cached_r) {
co_return cached_r.value();
}
// checking among the responded requests
auto cached_offset = known_seq(bid);
if (cached_offset) {
- if (cached_offset.value() < model::offset{0}) {
co_return raft::replicate_result{.last_offset = cached_offset.value()};
}

// checking if the request is already being processed
for (auto& inflight : session->cache) {
if (inflight->last_seq == bid.last_seq && inflight->is_processing) {
// found an inflight request, parking the current request
// until the former is resolved
auto promise = ss::make_lw_shared<
available_promise<result<raft::replicate_result>>>();
inflight->parked.push_back(promise);
u.return_all();
co_return co_await promise->get_future();
}
}

// apparently we're processing an unseen request
// checking that it isn't out of order with the already processed
// or inflight requests
if (session->cache.size() == 0) {
// no inflight requests -> checking processed requests
auto tail = tail_seq(bid.pid);
if (tail) {
if (!is_sequence(tail.value(), bid.first_seq)) {
// there is a gap between the request's seq number
// and the seq number of the latest processed request
co_return errc::sequence_out_of_order;
}
} else {
if (bid.first_seq != 0) {
// first request in a session should have seq=0
co_return errc::sequence_out_of_order;
}
}
} else {
if (!is_sequence(session->tail_seq, bid.first_seq)) {
// there is a gap between the request's seq number
// and the seq number of the latest inflight request
co_return errc::sequence_out_of_order;
}
}

// updating last observed seq, since we hold the mutex
// it's guaranteed to be the latest
session->tail_seq = bid.last_seq;

auto request = ss::make_lw_shared<inflight_request>();
request->last_seq = bid.last_seq;
request->is_processing = true;
session->cache.push_back(request);

// request comes in the right order, it's ok to replicate
ss::lw_shared_ptr<raft::replicate_stages> ss;
auto has_failed = false;
try {
ss = _c->replicate_in_stages(synced_term, std::move(br), opts);
co_await std::move(ss->request_enqueued);
} catch (...) {
vlog(
clusterlog.warn,
"replication failed with {}",
std::current_exception());
has_failed = true;
}

result<raft::replicate_result> r = errc::success;

if (has_failed) {
if (_c->is_leader() && _c->term() == synced_term) {
// we may not care about the requests waiting on the lock:
// as soon as we release the lock the leader or term will
// have changed, so the pending fibers won't pass the initial
// checks and will be rejected with not_leader
co_await _c->step_down();
}
u.return_all();
r = errc::replication_error;
} else {
try {
u.return_all();
enqueued->set_value();
r = co_await std::move(ss->replicate_finished);
} catch (...) {
vlog(
clusterlog.warn,
- "Status of the original attempt is unknown (still is in-flight "
- "or failed). Failing a retried batch with the same pid:{} & "
- "seq:{} to avoid duplicates",
- bid.pid,
- bid.last_seq);
- // kafka clients don't automatically handle fatal errors
- // so when they encounter the error they will be forced
- // to propagate it to the app layer
- co_return errc::generic_tx_error;
+ "replication failed with {}",
+ std::current_exception());
+ r = errc::replication_error;
}
- co_return raft::replicate_result{.last_offset = cached_offset.value()};
}

- if (!check_seq(bid)) {
- co_return errc::sequence_out_of_order;
- }
// we don't need session->lock because we never interleave
// access to is_processing and offset with a sync point (await)
request->is_processing = false;
request->r = r;
for (auto& pending : request->parked) {
pending->set_value(r);
}
request->parked.clear();

if (!r) {
// if r failed at the consensus level (not because of has_failed)
// consensus should guarantee that all follow-up replication
// requests fail too, but just in case we step down to minimize
// the risk
if (_c->is_leader() && _c->term() == synced_term) {
co_await _c->step_down();
}
co_return r;
}

// requests get into session->cache in seq order so when we iterate
// from the beginning until we meet the first still-inflight request
// we may be sure that we update seq_table in order, preserving
// monotonicity and the absence of gaps
while (!session->cache.empty() && !session->cache.front()->is_processing) {
auto front = session->cache.front();
if (!front->r) {
// just encountered a failed request, but when a request
// fails it causes a step down, so we may not care about
// updating seq_table and can stop doing it; the next leader's
// apply will do it for us
break;
}
auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
if (inserted) {
seq_it->second.pid = bid.pid;
seq_it->second.seq = front->last_seq;
seq_it->second.last_offset = front->r.value().last_offset;
} else {
seq_it->second.update(
front->last_seq, front->r.value().last_offset);
}
session->cache.pop_front();
}
- auto r = co_await _c->replicate(synced_term, std::move(br), opts);
- if (r) {
- set_seq(bid, r.value().last_offset);
- }

if (session->cache.empty() && session->lock.ready()) {
_inflight_requests.erase(bid.pid);
}

co_return r;
}
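
The while loop above is a drain-the-completed-prefix pattern: because requests enter session->cache in seq order, stopping at the first still-processing entry guarantees that seq_table only ever advances monotonically and without gaps. A tiny standalone model of it, with illustrative names and values:

#include <deque>
#include <iostream>

struct req {
    int last_seq;
    bool is_processing;
    bool ok;
};

int main() {
    // two resolved requests followed by one still in flight
    std::deque<req> cache{{5, false, true}, {9, false, true}, {12, true, true}};
    int acked_tail = -1;
    // advance over the resolved prefix; stop at the first inflight entry
    while (!cache.empty() && !cache.front().is_processing) {
        if (!cache.front().ok) {
            break; // a failure triggers a step down instead
        }
        acked_tail = cache.front().last_seq;
        cache.pop_front();
    }
    std::cout << "seq_table advanced to seq " << acked_tail << "\n"; // 9
}
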

ss::future<result<raft::replicate_result>> rm_stm::replicate_msg(
model::record_batch_reader br,
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
co_return errc::not_leader;
}

auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts);
co_await std::move(ss.request_enqueued);
enqueued->set_value();
co_return co_await std::move(ss.replicate_finished);
}

model::offset rm_stm::last_stable_offset() {
auto first_tx_start = model::offset::max();
