add rm_stm pipeline
rystsov committed Jun 18, 2022
1 parent c056601 commit b949b1b
Showing 2 changed files with 212 additions and 41 deletions.
196 changes: 171 additions & 25 deletions src/v/cluster/rm_stm.cc
@@ -769,13 +769,7 @@ ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
})
.finally([u = std::move(unit)] {});
} else if (bid.has_idempotent()) {
- request_id rid{.pid = bid.pid, .seq = bid.first_seq};
- return with_request_lock(
- rid,
- [this, enqueued, bid, opts, b = std::move(b)]() mutable {
- return replicate_seq(
- bid, std::move(b), opts, enqueued);
- })
return replicate_seq(bid, std::move(b), opts, enqueued)
.finally([u = std::move(unit)] {});
}

@@ -914,6 +908,14 @@ rm_stm::known_seq(model::batch_identity bid) const {
return std::nullopt;
}

std::optional<int32_t> rm_stm::tail_seq(model::producer_identity pid) const {
auto pid_seq = _log_state.seq_table.find(pid);
if (pid_seq == _log_state.seq_table.end()) {
return std::nullopt;
}
return pid_seq->second.seq;
}
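For context, tail_seq complements known_seq: known_seq answers "was this exact batch already applied, and at which offset?", while tail_seq answers "which sequence number should the next batch follow?". A standalone sketch of that per-producer bookkeeping (not part of this diff; illustrative types keyed by a plain integer producer id rather than model::producer_identity):

#include <cstdint>
#include <iostream>
#include <optional>
#include <unordered_map>

// Illustrative stand-in for the per-producer entry kept in the seq table.
struct seq_entry {
    int32_t seq{-1};         // last applied sequence number
    int64_t last_offset{-1}; // log offset that sequence landed at
};

// tail_seq-style lookup: the last applied seq for a producer, if any.
std::optional<int32_t> tail_seq(
  const std::unordered_map<int64_t, seq_entry>& table, int64_t pid) {
    auto it = table.find(pid);
    if (it == table.end()) {
        return std::nullopt;
    }
    return it->second.seq;
}

int main() {
    std::unordered_map<int64_t, seq_entry> table;
    table[10] = seq_entry{.seq = 5, .last_offset = 1200};

    // producer 10 has history: its next batch must start at seq 6
    std::cout << tail_seq(table, 10).value() << "\n"; // 5
    // producer 11 is unknown: its first batch must start at seq 0
    std::cout << tail_seq(table, 11).has_value() << "\n"; // 0 (false)
}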

void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) {
auto pid_seq = _log_state.seq_table.find(bid.pid);
if (pid_seq != _log_state.seq_table.end()) {
@@ -1057,7 +1059,7 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
model::batch_identity bid,
model::record_batch_reader br,
raft::replicate_options opts,
- [[maybe_unused]] ss::lw_shared_ptr<available_promise<>> enqueued) {
ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
co_return errc::not_leader;
}
@@ -1069,35 +1071,179 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
"first_seq={} of the batch should be less or equal to last_seq={}",
bid.first_seq,
bid.last_seq);
outcome->set_value();
co_return errc::generic_tx_error;
}

// getting session by client's pid
ss::lw_shared_ptr<inflight_requests> session;
auto session_it = _inflight_requests.find(bid.pid);
if (session_it == _inflight_requests.end()) {
session = ss::make_lw_shared<inflight_requests>();
_inflight_requests.try_emplace(bid.pid, session);
} else {
session = session_it->second;
}

// critical section: rm_stm checks the request's order and
// whether or not it's a duplicate
// ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL SUCCEED
// if any of the requests fail we expect the leader to step down
auto u = co_await session->lock.get_units();

if (!_c->is_leader() || _c->term() != synced_term) {
co_return errc::not_leader;
}

if (session->term < synced_term) {
// we don't care about old inflight requests because the sync
// guarantee (all replicated records of the last term are
// applied) makes sure that the past inflight records got
// into _log_state->seq_table
session->forget();
session->term = synced_term;
}

// checking if the request (identified by seq) is already resolved
// checking among the pending requests
auto cached_r = session->known_seq(bid.last_seq);
if (cached_r) {
co_return cached_r.value();
}
// checking among the responded requests
auto cached_offset = known_seq(bid);
if (cached_offset) {
- if (cached_offset.value() < model::offset{0}) {
co_return raft::replicate_result{.last_offset = cached_offset.value()};
}

// checking if the request is already being processed
for (auto inflight : session->cache) {
if (inflight->last_seq == bid.last_seq && inflight->is_processing) {
// found an inflight request, parking the current request
// until the former is resolved
auto promise = ss::make_lw_shared<
available_promise<result<raft::replicate_result>>>();
inflight->parked.push_back(promise);
u.return_all();
co_return co_await promise->get_future();
}
}
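The parking above lets a client retry that carries the same last_seq as a request still being replicated wait for the original's result instead of replicating the batch a second time. A simplified, single-threaded sketch of the pattern using std:: primitives in place of Seastar's promises (names here are illustrative, not the real types):

#include <cstdint>
#include <future>
#include <iostream>
#include <memory>
#include <vector>

// Illustrative in-flight entry: one replication attempt plus the retries
// parked behind it (the real code uses Seastar promises and lw_shared_ptr).
struct inflight_request {
    int32_t last_seq{-1};
    bool is_processing{true};
    int64_t offset{-1}; // result once resolved
    std::vector<std::shared_ptr<std::promise<int64_t>>> parked;
};

int main() {
    auto original = std::make_shared<inflight_request>();
    original->last_seq = 7;

    // a retry with the same last_seq arrives while the original is in flight:
    // park a promise on it instead of replicating the batch a second time
    auto parked = std::make_shared<std::promise<int64_t>>();
    auto retry_result = parked->get_future();
    original->parked.push_back(parked);

    // the original resolves: publish its result to every parked retry
    original->is_processing = false;
    original->offset = 42;
    for (auto& p : original->parked) {
        p->set_value(original->offset);
    }
    original->parked.clear();

    std::cout << "retry observed offset " << retry_result.get() << "\n"; // 42
}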

// apparently we are processing an unseen request
// checking that it isn't out of order with the already processed
// or inflight requests
if (session->cache.size() == 0) {
// no inflight requests > checking processed
auto tail = tail_seq(bid.pid);
if (tail) {
if (!is_sequence(tail.value(), bid.first_seq)) {
// there is a gap between the request's seq number
// and the seq number of the latest processed request
co_return errc::sequence_out_of_order;
}
} else {
if (bid.first_seq != 0) {
// first request in a session should have seq=0
co_return errc::sequence_out_of_order;
}
}
} else {
if (!is_sequence(session->tail_seq, bid.first_seq)) {
// there is a gap between the request's seq number
// and the seq number of the latest inflight request
co_return errc::sequence_out_of_order;
}
}
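The gap checks above rely on is_sequence, which is defined elsewhere in rm_stm.cc and not shown in this diff. A minimal sketch of the intended rule, assuming plain seq + 1 succession and ignoring Kafka's int32 sequence wraparound:

#include <cassert>
#include <cstdint>
#include <optional>

// Plausible shape of the succession check used above; the real is_sequence
// lives elsewhere in rm_stm.cc and also has to deal with int32 wraparound.
bool is_sequence(int32_t last_seq, int32_t first_seq) {
    return last_seq + 1 == first_seq;
}

// Mirrors the branching above: with no in-flight requests compare against the
// last processed seq (tail_seq); a brand new session must start at seq 0.
bool accepts(std::optional<int32_t> tail, int32_t first_seq) {
    if (tail) {
        return is_sequence(*tail, first_seq);
    }
    return first_seq == 0;
}

int main() {
    assert(accepts(std::nullopt, 0));  // first batch of a new session
    assert(!accepts(std::nullopt, 3)); // gap: session must start at 0
    assert(accepts(4, 5));             // 5 directly follows 4
    assert(!accepts(4, 6));            // gap -> sequence_out_of_order
}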

// updating the last observed seq; since we hold the mutex
// it's guaranteed to be the latest
session->tail_seq = bid.last_seq;

auto request = ss::make_lw_shared<inflight_request>();
request->last_seq = bid.last_seq;
request->is_processing = true;
session->cache.push_back(request);

// request comes in the right order, it's ok to replicate
ss::lw_shared_ptr<raft::replicate_stages> ss;
auto has_failed = false;
try {
ss = _c->replicate_in_stages(synced_term, std::move(br), opts);
co_await std::move(ss->request_enqueued);
} catch (...) {
vlog(
clusterlog.warn,
"replication failed with {}",
std::current_exception());
has_failed = true;
}

result<raft::replicate_result> r = errc::success;

if (has_failed) {
if (_c->is_leader() && _c->term() == synced_term) {
// we need not care about the requests waiting on the lock:
// by the time we release it the leader or term will have
// changed, so the pending fibers won't pass the initial
// checks and will be rejected with not_leader
co_await _c->step_down();
}
u.return_all();
r = errc::generic_tx_error;
} else {
try {
u.return_all();
enqueued->set_value();
r = co_await std::move(ss->replicate_finished);
} catch (...) {
vlog(
clusterlog.warn,
"Status of the original attempt is unknown (still is in-flight "
"or failed). Failing a retried batch with the same pid:{} & "
"seq:{} to avoid duplicates",
bid.pid,
bid.last_seq);
// kafka clients don't automaticly handle fatal errors
// so when they encounter the error they will be forced
// to propagate it to the app layer
co_return errc::generic_tx_error;
"replication failed with {}",
std::current_exception());
r = errc::generic_tx_error;
}
- co_return raft::replicate_result{.last_offset = cached_offset.value()};
}

- if (!check_seq(bid)) {
- co_return errc::sequence_out_of_order;
// we don't need session->lock because we never interleave
// access to is_processing and offset with a sync point (await)
request->is_processing = false;
request->r = r;
for (auto pending : request->parked) {
pending->set_value(r);
}
- auto r = co_await _c->replicate(synced_term, std::move(br), opts);
- if (r) {
- set_seq(bid, r.value().last_offset);
request->parked.clear();

if (!r) {
// if r failed at the consensus level (not because of has_failed)
// consensus should guarantee that all follow-up replication requests
// fail too, but we step down just in case to minimize the risk
if (_c->is_leader() && _c->term() == synced_term) {
co_await _c->step_down();
}
co_return r;
}

while (session->cache.size() > 0
&& !session->cache.front()->is_processing) {
auto front = session->cache.front();
if (!front->r) {
break;
}
auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
if (inserted) {
seq_it->second.pid = bid.pid;
seq_it->second.seq = front->last_seq;
seq_it->second.last_offset = front->r.value().last_offset;
} else {
seq_it->second.update(
front->last_seq, front->r.value().last_offset);
}
session->cache.pop_front();
}
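The loop above folds finished requests into the per-producer seq table only from the front of the FIFO cache, so results are applied strictly in send order even when replication futures resolve out of order. A rough standalone sketch of that draining rule (simplified types, not the real seq_entry/try_emplace bookkeeping):

#include <cstdint>
#include <deque>
#include <iostream>
#include <memory>
#include <optional>

struct inflight_request {
    int32_t last_seq{-1};
    bool is_processing{true};
    std::optional<int64_t> offset; // set on success, empty on failure
};

struct seq_entry {
    int32_t seq{-1};
    int64_t last_offset{-1};
};

// Fold completed requests into the per-producer entry, but only from the
// front of the FIFO: a still-processing or failed request blocks the drain
// so later results are never applied over a gap.
void drain(std::deque<std::shared_ptr<inflight_request>>& cache, seq_entry& entry) {
    while (!cache.empty() && !cache.front()->is_processing) {
        auto front = cache.front();
        if (!front->offset) {
            break;
        }
        entry.seq = front->last_seq;
        entry.last_offset = *front->offset;
        cache.pop_front();
    }
}

int main() {
    std::deque<std::shared_ptr<inflight_request>> cache;
    for (int32_t s : {3, 4, 5}) {
        cache.push_back(std::make_shared<inflight_request>(inflight_request{s}));
    }

    seq_entry entry;
    // the request ending at seq 4 finishes first; nothing can be drained yet
    cache[1]->is_processing = false;
    cache[1]->offset = 101;
    drain(cache, entry);
    std::cout << entry.seq << "\n"; // -1

    // once seq 3 finishes, both 3 and 4 are folded in; 5 is still in flight
    cache[0]->is_processing = false;
    cache[0]->offset = 100;
    drain(cache, entry);
    std::cout << entry.seq << " " << entry.last_offset << "\n"; // 4 101
}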

if (session->cache.size() == 0 && session->lock.ready()) {
_inflight_requests.erase(bid.pid);
}

co_return r;
}

57 changes: 41 additions & 16 deletions src/v/cluster/rm_stm.h
@@ -268,6 +268,7 @@ class rm_stm final : public persisted_stm {
std::optional<model::offset> known_seq(model::batch_identity) const;
void set_seq(model::batch_identity, model::offset);
void reset_seq(model::batch_identity);
std::optional<int32_t> tail_seq(model::producer_identity) const;

ss::future<result<raft::replicate_result>> do_replicate(
model::batch_identity,
@@ -409,22 +410,43 @@ class rm_stm final : public persisted_stm {
}
};

- template<typename Func>
- auto with_request_lock(request_id rid, Func&& f) {
- auto lock_it = _request_locks.find(rid);
- if (lock_it == _request_locks.end()) {
- lock_it
- = _request_locks.emplace(rid, ss::make_lw_shared<mutex>()).first;
struct inflight_request {
int32_t last_seq{-1};
result<raft::replicate_result> r = errc::success;
bool is_processing;
std::vector<
ss::lw_shared_ptr<available_promise<result<raft::replicate_result>>>>
parked;
};

struct inflight_requests {
mutex lock;
int32_t tail_seq{-1};
model::term_id term;
ss::circular_buffer<ss::lw_shared_ptr<inflight_request>> cache;

void forget() {
for (auto inflight : cache) {
if (inflight->is_processing) {
for (auto pending : inflight->parked) {
pending->set_value(errc::generic_tx_error);
}
}
}
cache.clear();
tail_seq = -1;
}
- auto r_lock = lock_it->second;
- return r_lock->with(std::forward<Func>(f))
- .finally([this, r_lock, rid]() {
- if (r_lock->waiters() == 0) {
- // this fiber is the last holder of the lock
- _request_locks.erase(rid);
- }
- });
- }

std::optional<result<raft::replicate_result>>
known_seq(int32_t last_seq) const {
for (auto seq : cache) {
if (seq->last_seq == last_seq && !seq->is_processing) {
return seq->r;
}
}
return std::nullopt;
}
};

ss::lw_shared_ptr<mutex> get_tx_lock(model::producer_id pid) {
auto lock_it = _tx_locks.find(pid);
@@ -443,7 +465,10 @@

ss::basic_rwlock<> _state_lock;
absl::flat_hash_map<model::producer_id, ss::lw_shared_ptr<mutex>> _tx_locks;
- absl::flat_hash_map<request_id, ss::lw_shared_ptr<mutex>> _request_locks;
absl::flat_hash_map<
model::producer_identity,
ss::lw_shared_ptr<inflight_requests>>
_inflight_requests;
log_state _log_state;
mem_state _mem_state;
ss::timer<clock_type> auto_abort_timer;
