From ff303709d921555d9ac0017a528aee0e810b00ef Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 16 Jun 2022 11:01:00 -0700 Subject: [PATCH] rm_stm: implement replicate_seq pipelining Kafka protocol is asynchronous: a client may send the next write request without waiting for the previous request to finish. Since Redpanda's Raft implementation is also asynchronous, it's possible to process Kafka requests with minimal overhead and/or delays. The challenge is to process causally related requests without pipeline stalling. Imagine that we have two requests A and B. When we want to process them as fast as possible we should start repli- cating the latter request without waiting until the replication of the first request is over. But in case the requests are the RSM commands and are causally related e.g. request A is "set x=2" and B is "set y=3 if x=2" then to guarantee that B's precondition holds Redpanda can't start replication of B request without knowing that - The request A is successfully executed - No other command sneaks in between A and B In case of idempotency the condition we need to preserve is the monotonicity of the seq numbers without gaps. In order to preserve causality and to avoid pipeline stalls we use optimistic replication. A leader knows about its ongoing operations so it may optimistically predict the outcome of operation A and start executing operation B assuming its prediction is true. But what happens if Redpanda is wrong with its prediction, how does it ensure safety? Let's break down all the failure scenarios. A leader uses a mutex and consesnsus::replicate_in_stages order all the incoming requests. Before the first stage is resolves the mutex controls the order and after the first stage is resolved Redpanda's raft implementation guarantees order: auto u = co_await mutex.lock(); auto stages = raft.replicate_in_stages(request); co_await stages.enqueued; u.return_all(); // after this point the order between the requests is certain // the order enforced by mutex is preserved in the log The mutex / replicate_in_stages combination may enforce partial order but can't prevent out-of-nowhere writes. Imagine a node loses the leadership then new leader inserts a command C and transfers the leadership back to the original node. To fight this Redpanda uses conditional replication: auto term = persisted_stm::sync(); auto u = co_await mutex.lock(); auto stages = raft.replicate_in_stages(term, request); co_await stages.enqueued; u.return_all(); It uses a sync method to make sure that the RSM's state reflects all the commands replicated by previous term (during this phase it may learn about the "A" command) and then it uses the raft's term to issue the replication command (conditional replicate). By design the replication call can't succeed if the term is wrong so instead of leading the ACB state the replication of B is doomed to fail. Redpanda's Raft implementation guarantees that if A was enqueued before B within the same term then the replication of B can't be successful without replication of A so we may not worry that A may be dropped. But we still have uncertainty. Should Redpanda process B assuming that A was successful or should it assume it failed? In order to resolve the uncertainty the leader steps down. Since sync() guarantees that the RSM's state reflects all the commands replicated by previous term - the next leader will resolve the uncertainty. The combination of those methods lead to preserving causal relationships between the requests (idempotency) without introducing pipelining stalls. ------------------------------------------------------------------- The idempotency logic is straightforward: - Take a lock identified by producer id (concurrent producers don't affect each other) - If the seq number and its offset is already known then return the offset - If the seq number is known and in flight then "park" the current request (once the original resolved it pings all the parked requests with the written offsets or an error) - If the seq number is seen for the first time then: - Reject it if there is a gap between it and the last known seq - Start processing if there is no gap fixes https://github.com/redpanda-data/redpanda/issues/5054 --- src/v/cluster/rm_stm.cc | 206 +++++++++++++++++++++++++++++++++++----- src/v/cluster/rm_stm.h | 78 +++++++++++---- 2 files changed, 244 insertions(+), 40 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index bc2e0e187bdc6..540e7e2d07112 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -779,13 +779,7 @@ ss::future> rm_stm::do_replicate( }) .finally([u = std::move(unit)] {}); } else if (bid.has_idempotent()) { - request_id rid{.pid = bid.pid, .seq = bid.first_seq}; - return with_request_lock( - rid, - [this, enqueued, bid, opts, b = std::move(b)]() mutable { - return replicate_seq( - bid, std::move(b), opts, enqueued); - }) + return replicate_seq(bid, std::move(b), opts, enqueued) .finally([u = std::move(unit)] {}); } @@ -919,6 +913,14 @@ rm_stm::known_seq(model::batch_identity bid) const { return std::nullopt; } +std::optional rm_stm::tail_seq(model::producer_identity pid) const { + auto pid_seq = _log_state.seq_table.find(pid); + if (pid_seq == _log_state.seq_table.end()) { + return std::nullopt; + } + return pid_seq->second.seq; +} + void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq != _log_state.seq_table.end()) { @@ -1062,7 +1064,7 @@ ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, raft::replicate_options opts, - [[maybe_unused]] ss::lw_shared_ptr> enqueued) { + ss::lw_shared_ptr> enqueued) { if (!co_await sync(_sync_timeout)) { // it's ok not to set enqueued on early return because // the safety check in replicate_in_stages sets it automatically @@ -1079,31 +1081,187 @@ ss::future> rm_stm::replicate_seq( co_return errc::invalid_request; } + // getting session by client's pid + auto session = _inflight_requests + .lazy_emplace( + bid.pid, + [&](const auto& ctor) { + ctor( + bid.pid, ss::make_lw_shared()); + }) + ->second; + + // critical section, rm_stm evaluates the request its order and + // whether or not it's duplicated + // ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL SUCCEED + // if any of the request fail we expect that the leader step down + auto u = co_await session->lock.get_units(); + + if (!_c->is_leader() || _c->term() != synced_term) { + co_return errc::not_leader; + } + + if (session->term < synced_term) { + // we don't care about old inflight requests because the sync + // guarantee (all replicated records of the last term are + // applied) makes sure that the passed inflight records got + // into _log_state->seq_table + session->forget(); + session->term = synced_term; + } + + if (session->term > synced_term) { + co_return errc::not_leader; + } + + // checking if the request (identified by seq) is already resolved + // checking among the pending requests + auto cached_r = session->known_seq(bid.last_seq); + if (cached_r) { + co_return cached_r.value(); + } + // checking among the responded requests auto cached_offset = known_seq(bid); if (cached_offset) { - if (cached_offset.value() < model::offset{0}) { + co_return raft::replicate_result{.last_offset = cached_offset.value()}; + } + + // checking if the request is already being processed + for (auto& inflight : session->cache) { + if (inflight->last_seq == bid.last_seq && inflight->is_processing) { + // found an inflight request, parking the current request + // until the former is resolved + auto promise = ss::make_lw_shared< + available_promise>>(); + inflight->parked.push_back(promise); + u.return_all(); + co_return co_await promise->get_future(); + } + } + + // apparantly we process an unseen request + // checking if it isn't out of order with the already procceses + // or inflight requests + if (session->cache.size() == 0) { + // no inflight requests > checking processed + auto tail = tail_seq(bid.pid); + if (tail) { + if (!is_sequence(tail.value(), bid.first_seq)) { + // there is a gap between the request'ss seq number + // and the seq number of the latest processed request + co_return errc::sequence_out_of_order; + } + } else { + if (bid.first_seq != 0) { + // first request in a session should have seq=0 + co_return errc::sequence_out_of_order; + } + } + } else { + if (!is_sequence(session->tail_seq, bid.first_seq)) { + // there is a gap between the request'ss seq number + // and the seq number of the latest inflight request + co_return errc::sequence_out_of_order; + } + } + + // updating last observed seq, since we hold the mutex + // it's guranteed to be latest + session->tail_seq = bid.last_seq; + + auto request = ss::make_lw_shared(); + request->last_seq = bid.last_seq; + request->is_processing = true; + session->cache.push_back(request); + + // request comes in the right order, it's ok to replicate + ss::lw_shared_ptr ss; + auto has_failed = false; + try { + ss = _c->replicate_in_stages(synced_term, std::move(br), opts); + co_await std::move(ss->request_enqueued); + } catch (...) { + vlog( + clusterlog.warn, + "replication failed with {}", + std::current_exception()); + has_failed = true; + } + + result r = errc::success; + + if (has_failed) { + if (_c->is_leader() && _c->term() == synced_term) { + // we may not care about the requests waiting on the lock + // as soon as we release the lock the leader or term will + // be changed so the pending fibers won't pass the initial + // checks and be rejected with not_leader + co_await _c->step_down(); + } + u.return_all(); + r = errc::replication_error; + } else { + try { + u.return_all(); + enqueued->set_value(); + r = co_await std::move(ss->replicate_finished); + } catch (...) { vlog( clusterlog.warn, - "Status of the original attempt is unknown (still is in-flight " - "or failed). Failing a retried batch with the same pid:{} & " - "seq:{} to avoid duplicates", - bid.pid, - bid.last_seq); - // kafka clients don't automaticly handle fatal errors - // so when they encounter the error they will be forced - // to propagate it to the app layer - co_return errc::generic_tx_error; + "replication failed with {}", + std::current_exception()); + r = errc::replication_error; } - co_return raft::replicate_result{.last_offset = cached_offset.value()}; } - if (!check_seq(bid)) { - co_return errc::sequence_out_of_order; + // we don't need session->lock because we never interleave + // access to is_processing and offset with sync point (await) + request->is_processing = false; + request->r = r; + for (auto& pending : request->parked) { + pending->set_value(r); + } + request->parked.clear(); + + if (!r) { + // if r was failed at the consensus level (not because has_failed) + // it should guarantee that all follow up replication requests fail + // too but just in case stepping down to minimize the risk + if (_c->is_leader() && _c->term() == synced_term) { + co_await _c->step_down(); + } + co_return r; + } + + // requests get into session->cache in seq order so when we iterate + // starting from the beginning and until we meet the first still + // in flight request we may be sure that we update seq_table in + // order to preserve monotonicity and the lack of gaps + while (!session->cache.empty() && !session->cache.front()->is_processing) { + auto front = session->cache.front(); + if (!front->r) { + // just encountered a failed request it but when a request + // fails it causes a step down so we may not care about + // updating seq_table and stop doing it; the next leader's + // apply will do it for us + break; + } + auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid); + if (inserted) { + seq_it->second.pid = bid.pid; + seq_it->second.seq = front->last_seq; + seq_it->second.last_offset = front->r.value().last_offset; + } else { + seq_it->second.update( + front->last_seq, front->r.value().last_offset); + } + session->cache.pop_front(); } - auto r = co_await _c->replicate(synced_term, std::move(br), opts); - if (r) { - set_seq(bid, r.value().last_offset); + + if (session->cache.empty() && session->lock.ready()) { + _inflight_requests.erase(bid.pid); } + co_return r; } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 49b4ba7a01a96..9d9997f5845e6 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -268,6 +268,7 @@ class rm_stm final : public persisted_stm { std::optional known_seq(model::batch_identity) const; void set_seq(model::batch_identity, model::offset); void reset_seq(model::batch_identity); + std::optional tail_seq(model::producer_identity) const; ss::future> do_replicate( model::batch_identity, @@ -409,22 +410,64 @@ class rm_stm final : public persisted_stm { } }; - template - auto with_request_lock(request_id rid, Func&& f) { - auto lock_it = _request_locks.find(rid); - if (lock_it == _request_locks.end()) { - lock_it - = _request_locks.emplace(rid, ss::make_lw_shared()).first; + // When a request is retried while the first appempt is still + // being replicated the retried request is parked until the + // original request is replicated. + struct inflight_request { + int32_t last_seq{-1}; + result r = errc::success; + bool is_processing; + std::vector< + ss::lw_shared_ptr>>> + parked; + }; + + // Redpanda uses optimistic replication to implement pipelining of + // idempotent replication requests. Just like with non-idempotent + // requests redpanda doesn't wait until previous request is replicated + // before processing the next. + // + // However with idempotency the requests are causally related: seq + // numbers should increase without gaps. So we need to prevent a + // case when a request A's replication starts before a request's B + // replication but then it fails while B's replication passes. + // + // We reply on conditional replication to guarantee that A and B + // share the same term and then on ours Raft's guarantees about order + // if A was enqueued before B within the same term then if A fails + // then B should fail too. + // + // inflight_requests hosts inflight requests before they are resolved + // and form a monotonicly increasing continuation without gaps of + // log_state's seq_table + struct inflight_requests { + mutex lock; + int32_t tail_seq{-1}; + model::term_id term; + ss::circular_buffer> cache; + + void forget() { + for (auto& inflight : cache) { + if (inflight->is_processing) { + for (auto& pending : inflight->parked) { + pending->set_value(errc::generic_tx_error); + } + } + } + cache.clear(); + tail_seq = -1; } - auto r_lock = lock_it->second; - return r_lock->with(std::forward(f)) - .finally([this, r_lock, rid]() { - if (r_lock->waiters() == 0) { - // this fiber is the last holder of the lock - _request_locks.erase(rid); - } - }); - } + + std::optional> + known_seq(int32_t last_seq) const { + for (auto& seq : cache) { + if (seq->last_seq == last_seq && !seq->is_processing) { + return seq->r; + } + } + return std::nullopt; + } + }; ss::lw_shared_ptr get_tx_lock(model::producer_id pid) { auto lock_it = _tx_locks.find(pid); @@ -443,7 +486,10 @@ class rm_stm final : public persisted_stm { ss::basic_rwlock<> _state_lock; absl::flat_hash_map> _tx_locks; - absl::flat_hash_map> _request_locks; + absl::flat_hash_map< + model::producer_identity, + ss::lw_shared_ptr> + _inflight_requests; log_state _log_state; mem_state _mem_state; ss::timer auto_abort_timer;