From 93b8bd4799561a69213adc577614af76b597ca8f Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 16 Jun 2022 09:09:53 -0700 Subject: [PATCH 01/11] make available_promise support variable templates --- src/v/utils/available_promise.h | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/v/utils/available_promise.h b/src/v/utils/available_promise.h index f994b07ea141..a69f20241b1c 100644 --- a/src/v/utils/available_promise.h +++ b/src/v/utils/available_promise.h @@ -14,19 +14,20 @@ #include -template +template class available_promise { public: - ss::future get_future() { return _promise.get_future(); } + ss::future get_future() { return _promise.get_future(); } - void set_value(T&& value) { + template + void set_value(A&&... a) { _available = true; - _promise.set_value(std::move(value)); + _promise.set_value(std::forward(a)...); } bool available() { return _available; } private: bool _available{false}; - ss::promise _promise; + ss::promise _promise; }; From 9a693fc6a649db95de98693d2a9974f2f7584300 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 16 Jun 2022 11:28:59 -0700 Subject: [PATCH 02/11] consensus: add step_down --- src/v/raft/consensus.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 9376da87519b..31326e45cbe2 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -255,6 +255,16 @@ class consensus { }); } + ss::future<> step_down() { + return _op_lock.with([this] { + do_step_down("external_stepdown"); + if (_leader_id) { + _leader_id = std::nullopt; + trigger_leadership_notification(); + } + }); + } + ss::future> timequery(storage::timequery_config cfg); From 0791a5fefb37865cf6084670cc4220e00b81acf4 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 17 Jun 2022 19:28:22 -0700 Subject: [PATCH 03/11] rm_stm: add replicate_in_stages wrapper --- src/v/cluster/partition.cc | 13 +---------- src/v/cluster/rm_stm.cc | 45 +++++++++++++++++++++++++++++++++----- src/v/cluster/rm_stm.h | 15 ++++++++++++- 3 files changed, 55 insertions(+), 18 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 7411a7aaace6..8350845f7d95 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -181,18 +181,7 @@ raft::replicate_stages partition::replicate_in_stages( } if (_rm_stm) { - ss::promise<> p; - auto f = p.get_future(); - auto replicate_finished - = _rm_stm->replicate(bid, std::move(r), opts) - .then( - [p = std::move(p)](result res) mutable { - p.set_value(); - return res; - }); - return raft::replicate_stages( - std::move(f), std::move(replicate_finished)); - + return _rm_stm->replicate_in_stages(bid, std::move(r), opts); } else { return _raft->replicate_in_stages(std::move(r), opts); } diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 986a853a876b..5e67ae5504d4 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -760,12 +760,43 @@ ss::future rm_stm::do_abort_tx( co_return tx_errc::none; } +raft::replicate_stages rm_stm::replicate_in_stages( + model::batch_identity bid, + model::record_batch_reader r, + raft::replicate_options opts) { + auto enqueued = ss::make_lw_shared>(); + auto f = enqueued->get_future(); + auto replicate_finished + = do_replicate(bid, std::move(r), opts, enqueued).finally([enqueued] { + // we should avoid situations when replicate_finished is set while + // enqueued isn't because it leads to hanging produce requests and + // the resource leaks. since staged replication is an optimization + // and setting enqueued only after replicate_finished is already + // set doesn't have sematic implications adding this post + // replicate_finished as a safety measure in case enqueued isn't + // set explicitly + if (!enqueued->available()) { + enqueued->set_value(); + } + }); + return raft::replicate_stages(std::move(f), std::move(replicate_finished)); +} + ss::future> rm_stm::replicate( model::batch_identity bid, - model::record_batch_reader b, + model::record_batch_reader r, raft::replicate_options opts) { + auto enqueued = ss::make_lw_shared>(); + return do_replicate(bid, std::move(r), opts, enqueued); +} + +ss::future> rm_stm::do_replicate( + model::batch_identity bid, + model::record_batch_reader b, + raft::replicate_options opts, + ss::lw_shared_ptr> enqueued) { return _state_lock.hold_read_lock().then( - [this, bid, opts, b = std::move(b)]( + [this, enqueued, bid, opts, b = std::move(b)]( ss::basic_rwlock<>::holder unit) mutable { if (bid.is_transactional) { auto pid = bid.pid.get_id(); @@ -778,8 +809,9 @@ ss::future> rm_stm::replicate( request_id rid{.pid = bid.pid, .seq = bid.first_seq}; return with_request_lock( rid, - [this, bid, opts, b = std::move(b)]() mutable { - return replicate_seq(bid, std::move(b), opts); + [this, enqueued, bid, opts, b = std::move(b)]() mutable { + return replicate_seq( + bid, std::move(b), opts, enqueued); }) .finally([u = std::move(unit)] {}); } @@ -1064,8 +1096,11 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, - raft::replicate_options opts) { + raft::replicate_options opts, + [[maybe_unused]] ss::lw_shared_ptr> enqueued) { if (!co_await sync(_sync_timeout)) { + // it's ok not to set enqueued on early return because + // the safety check in replicate_in_stages sets it automatically co_return errc::not_leader; } auto synced_term = _insync_term; diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 5f2d7e4461d2..b69530690f35 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -22,6 +22,7 @@ #include "raft/state_machine.h" #include "raft/types.h" #include "storage/snapshot.h" +#include "utils/available_promise.h" #include "utils/expiring_promise.h" #include "utils/mutex.h" @@ -168,6 +169,11 @@ class rm_stm final : public persisted_stm { ss::future> aborted_transactions(model::offset, model::offset); + raft::replicate_stages replicate_in_stages( + model::batch_identity, + model::record_batch_reader, + raft::replicate_options); + ss::future> replicate( model::batch_identity, model::record_batch_reader, @@ -268,13 +274,20 @@ class rm_stm final : public persisted_stm { void set_seq(model::batch_identity, model::offset); void reset_seq(model::batch_identity); + ss::future> do_replicate( + model::batch_identity, + model::record_batch_reader, + raft::replicate_options, + ss::lw_shared_ptr>); + ss::future> replicate_tx(model::batch_identity, model::record_batch_reader); ss::future> replicate_seq( model::batch_identity, model::record_batch_reader, - raft::replicate_options); + raft::replicate_options, + ss::lw_shared_ptr>); void compact_snapshot(); From 9b5491bc2c0c6f50b953d39bcfac3964e65b7aba Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 16 Jun 2022 11:15:43 -0700 Subject: [PATCH 04/11] rm_stm: implement replicate pipelining --- src/v/cluster/rm_stm.cc | 23 +++++++++++++++-------- src/v/cluster/rm_stm.h | 5 +++++ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 5e67ae5504d4..8f76d56c9092 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -816,14 +816,7 @@ ss::future> rm_stm::do_replicate( .finally([u = std::move(unit)] {}); } - return sync(_sync_timeout) - .then([this, opts, b = std::move(b)](bool is_synced) mutable { - if (!is_synced) { - return ss::make_ready_future< - result>(errc::not_leader); - } - return _c->replicate(std::move(b), opts); - }) + return replicate_msg(std::move(b), opts, enqueued) .finally([u = std::move(unit)] {}); }); } @@ -1133,6 +1126,20 @@ ss::future> rm_stm::replicate_seq( co_return r; } +ss::future> rm_stm::replicate_msg( + model::record_batch_reader br, + raft::replicate_options opts, + ss::lw_shared_ptr> enqueued) { + if (!co_await sync(_sync_timeout)) { + co_return errc::not_leader; + } + + auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts); + co_await std::move(ss.request_enqueued); + enqueued->set_value(); + co_return co_await std::move(ss.replicate_finished); +} + model::offset rm_stm::last_stable_offset() { auto first_tx_start = model::offset::max(); diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index b69530690f35..aee69f12ebdf 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -289,6 +289,11 @@ class rm_stm final : public persisted_stm { raft::replicate_options, ss::lw_shared_ptr>); + ss::future> replicate_msg( + model::record_batch_reader, + raft::replicate_options, + ss::lw_shared_ptr>); + void compact_snapshot(); ss::future sync(model::timeout_clock::duration); From 8ea6b149bf0ecf66bf173523ffc0c760bbd3695d Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 16 Jun 2022 11:16:23 -0700 Subject: [PATCH 05/11] rm_stm: make request validation stronger Return invalid_request instead of processing a wrong if branch --- src/v/cluster/errc.h | 3 +++ src/v/cluster/rm_stm.cc | 11 ++++++++++- src/v/kafka/server/errors.h | 2 ++ src/v/kafka/server/handlers/produce.cc | 2 ++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h index 426b60535477..1f85cad7c67c 100644 --- a/src/v/cluster/errc.h +++ b/src/v/cluster/errc.h @@ -59,6 +59,7 @@ enum class errc : int16_t { error_collecting_health_report, leadership_changed, feature_disabled, + invalid_request, }; struct errc_category final : public std::error_category { const char* name() const noexcept final { return "cluster::errc"; } @@ -167,6 +168,8 @@ struct errc_category final : public std::error_category { "to finish"; case errc::feature_disabled: return "Requested feature is disabled"; + case errc::invalid_request: + return "Invalid request"; } return "cluster::errc::unknown"; } diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 8f76d56c9092..21d06ef3193a 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -805,7 +805,7 @@ ss::future> rm_stm::do_replicate( return replicate_tx(bid, std::move(b)); }) .finally([u = std::move(unit)] {}); - } else if (bid.has_idempotent() && bid.first_seq <= bid.last_seq) { + } else if (bid.has_idempotent()) { request_id rid{.pid = bid.pid, .seq = bid.first_seq}; return with_request_lock( rid, @@ -1098,6 +1098,15 @@ ss::future> rm_stm::replicate_seq( } auto synced_term = _insync_term; + if (bid.first_seq > bid.last_seq) { + vlog( + clusterlog.warn, + "first_seq={} of the batch should be less or equal to last_seq={}", + bid.first_seq, + bid.last_seq); + co_return errc::invalid_request; + } + auto cached_offset = known_seq(bid); if (cached_offset) { if (cached_offset.value() < model::offset{0}) { diff --git a/src/v/kafka/server/errors.h b/src/v/kafka/server/errors.h index 354c80df3b23..e8d86e9da831 100644 --- a/src/v/kafka/server/errors.h +++ b/src/v/kafka/server/errors.h @@ -43,6 +43,8 @@ constexpr error_code map_topic_error_code(cluster::errc code) { return error_code::broker_not_available; case cluster::errc::not_leader: return error_code::not_coordinator; + case cluster::errc::invalid_request: + return error_code::invalid_request; case cluster::errc::replication_error: case cluster::errc::shutting_down: case cluster::errc::join_request_dispatch_error: diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 546dfa2057ce..687b127c6c0b 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -154,6 +154,8 @@ static error_code map_produce_error_code(std::error_code ec) { return error_code::invalid_producer_epoch; case cluster::errc::sequence_out_of_order: return error_code::out_of_order_sequence_number; + case cluster::errc::invalid_request: + return error_code::invalid_request; default: return error_code::unknown_server_error; } From 88423d799d748622e99c6824e303cd2667b12271 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 16 Jun 2022 11:01:00 -0700 Subject: [PATCH 06/11] rm_stm: implement replicate_seq pipelining Kafka protocol is asynchronous: a client may send the next write request without waiting for the previous request to finish. Since Redpanda's Raft implementation is also asynchronous, it's possible to process Kafka requests with minimal overhead and/or delays. The challenge is to process causally related requests without pipeline stalling. Imagine that we have two requests A and B. When we want to process them as fast as possible we should start repli- cating the latter request without waiting until the replication of the first request is over. But in case the requests are the RSM commands and are causally related e.g. request A is "set x=2" and B is "set y=3 if x=2" then to guarantee that B's precondition holds Redpanda can't start replication of B request without knowing that - The request A is successfully executed - No other command sneaks in between A and B In case of idempotency the condition we need to preserve is the monotonicity of the seq numbers without gaps. In order to preserve causality and to avoid pipeline stalls we use optimistic replication. A leader knows about its ongoing operations so it may optimistically predict the outcome of operation A and start executing operation B assuming its prediction is true. But what happens if Redpanda is wrong with its prediction, how does it ensure safety? Let's break down all the failure scenarios. A leader uses a mutex and consesnsus::replicate_in_stages order all the incoming requests. Before the first stage is resolves the mutex controls the order and after the first stage is resolved Redpanda's raft implementation guarantees order: auto u = co_await mutex.lock(); auto stages = raft.replicate_in_stages(request); co_await stages.enqueued; u.return_all(); // after this point the order between the requests is certain // the order enforced by mutex is preserved in the log The mutex / replicate_in_stages combination may enforce partial order but can't prevent out-of-nowhere writes. Imagine a node loses the leadership then new leader inserts a command C and transfers the leadership back to the original node. To fight this Redpanda uses conditional replication: auto term = persisted_stm::sync(); auto u = co_await mutex.lock(); auto stages = raft.replicate_in_stages(term, request); co_await stages.enqueued; u.return_all(); It uses a sync method to make sure that the RSM's state reflects all the commands replicated by previous term (during this phase it may learn about the "A" command) and then it uses the raft's term to issue the replication command (conditional replicate). By design the replication call can't succeed if the term is wrong so instead of leading the ACB state the replication of B is doomed to fail. Redpanda's Raft implementation guarantees that if A was enqueued before B within the same term then the replication of B can't be successful without replication of A so we may not worry that A may be dropped. But we still have uncertainty. Should Redpanda process B assuming that A was successful or should it assume it failed? In order to resolve the uncertainty the leader steps down. Since sync() guarantees that the RSM's state reflects all the commands replicated by previous term - the next leader will resolve the uncertainty. The combination of those methods lead to preserving causal relationships between the requests (idempotency) without introducing pipelining stalls. ------------------------------------------------------------------- The idempotency logic is straightforward: - Take a lock identified by producer id (concurrent producers don't affect each other) - If the seq number and its offset is already known then return the offset - If the seq number is known and in flight then "park" the current request (once the original resolved it pings all the parked requests with the written offsets or an error) - If the seq number is seen for the first time then: - Reject it if there is a gap between it and the last known seq - Start processing if there is no gap fixes https://github.com/redpanda-data/redpanda/issues/5054 --- src/v/cluster/rm_stm.cc | 206 +++++++++++++++++++++++++++++++++++----- src/v/cluster/rm_stm.h | 78 +++++++++++---- 2 files changed, 244 insertions(+), 40 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 21d06ef3193a..2de99fd38774 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -806,13 +806,7 @@ ss::future> rm_stm::do_replicate( }) .finally([u = std::move(unit)] {}); } else if (bid.has_idempotent()) { - request_id rid{.pid = bid.pid, .seq = bid.first_seq}; - return with_request_lock( - rid, - [this, enqueued, bid, opts, b = std::move(b)]() mutable { - return replicate_seq( - bid, std::move(b), opts, enqueued); - }) + return replicate_seq(bid, std::move(b), opts, enqueued) .finally([u = std::move(unit)] {}); } @@ -946,6 +940,14 @@ rm_stm::known_seq(model::batch_identity bid) const { return std::nullopt; } +std::optional rm_stm::tail_seq(model::producer_identity pid) const { + auto pid_seq = _log_state.seq_table.find(pid); + if (pid_seq == _log_state.seq_table.end()) { + return std::nullopt; + } + return pid_seq->second.seq; +} + void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq != _log_state.seq_table.end()) { @@ -1090,7 +1092,7 @@ ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, raft::replicate_options opts, - [[maybe_unused]] ss::lw_shared_ptr> enqueued) { + ss::lw_shared_ptr> enqueued) { if (!co_await sync(_sync_timeout)) { // it's ok not to set enqueued on early return because // the safety check in replicate_in_stages sets it automatically @@ -1107,31 +1109,187 @@ ss::future> rm_stm::replicate_seq( co_return errc::invalid_request; } + // getting session by client's pid + auto session = _inflight_requests + .lazy_emplace( + bid.pid, + [&](const auto& ctor) { + ctor( + bid.pid, ss::make_lw_shared()); + }) + ->second; + + // critical section, rm_stm evaluates the request its order and + // whether or not it's duplicated + // ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL SUCCEED + // if any of the request fail we expect that the leader step down + auto u = co_await session->lock.get_units(); + + if (!_c->is_leader() || _c->term() != synced_term) { + co_return errc::not_leader; + } + + if (session->term < synced_term) { + // we don't care about old inflight requests because the sync + // guarantee (all replicated records of the last term are + // applied) makes sure that the passed inflight records got + // into _log_state->seq_table + session->forget(); + session->term = synced_term; + } + + if (session->term > synced_term) { + co_return errc::not_leader; + } + + // checking if the request (identified by seq) is already resolved + // checking among the pending requests + auto cached_r = session->known_seq(bid.last_seq); + if (cached_r) { + co_return cached_r.value(); + } + // checking among the responded requests auto cached_offset = known_seq(bid); if (cached_offset) { - if (cached_offset.value() < model::offset{0}) { + co_return raft::replicate_result{.last_offset = cached_offset.value()}; + } + + // checking if the request is already being processed + for (auto& inflight : session->cache) { + if (inflight->last_seq == bid.last_seq && inflight->is_processing) { + // found an inflight request, parking the current request + // until the former is resolved + auto promise = ss::make_lw_shared< + available_promise>>(); + inflight->parked.push_back(promise); + u.return_all(); + co_return co_await promise->get_future(); + } + } + + // apparantly we process an unseen request + // checking if it isn't out of order with the already procceses + // or inflight requests + if (session->cache.size() == 0) { + // no inflight requests > checking processed + auto tail = tail_seq(bid.pid); + if (tail) { + if (!is_sequence(tail.value(), bid.first_seq)) { + // there is a gap between the request'ss seq number + // and the seq number of the latest processed request + co_return errc::sequence_out_of_order; + } + } else { + if (bid.first_seq != 0) { + // first request in a session should have seq=0 + co_return errc::sequence_out_of_order; + } + } + } else { + if (!is_sequence(session->tail_seq, bid.first_seq)) { + // there is a gap between the request'ss seq number + // and the seq number of the latest inflight request + co_return errc::sequence_out_of_order; + } + } + + // updating last observed seq, since we hold the mutex + // it's guranteed to be latest + session->tail_seq = bid.last_seq; + + auto request = ss::make_lw_shared(); + request->last_seq = bid.last_seq; + request->is_processing = true; + session->cache.push_back(request); + + // request comes in the right order, it's ok to replicate + ss::lw_shared_ptr ss; + auto has_failed = false; + try { + ss = _c->replicate_in_stages(synced_term, std::move(br), opts); + co_await std::move(ss->request_enqueued); + } catch (...) { + vlog( + clusterlog.warn, + "replication failed with {}", + std::current_exception()); + has_failed = true; + } + + result r = errc::success; + + if (has_failed) { + if (_c->is_leader() && _c->term() == synced_term) { + // we may not care about the requests waiting on the lock + // as soon as we release the lock the leader or term will + // be changed so the pending fibers won't pass the initial + // checks and be rejected with not_leader + co_await _c->step_down(); + } + u.return_all(); + r = errc::replication_error; + } else { + try { + u.return_all(); + enqueued->set_value(); + r = co_await std::move(ss->replicate_finished); + } catch (...) { vlog( clusterlog.warn, - "Status of the original attempt is unknown (still is in-flight " - "or failed). Failing a retried batch with the same pid:{} & " - "seq:{} to avoid duplicates", - bid.pid, - bid.last_seq); - // kafka clients don't automaticly handle fatal errors - // so when they encounter the error they will be forced - // to propagate it to the app layer - co_return errc::generic_tx_error; + "replication failed with {}", + std::current_exception()); + r = errc::replication_error; } - co_return raft::replicate_result{.last_offset = cached_offset.value()}; } - if (!check_seq(bid)) { - co_return errc::sequence_out_of_order; + // we don't need session->lock because we never interleave + // access to is_processing and offset with sync point (await) + request->is_processing = false; + request->r = r; + for (auto& pending : request->parked) { + pending->set_value(r); + } + request->parked.clear(); + + if (!r) { + // if r was failed at the consensus level (not because has_failed) + // it should guarantee that all follow up replication requests fail + // too but just in case stepping down to minimize the risk + if (_c->is_leader() && _c->term() == synced_term) { + co_await _c->step_down(); + } + co_return r; + } + + // requests get into session->cache in seq order so when we iterate + // starting from the beginning and until we meet the first still + // in flight request we may be sure that we update seq_table in + // order to preserve monotonicity and the lack of gaps + while (!session->cache.empty() && !session->cache.front()->is_processing) { + auto front = session->cache.front(); + if (!front->r) { + // just encountered a failed request it but when a request + // fails it causes a step down so we may not care about + // updating seq_table and stop doing it; the next leader's + // apply will do it for us + break; + } + auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid); + if (inserted) { + seq_it->second.pid = bid.pid; + seq_it->second.seq = front->last_seq; + seq_it->second.last_offset = front->r.value().last_offset; + } else { + seq_it->second.update( + front->last_seq, front->r.value().last_offset); + } + session->cache.pop_front(); } - auto r = co_await _c->replicate(synced_term, std::move(br), opts); - if (r) { - set_seq(bid, r.value().last_offset); + + if (session->cache.empty() && session->lock.ready()) { + _inflight_requests.erase(bid.pid); } + co_return r; } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index aee69f12ebdf..40c450f7817c 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -273,6 +273,7 @@ class rm_stm final : public persisted_stm { std::optional known_seq(model::batch_identity) const; void set_seq(model::batch_identity, model::offset); void reset_seq(model::batch_identity); + std::optional tail_seq(model::producer_identity) const; ss::future> do_replicate( model::batch_identity, @@ -414,22 +415,64 @@ class rm_stm final : public persisted_stm { } }; - template - auto with_request_lock(request_id rid, Func&& f) { - auto lock_it = _request_locks.find(rid); - if (lock_it == _request_locks.end()) { - lock_it - = _request_locks.emplace(rid, ss::make_lw_shared()).first; + // When a request is retried while the first appempt is still + // being replicated the retried request is parked until the + // original request is replicated. + struct inflight_request { + int32_t last_seq{-1}; + result r = errc::success; + bool is_processing; + std::vector< + ss::lw_shared_ptr>>> + parked; + }; + + // Redpanda uses optimistic replication to implement pipelining of + // idempotent replication requests. Just like with non-idempotent + // requests redpanda doesn't wait until previous request is replicated + // before processing the next. + // + // However with idempotency the requests are causally related: seq + // numbers should increase without gaps. So we need to prevent a + // case when a request A's replication starts before a request's B + // replication but then it fails while B's replication passes. + // + // We reply on conditional replication to guarantee that A and B + // share the same term and then on ours Raft's guarantees about order + // if A was enqueued before B within the same term then if A fails + // then B should fail too. + // + // inflight_requests hosts inflight requests before they are resolved + // and form a monotonicly increasing continuation without gaps of + // log_state's seq_table + struct inflight_requests { + mutex lock; + int32_t tail_seq{-1}; + model::term_id term; + ss::circular_buffer> cache; + + void forget() { + for (auto& inflight : cache) { + if (inflight->is_processing) { + for (auto& pending : inflight->parked) { + pending->set_value(errc::generic_tx_error); + } + } + } + cache.clear(); + tail_seq = -1; } - auto r_lock = lock_it->second; - return r_lock->with(std::forward(f)) - .finally([this, r_lock, rid]() { - if (r_lock->waiters() == 0) { - // this fiber is the last holder of the lock - _request_locks.erase(rid); - } - }); - } + + std::optional> + known_seq(int32_t last_seq) const { + for (auto& seq : cache) { + if (seq->last_seq == last_seq && !seq->is_processing) { + return seq->r; + } + } + return std::nullopt; + } + }; ss::lw_shared_ptr get_tx_lock(model::producer_id pid) { auto lock_it = _tx_locks.find(pid); @@ -448,7 +491,10 @@ class rm_stm final : public persisted_stm { ss::basic_rwlock<> _state_lock; absl::flat_hash_map> _tx_locks; - absl::flat_hash_map> _request_locks; + absl::flat_hash_map< + model::producer_identity, + ss::lw_shared_ptr> + _inflight_requests; log_state _log_state; mem_state _mem_state; ss::timer auto_abort_timer; From 68848c1deb6b3b715834fa2a47156ead6e9ef6fb Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 18 Jun 2022 20:37:27 -0700 Subject: [PATCH 07/11] rm_stm: add leadership transfer coordination Leadership transfer and rm_stm should be coordinated to avoid a possibility of the live lock: - leadership transfer sets _transferring_leadership - it causes a write to fail - failing write initiate leader step down - the step down causes the transfer to fail --- src/v/cluster/partition.h | 6 +++++- src/v/cluster/rm_stm.cc | 9 +++++++++ src/v/cluster/rm_stm.h | 3 +++ src/v/cluster/scheduling/leader_balancer.cc | 6 +++--- src/v/redpanda/admin_server.cc | 8 ++++---- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index a6e0d31dd6f0..31ef78b983b8 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -150,7 +150,11 @@ class partition { ss::future transfer_leadership(std::optional target) { - return _raft->do_transfer_leadership(target); + if (_rm_stm) { + return _rm_stm->transfer_leadership(target); + } else { + return _raft->do_transfer_leadership(target); + } } ss::future diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 2de99fd38774..4766ad986f21 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -790,6 +790,15 @@ ss::future> rm_stm::replicate( return do_replicate(bid, std::move(r), opts, enqueued); } +ss::future +rm_stm::transfer_leadership(std::optional target) { + return _state_lock.hold_write_lock().then( + [this, target](ss::basic_rwlock<>::holder unit) { + return _c->do_transfer_leadership(target).finally( + [u = std::move(unit)] {}); + }); +} + ss::future> rm_stm::do_replicate( model::batch_identity bid, model::record_batch_reader b, diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 40c450f7817c..48f3fa0da964 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -179,6 +179,9 @@ class rm_stm final : public persisted_stm { model::record_batch_reader, raft::replicate_options); + ss::future + transfer_leadership(std::optional); + ss::future<> stop() override; void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; } diff --git a/src/v/cluster/scheduling/leader_balancer.cc b/src/v/cluster/scheduling/leader_balancer.cc index 4768f72cb610..f82a3eea62f6 100644 --- a/src/v/cluster/scheduling/leader_balancer.cc +++ b/src/v/cluster/scheduling/leader_balancer.cc @@ -626,8 +626,8 @@ leader_balancer::do_transfer_local(reassignment transfer) const { } auto shard = _shard_table.local().shard_for(transfer.group); auto func = [transfer, shard](cluster::partition_manager& pm) { - auto consensus = pm.consensus_for(transfer.group); - if (!consensus) { + auto partition = pm.partition_for(transfer.group); + if (!partition) { vlog( clusterlog.info, "Cannot complete group {} leader transfer: group instance " @@ -636,7 +636,7 @@ leader_balancer::do_transfer_local(reassignment transfer) const { shard); return ss::make_ready_future(false); } - return consensus->do_transfer_leadership(transfer.to.node_id) + return partition->transfer_leadership(transfer.to.node_id) .then([group = transfer.group](std::error_code err) { if (err) { vlog( diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 1ca85c7a733a..907ab11c386a 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -1142,12 +1142,12 @@ void admin_server::register_raft_routes() { shard, [group_id, target, this, req = std::move(req)]( cluster::partition_manager& pm) mutable { - auto consensus = pm.consensus_for(group_id); - if (!consensus) { + auto partition = pm.partition_for(group_id); + if (!partition) { throw ss::httpd::not_found_exception(); } - const auto ntp = consensus->ntp(); - return consensus->do_transfer_leadership(target).then( + const auto ntp = partition->ntp(); + return partition->transfer_leadership(target).then( [this, req = std::move(req), ntp](std::error_code err) -> ss::future { co_await throw_on_error(*req, err, ntp); From 1064521aee5778dea34c372b0a7d1c112d6f3268 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 18 Jun 2022 21:53:44 -0700 Subject: [PATCH 08/11] admin: rename target to target_id --- tests/rptest/services/admin.py | 9 ++++++--- tests/rptest/tests/group_membership_test.py | 11 ++++++----- tests/rptest/tests/prefix_truncate_recovery_test.py | 2 +- tests/rptest/tests/tx_admin_api_test.py | 2 +- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index 1d8a47bc148b..dea1dada30f9 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -542,15 +542,18 @@ def get_partition_leader(self, *, namespace, topic, partition, node=None): return partition_info['leader_id'] - def transfer_leadership_to(self, *, namespace, topic, partition, target): + def transfer_leadership_to(self, + *, + namespace, + topic, + partition, + target_id=None): """ Looks up current ntp leader and transfer leadership to target node, this operations is NOP when current leader is the same as target. If user pass None for target this function will choose next replica for new leader. If leadership transfer was performed this function return True """ - target_id = self.redpanda.idx(target) if isinstance( - target, ClusterNode) else target # check which node is current leader diff --git a/tests/rptest/tests/group_membership_test.py b/tests/rptest/tests/group_membership_test.py index 46b161962afd..389bc1cf3a6b 100644 --- a/tests/rptest/tests/group_membership_test.py +++ b/tests/rptest/tests/group_membership_test.py @@ -66,7 +66,7 @@ def _transfer_with_retry(self, namespace, topic, partition, target_id): admin.transfer_leadership_to(namespace=namespace, topic=topic, partition=partition, - target=target_id) + target_id=target_id) except requests.exceptions.HTTPError as e: if e.response.status_code == 503: time.sleep(1) @@ -321,10 +321,11 @@ def transfer_leadership(new_leader): """ self.logger.debug( f"Transferring leadership to {new_leader.account.hostname}") - admin.transfer_leadership_to(namespace="kafka", - topic="__consumer_offsets", - partition=0, - target=self.redpanda.idx(new_leader)) + admin.transfer_leadership_to( + namespace="kafka", + topic="__consumer_offsets", + partition=0, + target_id=self.redpanda.idx(new_leader)) for _ in range(3): # re-check a few times leader = get_group_leader() self.logger.debug(f"Current leader: {leader}") diff --git a/tests/rptest/tests/prefix_truncate_recovery_test.py b/tests/rptest/tests/prefix_truncate_recovery_test.py index 818d6dcb9504..7d9e559a6835 100644 --- a/tests/rptest/tests/prefix_truncate_recovery_test.py +++ b/tests/rptest/tests/prefix_truncate_recovery_test.py @@ -139,7 +139,7 @@ def verify_offsets(self): admin.transfer_leadership_to(namespace="kafka", topic=self.topic, partition=0, - target=node) + target_id=self.redpanda.idx(node)) # % ERROR: offsets_for_times failed: Local: Unknown partition # may occur here presumably because there is an interaction # with leadership transfer. the built-in retries in list_offsets diff --git a/tests/rptest/tests/tx_admin_api_test.py b/tests/rptest/tests/tx_admin_api_test.py index a33ab5f631a6..1661250c5e22 100644 --- a/tests/rptest/tests/tx_admin_api_test.py +++ b/tests/rptest/tests/tx_admin_api_test.py @@ -125,7 +125,7 @@ def test_expired_transaction(self): self.admin.transfer_leadership_to(namespace="kafka", topic=topic, partition=partition, - target=None) + target_id=None) def leader_is_changed(): new_leader = self.admin.get_partition_leader( From f15e3d87b886d96639c3b685cc22923bdbb8fb88 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 18 Jun 2022 21:57:43 -0700 Subject: [PATCH 09/11] admin: make transfer_leadership_to more stable making transfer_leadership_to more stable by checking that all nodes has updated metadata. previously we could choose a stale server, get 503 and retry the requests --- tests/rptest/services/admin.py | 43 +++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index dea1dada30f9..fd81762fb035 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -187,17 +187,19 @@ def _get_stable_configuration( return None info = PartitionDetails() info.status = status - info.leader = last_leader + info.leader = int(last_leader) info.replicas = replicas return info def wait_stable_configuration( self, topic, + *, partition=0, namespace="kafka", replication=None, timeout_s=10, + backoff_s=1, hosts: Optional[list[str]] = None) -> PartitionDetails: """ Method waits for timeout_s until the configuration is stable and returns it. @@ -231,7 +233,7 @@ def get_stable_configuration(): return wait_until_result( get_stable_configuration, timeout_sec=timeout_s, - backoff_sec=1, + backoff_sec=backoff_s, err_msg= f"can't fetch stable replicas for {namespace}/{topic}/{partition} within {timeout_s} sec" ) @@ -242,6 +244,7 @@ def await_stable_leader(self, namespace="kafka", replication=None, timeout_s=10, + backoff_s=1, hosts: Optional[list[str]] = None, check: Callable[[int], bool] = lambda node_id: True): @@ -252,9 +255,13 @@ def await_stable_leader(self, When the timeout is exhaust it throws TimeoutException """ def is_leader_stable(): - info = self.wait_stable_configuration(topic, partition, namespace, - replication, timeout_s, - hosts) + info = self.wait_stable_configuration(topic, + partition=partition, + namespace=namespace, + replication=replication, + timeout_s=timeout_s, + hosts=hosts, + backoff_s=backoff_s) if check(info.leader): return True, info.leader return False @@ -262,7 +269,7 @@ def is_leader_stable(): return wait_until_result( is_leader_stable, timeout_sec=timeout_s, - backoff_sec=1, + backoff_sec=backoff_s, err_msg= f"can't get stable leader of {namespace}/{topic}/{partition} within {timeout_s} sec" ) @@ -547,16 +554,14 @@ def transfer_leadership_to(self, namespace, topic, partition, - target_id=None): + target_id=None, + leader_id=None): """ Looks up current ntp leader and transfer leadership to target node, this operations is NOP when current leader is the same as target. If user pass None for target this function will choose next replica for new leader. If leadership transfer was performed this function return True """ - - # check which node is current leader - def _get_details(): p = self.get_partitions(topic=topic, partition=partition, @@ -565,25 +570,25 @@ def _get_details(): f"ntp {namespace}/{topic}/{partition} details: {p}") return p - def _has_leader(): - return self.get_partition_leader( - namespace=namespace, topic=topic, partition=partition) != -1 + # check which node is current leader - wait_until(_has_leader, - timeout_sec=30, - backoff_sec=2, - err_msg="Failed to establish current leader") + if leader_id == None: + leader_id = self.await_stable_leader(topic, + partition=partition, + namespace=namespace, + timeout_s=30, + backoff_s=2) details = _get_details() if target_id is not None: - if details['leader_id'] == target_id: + if leader_id == target_id: return False path = f"raft/{details['raft_group_id']}/transfer_leadership?target={target_id}" else: path = f"raft/{details['raft_group_id']}/transfer_leadership" - leader = self.redpanda.get_node(details['leader_id']) + leader = self.redpanda.get_node(leader_id) ret = self._request('post', path=path, node=leader) return ret.status_code == 200 From 9ccbfceb4b5e7abae33b2fdbdeab507795e081e1 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 18 Jun 2022 22:04:40 -0700 Subject: [PATCH 10/11] raft_availability_test: use stable leadership transfer --- tests/rptest/tests/raft_availability_test.py | 68 +++++++++++--------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/tests/rptest/tests/raft_availability_test.py b/tests/rptest/tests/raft_availability_test.py index 10d3943b54a5..88607f3c8df8 100644 --- a/tests/rptest/tests/raft_availability_test.py +++ b/tests/rptest/tests/raft_availability_test.py @@ -13,6 +13,7 @@ import random import ducktape.errors from typing import Optional +import requests from ducktape.mark import parametrize from ducktape.utils.util import wait_until @@ -123,41 +124,48 @@ def _expect_available(self): self.logger.info("Cluster is available as expected") def _transfer_leadership(self, admin: Admin, namespace: str, topic: str, - target_node_id: int) -> None: - - last_log_msg = "" # avoid spamming log - - def leader_predicate(l: Optional[int]) -> bool: - nonlocal last_log_msg, target_node_id - if not l: - return False - if l != target_node_id: # type: ignore - log_msg = f'Still waiting for leader {target_node_id}, got {l}' - if log_msg != last_log_msg: # type: ignore # "unbound" - self.logger.info(log_msg) - last_log_msg = log_msg - return False - return True + target_id: int) -> None: - retry_once = True + count = 0 while True: - self.logger.info(f"Starting transfer to {target_node_id}") - admin.partition_transfer_leadership("kafka", topic, 0, - target_node_id) + count += 1 + self.logger.info(f"Waiting for a leader") + leader_id = admin.await_stable_leader(topic, + partition=0, + namespace=namespace, + timeout_s=30, + backoff_s=2) + self.logger.info(f"Current leader {leader_id}") + + if leader_id == target_id: + return + + self.logger.info(f"Starting transfer to {target_id}") + requests.exceptions.HTTPError try: - self._wait_for_leader(leader_predicate, - timeout=ELECTION_TIMEOUT * 2) - except ducktape.errors.TimeoutError as e: - if retry_once: + admin.transfer_leadership_to(topic=topic, + namespace=namespace, + partition=0, + target_id=target_id, + leader_id=leader_id) + except requests.exceptions.HTTPError as e: + if count <= 10 and e.response.status_code == 503: self.logger.info( - f'Failed to get desired leader, retrying once.') - retry_once = False + f"Got 503: {leader_id}'s metadata hasn't been updated yet" + ) + time.sleep(1) continue - else: - raise e - break # no exception -> success, we can return now + raise + break - self.logger.info(f"Completed transfer to {target_node_id}") + admin.await_stable_leader(topic, + partition=0, + namespace=namespace, + timeout_s=ELECTION_TIMEOUT * 4, + backoff_s=2, + check=lambda node_id: node_id != leader_id) + + self.logger.info(f"Completed transfer to {target_id}") @cluster(num_nodes=3) def test_one_node_down(self): @@ -404,7 +412,7 @@ def test_leader_transfers_recovery(self, acks): # is tripping up the cluster when we have so many leadership transfers. # https://github.com/redpanda-data/redpanda/issues/2623 - admin = Admin(self.redpanda) + admin = Admin(self.redpanda, retry_codes=[]) initial_leader_id = leader_node_id for n in range(0, transfer_count): From b4a5fb276e673e7bbecbcc7ee91c5e5be05cb26e Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 18 Jun 2022 22:06:08 -0700 Subject: [PATCH 11/11] ducky: make wait_until faster temporary adding a faster version of wait_until before the ducktape repo is updated --- tests/rptest/services/admin.py | 1 - tests/rptest/tests/raft_availability_test.py | 2 +- tests/rptest/util.py | 28 +++++++++++++++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index fd81762fb035..80e02a13d204 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -14,7 +14,6 @@ from requests.adapters import HTTPAdapter from requests.exceptions import RequestException from requests.packages.urllib3.util.retry import Retry -from ducktape.utils.util import wait_until from ducktape.cluster.cluster import ClusterNode from typing import Optional, Callable, NamedTuple from rptest.util import wait_until_result diff --git a/tests/rptest/tests/raft_availability_test.py b/tests/rptest/tests/raft_availability_test.py index 88607f3c8df8..fc7c05a22142 100644 --- a/tests/rptest/tests/raft_availability_test.py +++ b/tests/rptest/tests/raft_availability_test.py @@ -16,7 +16,7 @@ import requests from ducktape.mark import parametrize -from ducktape.utils.util import wait_until +from rptest.util import wait_until from rptest.clients.kafka_cat import KafkaCat from rptest.clients.rpk import RpkTool, RpkException diff --git a/tests/rptest/util.py b/tests/rptest/util.py index e7b22ba055ba..0049cda0cc86 100644 --- a/tests/rptest/util.py +++ b/tests/rptest/util.py @@ -11,8 +11,9 @@ from contextlib import contextmanager from requests.exceptions import HTTPError -from ducktape.utils.util import wait_until from rptest.clients.kafka_cli_tools import KafkaCliTools +from ducktape.errors import TimeoutError +import time class Scale: @@ -47,6 +48,31 @@ def release(self): return self._scale == Scale.RELEASE +def wait_until(condition, + timeout_sec, + backoff_sec=.1, + err_msg="", + retry_on_exc=False): + start = time.time() + stop = start + timeout_sec + last_exception = None + while time.time() < stop: + try: + if condition(): + return + else: + last_exception = None + except BaseException as e: + last_exception = e + if not retry_on_exc: + raise e + time.sleep(backoff_sec) + + # it is safe to call Exception from None - will be just treated as a normal exception + raise TimeoutError( + err_msg() if callable(err_msg) else err_msg) from last_exception + + def wait_until_result(condition, *args, **kwargs): """ a near drop-in replacement for ducktape's wait_util except that when