Skip to content

Commit

Permalink
rm_stm: implement replicate_seq pipelining
Browse files Browse the repository at this point in the history
Kafka protocol is asynchronous: a client may send the next write
request without waiting for the previous request to finish. Since
Redpanda's Raft implementation is also asynchronous, it's possible
to process Kafka requests with minimal overhead and/or delays.

The challenge is to process causally related requests without
pipeline stalling. Imagine that we have two requests A and B. When
we want to process them as fast as possible we should start repli-
cating the latter request without waiting until the replication of
the first request is over.

But in case the requests are the RSM commands and are causally
related e.g. request A is "set x=2" and B is "set y=3 if x=2" then
to guarantee that B's precondition holds Redpanda can't start
replication of B request without knowing that

  - The request A is successfully executed
  - No other command sneaks in between A and B

In case of idempotency the condition we need to preserve is the
monotonicity of the seq numbers without gaps.

In order to preserve causality and to avoid pipeline stalls we use
optimistic replication. A leader knows about its ongoing operations
so it may optimistically predict the outcome of operation A and
start executing operation B assuming its prediction is true.

But what happens if Redpanda is wrong with its prediction, how does
it ensure safety? Let's break down all the failure scenarios.

A leader uses a mutex and consesnsus::replicate_in_stages order all
the incoming requests. Before the first stage is resolves the mutex
controls the order and after the first stage is resolved Redpanda's
raft implementation guarantees order:

  auto u = co_await mutex.lock();
  auto stages = raft.replicate_in_stages(request);
  co_await stages.enqueued;
  u.return_all();
  // after this point the order between the requests is certain
  // the order enforced by mutex is preserved in the log

The mutex / replicate_in_stages combination may enforce partial
order but can't prevent out-of-nowhere writes. Imagine a node loses
the leadership then new leader inserts a command C and transfers
the leadership back to the original node.

To fight this Redpanda uses conditional replication:

  auto term = persisted_stm::sync();
  auto u = co_await mutex.lock();
  auto stages = raft.replicate_in_stages(term, request);
  co_await stages.enqueued;
  u.return_all();

It uses a sync method to make sure that the RSM's state reflects
all the commands replicated by previous term (during this phase it
may learn about the "A" command) and then it uses the raft's term
to issue the replication command (conditional replicate). By design
the replication call can't succeed if the term is wrong so instead
of leading the ACB state the replication of B is doomed to fail.

Redpanda's Raft implementation guarantees that if A was enqueued
before B within the same term then the replication of B can't be
successful without replication of A so we may not worry that A may
be dropped.

But we still have uncertainty. Should Redpanda process B assuming
that A was successful or should it assume it failed? In order to
resolve the uncertainty the leader steps down.

Since sync() guarantees that the RSM's state reflects all the
commands replicated by previous term - the next leader will resolve
the uncertainty.

The combination of those methods lead to preserving causal
relationships between the requests (idempotency) without
introducing pipelining stalls.

-------------------------------------------------------------------

The idempotency logic is straightforward:

 - Take a lock identified by producer id (concurrent producers
   don't affect each other)
    - If the seq number and its offset is already known then
      return the offset
    - If the seq number is known and in flight then "park" the
      current request (once the original resolved it pings all
      the parked requests with the written offsets or an error)
    - If the seq number is seen for the first time then:
        - Reject it if there is a gap between it and the last
          known seq
        - Start processing if there is no gap

fixes redpanda-data#5054
  • Loading branch information
rystsov committed Jun 23, 2022
1 parent 69b58c8 commit 21bb08a
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 40 deletions.
206 changes: 182 additions & 24 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -806,13 +806,7 @@ ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
})
.finally([u = std::move(unit)] {});
} else if (bid.has_idempotent()) {
request_id rid{.pid = bid.pid, .seq = bid.first_seq};
return with_request_lock(
rid,
[this, enqueued, bid, opts, b = std::move(b)]() mutable {
return replicate_seq(
bid, std::move(b), opts, enqueued);
})
return replicate_seq(bid, std::move(b), opts, enqueued)
.finally([u = std::move(unit)] {});
}

Expand Down Expand Up @@ -946,6 +940,14 @@ rm_stm::known_seq(model::batch_identity bid) const {
return std::nullopt;
}

std::optional<int32_t> rm_stm::tail_seq(model::producer_identity pid) const {
auto pid_seq = _log_state.seq_table.find(pid);
if (pid_seq == _log_state.seq_table.end()) {
return std::nullopt;
}
return pid_seq->second.seq;
}

void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) {
auto pid_seq = _log_state.seq_table.find(bid.pid);
if (pid_seq != _log_state.seq_table.end()) {
Expand Down Expand Up @@ -1090,7 +1092,7 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
model::batch_identity bid,
model::record_batch_reader br,
raft::replicate_options opts,
[[maybe_unused]] ss::lw_shared_ptr<available_promise<>> enqueued) {
ss::lw_shared_ptr<available_promise<>> enqueued) {
if (!co_await sync(_sync_timeout)) {
// it's ok not to set enqueued on early return because
// the safety check in replicate_in_stages sets it automatically
Expand All @@ -1107,31 +1109,187 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
co_return errc::invalid_request;
}

// getting session by client's pid
auto session = _inflight_requests
.lazy_emplace(
bid.pid,
[&](const auto& ctor) {
ctor(
bid.pid, ss::make_lw_shared<inflight_requests>());
})
->second;

// critical section, rm_stm evaluates the request its order and
// whether or not it's duplicated
// ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL SUCCEED
// if any of the request fail we expect that the leader step down
auto u = co_await session->lock.get_units();

if (!_c->is_leader() || _c->term() != synced_term) {
co_return errc::not_leader;
}

if (session->term < synced_term) {
// we don't care about old inflight requests because the sync
// guarantee (all replicated records of the last term are
// applied) makes sure that the passed inflight records got
// into _log_state->seq_table
session->forget();
session->term = synced_term;
}

if (session->term > synced_term) {
co_return errc::not_leader;
}

// checking if the request (identified by seq) is already resolved
// checking among the pending requests
auto cached_r = session->known_seq(bid.last_seq);
if (cached_r) {
co_return cached_r.value();
}
// checking among the responded requests
auto cached_offset = known_seq(bid);
if (cached_offset) {
if (cached_offset.value() < model::offset{0}) {
co_return raft::replicate_result{.last_offset = cached_offset.value()};
}

// checking if the request is already being processed
for (auto& inflight : session->cache) {
if (inflight->last_seq == bid.last_seq && inflight->is_processing) {
// found an inflight request, parking the current request
// until the former is resolved
auto promise = ss::make_lw_shared<
available_promise<result<raft::replicate_result>>>();
inflight->parked.push_back(promise);
u.return_all();
co_return co_await promise->get_future();
}
}

// apparantly we process an unseen request
// checking if it isn't out of order with the already procceses
// or inflight requests
if (session->cache.size() == 0) {
// no inflight requests > checking processed
auto tail = tail_seq(bid.pid);
if (tail) {
if (!is_sequence(tail.value(), bid.first_seq)) {
// there is a gap between the request'ss seq number
// and the seq number of the latest processed request
co_return errc::sequence_out_of_order;
}
} else {
if (bid.first_seq != 0) {
// first request in a session should have seq=0
co_return errc::sequence_out_of_order;
}
}
} else {
if (!is_sequence(session->tail_seq, bid.first_seq)) {
// there is a gap between the request'ss seq number
// and the seq number of the latest inflight request
co_return errc::sequence_out_of_order;
}
}

// updating last observed seq, since we hold the mutex
// it's guranteed to be latest
session->tail_seq = bid.last_seq;

auto request = ss::make_lw_shared<inflight_request>();
request->last_seq = bid.last_seq;
request->is_processing = true;
session->cache.push_back(request);

// request comes in the right order, it's ok to replicate
ss::lw_shared_ptr<raft::replicate_stages> ss;
auto has_failed = false;
try {
ss = _c->replicate_in_stages(synced_term, std::move(br), opts);
co_await std::move(ss->request_enqueued);
} catch (...) {
vlog(
clusterlog.warn,
"replication failed with {}",
std::current_exception());
has_failed = true;
}

result<raft::replicate_result> r = errc::success;

if (has_failed) {
if (_c->is_leader() && _c->term() == synced_term) {
// we may not care about the requests waiting on the lock
// as soon as we release the lock the leader or term will
// be changed so the pending fibers won't pass the initial
// checks and be rejected with not_leader
co_await _c->step_down();
}
u.return_all();
r = errc::replication_error;
} else {
try {
u.return_all();
enqueued->set_value();
r = co_await std::move(ss->replicate_finished);
} catch (...) {
vlog(
clusterlog.warn,
"Status of the original attempt is unknown (still is in-flight "
"or failed). Failing a retried batch with the same pid:{} & "
"seq:{} to avoid duplicates",
bid.pid,
bid.last_seq);
// kafka clients don't automaticly handle fatal errors
// so when they encounter the error they will be forced
// to propagate it to the app layer
co_return errc::generic_tx_error;
"replication failed with {}",
std::current_exception());
r = errc::replication_error;
}
co_return raft::replicate_result{.last_offset = cached_offset.value()};
}

if (!check_seq(bid)) {
co_return errc::sequence_out_of_order;
// we don't need session->lock because we never interleave
// access to is_processing and offset with sync point (await)
request->is_processing = false;
request->r = r;
for (auto& pending : request->parked) {
pending->set_value(r);
}
request->parked.clear();

if (!r) {
// if r was failed at the consensus level (not because has_failed)
// it should guarantee that all follow up replication requests fail
// too but just in case stepping down to minimize the risk
if (_c->is_leader() && _c->term() == synced_term) {
co_await _c->step_down();
}
co_return r;
}

// requests get into session->cache in seq order so when we iterate
// starting from the beginning and until we meet the first still
// in flight request we may be sure that we update seq_table in
// order to preserve monotonicity and the lack of gaps
while (!session->cache.empty() && !session->cache.front()->is_processing) {
auto front = session->cache.front();
if (!front->r) {
// just encountered a failed request it but when a request
// fails it causes a step down so we may not care about
// updating seq_table and stop doing it; the next leader's
// apply will do it for us
break;
}
auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
if (inserted) {
seq_it->second.pid = bid.pid;
seq_it->second.seq = front->last_seq;
seq_it->second.last_offset = front->r.value().last_offset;
} else {
seq_it->second.update(
front->last_seq, front->r.value().last_offset);
}
session->cache.pop_front();
}
auto r = co_await _c->replicate(synced_term, std::move(br), opts);
if (r) {
set_seq(bid, r.value().last_offset);

if (session->cache.empty() && session->lock.ready()) {
_inflight_requests.erase(bid.pid);
}

co_return r;
}

Expand Down
78 changes: 62 additions & 16 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class rm_stm final : public persisted_stm {
std::optional<model::offset> known_seq(model::batch_identity) const;
void set_seq(model::batch_identity, model::offset);
void reset_seq(model::batch_identity);
std::optional<int32_t> tail_seq(model::producer_identity) const;

ss::future<result<raft::replicate_result>> do_replicate(
model::batch_identity,
Expand Down Expand Up @@ -414,22 +415,64 @@ class rm_stm final : public persisted_stm {
}
};

template<typename Func>
auto with_request_lock(request_id rid, Func&& f) {
auto lock_it = _request_locks.find(rid);
if (lock_it == _request_locks.end()) {
lock_it
= _request_locks.emplace(rid, ss::make_lw_shared<mutex>()).first;
// When a request is retried while the first appempt is still
// being replicated the retried request is parked until the
// original request is replicated.
struct inflight_request {
int32_t last_seq{-1};
result<raft::replicate_result> r = errc::success;
bool is_processing;
std::vector<
ss::lw_shared_ptr<available_promise<result<raft::replicate_result>>>>
parked;
};

// Redpanda uses optimistic replication to implement pipelining of
// idempotent replication requests. Just like with non-idempotent
// requests redpanda doesn't wait until previous request is replicated
// before processing the next.
//
// However with idempotency the requests are causally related: seq
// numbers should increase without gaps. So we need to prevent a
// case when a request A's replication starts before a request's B
// replication but then it fails while B's replication passes.
//
// We reply on conditional replication to guarantee that A and B
// share the same term and then on ours Raft's guarantees about order
// if A was enqueued before B within the same term then if A fails
// then B should fail too.
//
// inflight_requests hosts inflight requests before they are resolved
// and form a monotonicly increasing continuation without gaps of
// log_state's seq_table
struct inflight_requests {
mutex lock;
int32_t tail_seq{-1};
model::term_id term;
ss::circular_buffer<ss::lw_shared_ptr<inflight_request>> cache;

void forget() {
for (auto& inflight : cache) {
if (inflight->is_processing) {
for (auto& pending : inflight->parked) {
pending->set_value(errc::generic_tx_error);
}
}
}
cache.clear();
tail_seq = -1;
}
auto r_lock = lock_it->second;
return r_lock->with(std::forward<Func>(f))
.finally([this, r_lock, rid]() {
if (r_lock->waiters() == 0) {
// this fiber is the last holder of the lock
_request_locks.erase(rid);
}
});
}

std::optional<result<raft::replicate_result>>
known_seq(int32_t last_seq) const {
for (auto& seq : cache) {
if (seq->last_seq == last_seq && !seq->is_processing) {
return seq->r;
}
}
return std::nullopt;
}
};

ss::lw_shared_ptr<mutex> get_tx_lock(model::producer_id pid) {
auto lock_it = _tx_locks.find(pid);
Expand All @@ -448,7 +491,10 @@ class rm_stm final : public persisted_stm {

ss::basic_rwlock<> _state_lock;
absl::flat_hash_map<model::producer_id, ss::lw_shared_ptr<mutex>> _tx_locks;
absl::flat_hash_map<request_id, ss::lw_shared_ptr<mutex>> _request_locks;
absl::flat_hash_map<
model::producer_identity,
ss::lw_shared_ptr<inflight_requests>>
_inflight_requests;
log_state _log_state;
mem_state _mem_state;
ss::timer<clock_type> auto_abort_timer;
Expand Down

0 comments on commit 21bb08a

Please sign in to comment.