diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h
index 426b60535477..1f85cad7c67c 100644
--- a/src/v/cluster/errc.h
+++ b/src/v/cluster/errc.h
@@ -59,6 +59,7 @@ enum class errc : int16_t {
     error_collecting_health_report,
     leadership_changed,
     feature_disabled,
+    invalid_request,
 };
 struct errc_category final : public std::error_category {
     const char* name() const noexcept final { return "cluster::errc"; }
@@ -167,6 +168,8 @@ struct errc_category final : public std::error_category {
                    "to finish";
         case errc::feature_disabled:
             return "Requested feature is disabled";
+        case errc::invalid_request:
+            return "Invalid request";
         }
         return "cluster::errc::unknown";
     }
diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc
index 7411a7aaace6..8350845f7d95 100644
--- a/src/v/cluster/partition.cc
+++ b/src/v/cluster/partition.cc
@@ -181,18 +181,7 @@ raft::replicate_stages partition::replicate_in_stages(
     }
 
     if (_rm_stm) {
-        ss::promise<> p;
-        auto f = p.get_future();
-        auto replicate_finished
-          = _rm_stm->replicate(bid, std::move(r), opts)
-              .then(
-                [p = std::move(p)](result<raft::replicate_result> res) mutable {
-                    p.set_value();
-                    return res;
-                });
-        return raft::replicate_stages(
-          std::move(f), std::move(replicate_finished));
-
+        return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
     } else {
         return _raft->replicate_in_stages(std::move(r), opts);
     }
diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h
index a6e0d31dd6f0..31ef78b983b8 100644
--- a/src/v/cluster/partition.h
+++ b/src/v/cluster/partition.h
@@ -150,7 +150,11 @@ class partition {
     ss::future<std::error_code>
     transfer_leadership(std::optional<model::node_id> target) {
-        return _raft->do_transfer_leadership(target);
+        if (_rm_stm) {
+            return _rm_stm->transfer_leadership(target);
+        } else {
+            return _raft->do_transfer_leadership(target);
+        }
     }
 
     ss::future
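For context on the partition.cc hunk above and the rm_stm changes that follow: raft::replicate_stages bundles two futures, one that resolves once the request has been enqueued (ordered) and one that resolves when replication finishes. The old partition code faked the first stage with a promise it only fulfilled after the whole replication, which defeats pipelining. The snippet below is a plain std::future analogue of that contract, including the "resolve the first stage at the latest on completion" safety net that the new rm_stm::replicate_in_stages adds; names and types here are illustrative, not Seastar or Redpanda APIs.

    #include <future>
    #include <memory>
    #include <utility>

    // Two-stage result: the first future resolves when the request is ordered
    // ("enqueued"), the second when replication finishes. "int" stands in for
    // the replicated offset/result type.
    struct staged_result {
        std::future<void> request_enqueued;
        std::future<int> replicate_finished;
    };

    // do_replicate receives a callback it may invoke as soon as the request is
    // ordered; if it never does (error paths, early returns), the wrapper still
    // resolves the first stage when replication completes, so callers never hang.
    template <class ReplicateFn>
    staged_result staged_replicate(ReplicateFn do_replicate) {
        auto enqueued = std::make_shared<std::promise<void>>();
        auto enqueued_set = std::make_shared<bool>(false);
        auto mark_enqueued = [enqueued, enqueued_set] {
            if (!*enqueued_set) {
                *enqueued_set = true;
                enqueued->set_value();
            }
        };
        auto finished = std::async(
          std::launch::async, [do_replicate, mark_enqueued]() mutable {
              int offset = do_replicate(mark_enqueued);
              mark_enqueued(); // safety net: never leave the first stage pending
              return offset;
          });
        return staged_result{enqueued->get_future(), std::move(finished)};
    }

A produce-style caller can wait on request_enqueued to release the next pipelined batch while replicate_finished is still outstanding.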
diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index 986a853a876b..4766ad986f21 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -760,12 +760,52 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
     co_return tx_errc::none;
 }
 
+raft::replicate_stages rm_stm::replicate_in_stages(
+  model::batch_identity bid,
+  model::record_batch_reader r,
+  raft::replicate_options opts) {
+    auto enqueued = ss::make_lw_shared<available_promise<>>();
+    auto f = enqueued->get_future();
+    auto replicate_finished
+      = do_replicate(bid, std::move(r), opts, enqueued).finally([enqueued] {
+            // we should avoid situations when replicate_finished is set while
+            // enqueued isn't, because that leads to hanging produce requests
+            // and resource leaks. since staged replication is an optimization
+            // and setting enqueued only after replicate_finished is already
+            // set has no semantic implications, we also set it here as a
+            // safety measure in case it wasn't set explicitly
+            if (!enqueued->available()) {
+                enqueued->set_value();
+            }
+        });
+    return raft::replicate_stages(std::move(f), std::move(replicate_finished));
+}
+
 ss::future<result<raft::replicate_result>> rm_stm::replicate(
   model::batch_identity bid,
-  model::record_batch_reader b,
+  model::record_batch_reader r,
   raft::replicate_options opts) {
+    auto enqueued = ss::make_lw_shared<available_promise<>>();
+    return do_replicate(bid, std::move(r), opts, enqueued);
+}
+
+ss::future<std::error_code>
+rm_stm::transfer_leadership(std::optional<model::node_id> target) {
+    return _state_lock.hold_write_lock().then(
+      [this, target](ss::basic_rwlock<>::holder unit) {
+          return _c->do_transfer_leadership(target).finally(
+            [u = std::move(unit)] {});
+      });
+}
+
+ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
+  model::batch_identity bid,
+  model::record_batch_reader b,
+  raft::replicate_options opts,
+  ss::lw_shared_ptr<available_promise<>> enqueued) {
     return _state_lock.hold_read_lock().then(
-      [this, bid, opts, b = std::move(b)](
+      [this, enqueued, bid, opts, b = std::move(b)](
         ss::basic_rwlock<>::holder unit) mutable {
           if (bid.is_transactional) {
               auto pid = bid.pid.get_id();
@@ -774,24 +814,12 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate(
                          return replicate_tx(bid, std::move(b));
                      })
                 .finally([u = std::move(unit)] {});
-          } else if (bid.has_idempotent() && bid.first_seq <= bid.last_seq) {
-              request_id rid{.pid = bid.pid, .seq = bid.first_seq};
-              return with_request_lock(
-                       rid,
-                       [this, bid, opts, b = std::move(b)]() mutable {
-                           return replicate_seq(bid, std::move(b), opts);
-                       })
+          } else if (bid.has_idempotent()) {
+              return replicate_seq(bid, std::move(b), opts, enqueued)
                 .finally([u = std::move(unit)] {});
           }
-          return sync(_sync_timeout)
-            .then([this, opts, b = std::move(b)](bool is_synced) mutable {
-                if (!is_synced) {
-                    return ss::make_ready_future<
-                      result<raft::replicate_result>>(errc::not_leader);
-                }
-                return _c->replicate(std::move(b), opts);
-            })
+          return replicate_msg(std::move(b), opts, enqueued)
             .finally([u = std::move(unit)] {});
       });
 }
@@ -921,6 +949,14 @@ rm_stm::known_seq(model::batch_identity bid) const {
     return std::nullopt;
 }
 
+std::optional<int32_t> rm_stm::tail_seq(model::producer_identity pid) const {
+    auto pid_seq = _log_state.seq_table.find(pid);
+    if (pid_seq == _log_state.seq_table.end()) {
+        return std::nullopt;
+    }
+    return pid_seq->second.seq;
+}
+
 void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) {
     auto pid_seq = _log_state.seq_table.find(bid.pid);
     if (pid_seq != _log_state.seq_table.end()) {
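The replicate_seq hunk below admits a new idempotent batch only if its first_seq directly follows the last sequence number observed for that producer (the session's tail_seq, or the seq_table entry when nothing is inflight), and rejects anything else with sequence_out_of_order. is_sequence() itself is not part of this diff; the standalone check below is an assumed equivalent for illustration, including the int32 wraparound Kafka sequence numbers use.

    #include <cassert>
    #include <cstdint>

    // Assumed equivalent of the gap check replicate_seq relies on: next_seq
    // must directly follow tail_seq; sequence numbers are int32 and wrap to 0.
    static bool follows_directly(int32_t tail_seq, int32_t next_seq) {
        if (tail_seq == INT32_MAX) {
            return next_seq == 0; // wraparound (assumption, not shown in the diff)
        }
        return next_seq == tail_seq + 1;
    }

    int main() {
        assert(follows_directly(4, 5));         // contiguous batch: admitted
        assert(!follows_directly(4, 6));        // gap: sequence_out_of_order
        assert(!follows_directly(4, 4));        // replay: resolved via known_seq, not here
        assert(follows_directly(INT32_MAX, 0)); // wraparound
        return 0;
    }

Note that an empty session uses tail_seq = -1, so the same rule yields the "first request in a session should have seq=0" behaviour enforced below.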
@@ -1064,40 +1100,222 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
 
 ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
   model::batch_identity bid,
   model::record_batch_reader br,
-  raft::replicate_options opts) {
+  raft::replicate_options opts,
+  ss::lw_shared_ptr<available_promise<>> enqueued) {
     if (!co_await sync(_sync_timeout)) {
+        // it's ok not to set enqueued on an early return because the safety
+        // net in replicate_in_stages sets it automatically
         co_return errc::not_leader;
     }
     auto synced_term = _insync_term;
 
+    if (bid.first_seq > bid.last_seq) {
+        vlog(
+          clusterlog.warn,
+          "first_seq={} of the batch should be less than or equal to last_seq={}",
+          bid.first_seq,
+          bid.last_seq);
+        co_return errc::invalid_request;
+    }
+
+    // get the session by the client's pid
+    auto session = _inflight_requests
+                     .lazy_emplace(
+                       bid.pid,
+                       [&](const auto& ctor) {
+                           ctor(
+                             bid.pid, ss::make_lw_shared<inflight_requests>());
+                       })
+                     ->second;
+
+    // critical section: rm_stm checks the request's order and whether it is
+    // a duplicate, ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL
+    // SUCCEED. if any of them fails we expect the leader to step down
+    auto u = co_await session->lock.get_units();
+
+    if (!_c->is_leader() || _c->term() != synced_term) {
+        co_return errc::not_leader;
+    }
+
+    if (session->term < synced_term) {
+        // we don't care about old inflight requests because the sync
+        // guarantee (all replicated records of the last term are applied)
+        // makes sure that the finished inflight records already got into
+        // _log_state's seq_table
+        session->forget();
+        session->term = synced_term;
+    }
+
+    if (session->term > synced_term) {
+        co_return errc::not_leader;
+    }
+
+    // check if the request (identified by its seq) is already resolved;
+    // first look among the pending requests
+    auto cached_r = session->known_seq(bid.last_seq);
+    if (cached_r) {
+        co_return cached_r.value();
+    }
+    // then look among the already answered requests
     auto cached_offset = known_seq(bid);
     if (cached_offset) {
-        if (cached_offset.value() < model::offset{0}) {
+        co_return raft::replicate_result{.last_offset = cached_offset.value()};
+    }
+
+    // check if the request is already being processed
+    for (auto& inflight : session->cache) {
+        if (inflight->last_seq == bid.last_seq && inflight->is_processing) {
+            // found an inflight request; park the current request until the
+            // former is resolved
+            auto promise = ss::make_lw_shared<
+              available_promise<result<raft::replicate_result>>>();
+            inflight->parked.push_back(promise);
+            u.return_all();
+            co_return co_await promise->get_future();
+        }
+    }
+
+    // apparently we are processing an unseen request; check that it isn't
+    // out of order with the already processed or inflight requests
+    if (session->cache.size() == 0) {
+        // no inflight requests -> check against the processed ones
+        auto tail = tail_seq(bid.pid);
+        if (tail) {
+            if (!is_sequence(tail.value(), bid.first_seq)) {
+                // there is a gap between the request's seq number and the
+                // seq number of the latest processed request
+                co_return errc::sequence_out_of_order;
+            }
+        } else {
+            if (bid.first_seq != 0) {
+                // the first request in a session should have seq=0
+                co_return errc::sequence_out_of_order;
+            }
+        }
+    } else {
+        if (!is_sequence(session->tail_seq, bid.first_seq)) {
+            // there is a gap between the request's seq number and the seq
+            // number of the latest inflight request
+            co_return errc::sequence_out_of_order;
+        }
+    }
+
+    // update the last observed seq; since we hold the mutex it's guaranteed
+    // to be the latest
+    session->tail_seq = bid.last_seq;
+
+    auto request = ss::make_lw_shared<inflight_request>();
+    request->last_seq = bid.last_seq;
+    request->is_processing = true;
+    session->cache.push_back(request);
+
+    // the request comes in the right order, it's ok to replicate
+    ss::lw_shared_ptr<raft::replicate_stages> ss;
+    auto has_failed = false;
+    try {
+        ss = _c->replicate_in_stages(synced_term, std::move(br), opts);
+        co_await std::move(ss->request_enqueued);
+    } catch (...) {
+        vlog(
+          clusterlog.warn,
+          "replication failed with {}",
+          std::current_exception());
+        has_failed = true;
+    }
+
+    result<raft::replicate_result> r = errc::success;
+
+    if (has_failed) {
+        if (_c->is_leader() && _c->term() == synced_term) {
+            // we don't need to care about the requests waiting on the lock:
+            // as soon as we release it the leader or term will have changed,
+            // so the pending fibers won't pass the initial checks and will
+            // be rejected with not_leader
+            co_await _c->step_down();
+        }
+        u.return_all();
+        r = errc::replication_error;
+    } else {
+        try {
+            u.return_all();
+            enqueued->set_value();
+            r = co_await std::move(ss->replicate_finished);
+        } catch (...) {
             vlog(
               clusterlog.warn,
-              "Status of the original attempt is unknown (still is in-flight "
-              "or failed). Failing a retried batch with the same pid:{} & "
-              "seq:{} to avoid duplicates",
-              bid.pid,
-              bid.last_seq);
-            // kafka clients don't automaticly handle fatal errors
-            // so when they encounter the error they will be forced
-            // to propagate it to the app layer
-            co_return errc::generic_tx_error;
+              "replication failed with {}",
+              std::current_exception());
+            r = errc::replication_error;
         }
-        co_return raft::replicate_result{.last_offset = cached_offset.value()};
     }
 
-    if (!check_seq(bid)) {
-        co_return errc::sequence_out_of_order;
+    // we don't need session->lock here because we never interleave access to
+    // is_processing and the result with a suspension point (await)
+    request->is_processing = false;
+    request->r = r;
+    for (auto& pending : request->parked) {
+        pending->set_value(r);
+    }
+    request->parked.clear();
+
+    if (!r) {
+        // if r failed at the consensus level (not because of has_failed) it
+        // should be guaranteed that all follow-up replication requests fail
+        // too, but we step down just in case to minimize the risk
+        if (_c->is_leader() && _c->term() == synced_term) {
+            co_await _c->step_down();
+        }
+        co_return r;
+    }
+
+    // requests get into session->cache in seq order, so by iterating from
+    // the beginning until the first still-inflight request we are sure to
+    // update seq_table in order, preserving monotonicity and the absence
+    // of gaps
+    while (!session->cache.empty() && !session->cache.front()->is_processing) {
+        auto front = session->cache.front();
+        if (!front->r) {
+            // we just encountered a failed request, but a failed request
+            // causes a step down, so we can stop updating seq_table; the
+            // next leader's apply will do it for us
+            break;
+        }
+        auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
+        if (inserted) {
+            seq_it->second.pid = bid.pid;
+            seq_it->second.seq = front->last_seq;
+            seq_it->second.last_offset = front->r.value().last_offset;
+        } else {
+            seq_it->second.update(
+              front->last_seq, front->r.value().last_offset);
+        }
+        session->cache.pop_front();
     }
-    auto r = co_await _c->replicate(synced_term, std::move(br), opts);
-    if (r) {
-        set_seq(bid, r.value().last_offset);
+
+    if (session->cache.empty() && session->lock.ready()) {
+        _inflight_requests.erase(bid.pid);
     }
+
     co_return r;
 }
 
+ss::future<result<raft::replicate_result>> rm_stm::replicate_msg(
+  model::record_batch_reader br,
+  raft::replicate_options opts,
+  ss::lw_shared_ptr<available_promise<>> enqueued) {
+    if (!co_await sync(_sync_timeout)) {
+        co_return errc::not_leader;
+    }
+
+    auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts);
+    co_await std::move(ss.request_enqueued);
+    enqueued->set_value();
+    co_return co_await std::move(ss.replicate_finished);
+}
+
 model::offset rm_stm::last_stable_offset() {
     auto first_tx_start = model::offset::max();
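Putting the rm_stm.cc pieces together: each producer gets a session that admits batches in seq order, remembers them in admission (and therefore log) order, and only folds finished batches into the seq table once the whole prefix ahead of them has finished. The toy below models just that bookkeeping in plain, single-threaded standard C++; the field names echo the diff but the types are illustrative, not Redpanda's.

    #include <cstdint>
    #include <deque>
    #include <iostream>
    #include <optional>

    struct toy_inflight {
        int32_t last_seq;
        bool is_processing = true;
        std::optional<int64_t> offset; // filled in when replication finishes
    };

    struct toy_session {
        int32_t tail_seq = -1;                   // last admitted sequence number
        std::deque<toy_inflight> cache;          // admission order == log order
        std::optional<int64_t> committed_offset; // per-producer "seq_table" entry

        // mirrors the sequence_out_of_order check: only a directly following
        // batch is admitted and recorded as inflight
        bool admit(int32_t first_seq, int32_t last_seq) {
            if (first_seq != tail_seq + 1) {
                return false;
            }
            tail_seq = last_seq;
            cache.push_back(toy_inflight{last_seq});
            return true;
        }

        // mirrors the post-replication loop: fold only the finished prefix
        // into the committed state, preserving gap-free monotonicity
        void finish(int32_t last_seq, int64_t offset) {
            for (auto& req : cache) {
                if (req.last_seq == last_seq) {
                    req.is_processing = false;
                    req.offset = offset;
                }
            }
            while (!cache.empty() && !cache.front().is_processing) {
                committed_offset = cache.front().offset;
                cache.pop_front();
            }
        }
    };

    int main() {
        toy_session s;
        std::cout << s.admit(0, 2) << s.admit(3, 5) << s.admit(7, 9) << '\n'; // "110": 7 leaves a gap
        s.finish(5, 205); // batch 3-5 finishes first: nothing is committed yet
        std::cout << s.cache.size() << '\n';      // 2
        s.finish(2, 102); // batch 0-2 finishes: both fold in, in order
        std::cout << *s.committed_offset << '\n'; // 205
    }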
diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h
index 5f2d7e4461d2..48f3fa0da964 100644
--- a/src/v/cluster/rm_stm.h
+++ b/src/v/cluster/rm_stm.h
@@ -22,6 +22,7 @@
 #include "raft/state_machine.h"
 #include "raft/types.h"
 #include "storage/snapshot.h"
+#include "utils/available_promise.h"
 #include "utils/expiring_promise.h"
 #include "utils/mutex.h"
 
@@ -168,11 +169,19 @@ class rm_stm final : public persisted_stm {
     ss::future<std::vector<tx_range>>
     aborted_transactions(model::offset, model::offset);
 
+    raft::replicate_stages replicate_in_stages(
+      model::batch_identity,
+      model::record_batch_reader,
+      raft::replicate_options);
+
     ss::future<result<raft::replicate_result>> replicate(
       model::batch_identity,
       model::record_batch_reader,
       raft::replicate_options);
 
+    ss::future<std::error_code>
+    transfer_leadership(std::optional<model::node_id>);
+
     ss::future<> stop() override;
 
     void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; }
@@ -267,6 +276,13 @@ class rm_stm final : public persisted_stm {
     std::optional<model::offset> known_seq(model::batch_identity) const;
     void set_seq(model::batch_identity, model::offset);
     void reset_seq(model::batch_identity);
+    std::optional<int32_t> tail_seq(model::producer_identity) const;
+
+    ss::future<result<raft::replicate_result>> do_replicate(
+      model::batch_identity,
+      model::record_batch_reader,
+      raft::replicate_options,
+      ss::lw_shared_ptr<available_promise<>>);
 
     ss::future<result<raft::replicate_result>>
     replicate_tx(model::batch_identity, model::record_batch_reader);
@@ -274,7 +290,13 @@ class rm_stm final : public persisted_stm {
     ss::future<result<raft::replicate_result>> replicate_seq(
       model::batch_identity,
       model::record_batch_reader,
-      raft::replicate_options);
+      raft::replicate_options,
+      ss::lw_shared_ptr<available_promise<>>);
+
+    ss::future<result<raft::replicate_result>> replicate_msg(
+      model::record_batch_reader,
+      raft::replicate_options,
+      ss::lw_shared_ptr<available_promise<>>);
 
     void compact_snapshot();
 
@@ -396,22 +418,64 @@ class rm_stm final : public persisted_stm {
         }
     };
 
-    template<typename Func>
-    auto with_request_lock(request_id rid, Func&& f) {
-        auto lock_it = _request_locks.find(rid);
-        if (lock_it == _request_locks.end()) {
-            lock_it
-              = _request_locks.emplace(rid, ss::make_lw_shared<mutex>()).first;
+    // When a request is retried while the first attempt is still being
+    // replicated, the retried request is parked until the original request
+    // is replicated.
+    struct inflight_request {
+        int32_t last_seq{-1};
+        result<raft::replicate_result> r = errc::success;
+        bool is_processing;
+        std::vector<
+          ss::lw_shared_ptr<available_promise<result<raft::replicate_result>>>>
+          parked;
+    };
+
+    // Redpanda uses optimistic replication to implement pipelining of
+    // idempotent replication requests. Just like with non-idempotent
+    // requests, Redpanda doesn't wait until the previous request is
+    // replicated before processing the next one.
+    //
+    // However, with idempotency the requests are causally related: seq
+    // numbers should increase without gaps. So we need to prevent the case
+    // when request A's replication starts before request B's but then A
+    // fails while B's replication passes.
+    //
+    // We rely on conditional replication to guarantee that A and B share
+    // the same term, and then on Raft's ordering guarantees: if A was
+    // enqueued before B within the same term and A fails, then B fails too.
+    //
+    // inflight_requests hosts inflight requests until they are resolved;
+    // together they form a monotonically increasing, gap-free continuation
+    // of log_state's seq_table.
+    struct inflight_requests {
+        mutex lock;
+        int32_t tail_seq{-1};
+        model::term_id term;
+        ss::circular_buffer<ss::lw_shared_ptr<inflight_request>> cache;
+
+        void forget() {
+            for (auto& inflight : cache) {
+                if (inflight->is_processing) {
+                    for (auto& pending : inflight->parked) {
+                        pending->set_value(errc::generic_tx_error);
+                    }
+                }
+            }
+            cache.clear();
+            tail_seq = -1;
         }
-        auto r_lock = lock_it->second;
-        return r_lock->with(std::forward<Func>(f))
-          .finally([this, r_lock, rid]() {
-              if (r_lock->waiters() == 0) {
-                  // this fiber is the last holder of the lock
-                  _request_locks.erase(rid);
-              }
-          });
-    }
+
+        std::optional<result<raft::replicate_result>>
+        known_seq(int32_t last_seq) const {
+            for (auto& seq : cache) {
+                if (seq->last_seq == last_seq && !seq->is_processing) {
+                    return seq->r;
+                }
+            }
+            return std::nullopt;
+        }
+    };
 
     ss::lw_shared_ptr<mutex> get_tx_lock(model::producer_id pid) {
         auto lock_it = _tx_locks.find(pid);
@@ -430,7 +494,10 @@ class rm_stm final : public persisted_stm {
     ss::basic_rwlock<> _state_lock;
     absl::flat_hash_map<model::producer_id, ss::lw_shared_ptr<mutex>> _tx_locks;
-    absl::flat_hash_map<request_id, ss::lw_shared_ptr<mutex>> _request_locks;
+    absl::flat_hash_map<
+      model::producer_identity,
+      ss::lw_shared_ptr<inflight_requests>>
+      _inflight_requests;
     log_state _log_state;
     mem_state _mem_state;
     ss::timer<clock_type> auto_abort_timer;
diff --git a/src/v/cluster/scheduling/leader_balancer.cc b/src/v/cluster/scheduling/leader_balancer.cc
index 4768f72cb610..f82a3eea62f6 100644
--- a/src/v/cluster/scheduling/leader_balancer.cc
+++ b/src/v/cluster/scheduling/leader_balancer.cc
@@ -626,8 +626,8 @@ leader_balancer::do_transfer_local(reassignment transfer) const {
     }
     auto shard = _shard_table.local().shard_for(transfer.group);
     auto func = [transfer, shard](cluster::partition_manager& pm) {
-        auto consensus = pm.consensus_for(transfer.group);
-        if (!consensus) {
+        auto partition = pm.partition_for(transfer.group);
+        if (!partition) {
            vlog(
              clusterlog.info,
              "Cannot complete group {} leader transfer: group instance "
              "not found",
              shard);
            return ss::make_ready_future<bool>(false);
        }
-        return consensus->do_transfer_leadership(transfer.to.node_id)
+        return partition->transfer_leadership(transfer.to.node_id)
          .then([group = transfer.group](std::error_code err) {
              if (err) {
                  vlog(
diff --git a/src/v/kafka/server/errors.h b/src/v/kafka/server/errors.h
index 354c80df3b23..e8d86e9da831 100644
--- a/src/v/kafka/server/errors.h
+++ b/src/v/kafka/server/errors.h
@@ -43,6 +43,8 @@ constexpr error_code map_topic_error_code(cluster::errc code) {
         return error_code::broker_not_available;
     case cluster::errc::not_leader:
         return error_code::not_coordinator;
+    case cluster::errc::invalid_request:
+        return error_code::invalid_request;
     case cluster::errc::replication_error:
     case cluster::errc::shutting_down:
     case cluster::errc::join_request_dispatch_error:
diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc
index 546dfa2057ce..687b127c6c0b 100644
--- a/src/v/kafka/server/handlers/produce.cc
+++ b/src/v/kafka/server/handlers/produce.cc
@@ -154,6 +154,8 @@ static error_code map_produce_error_code(std::error_code ec) {
         return error_code::invalid_producer_epoch;
     case cluster::errc::sequence_out_of_order:
         return error_code::out_of_order_sequence_number;
+    case cluster::errc::invalid_request:
+        return error_code::invalid_request;
     default:
         return error_code::unknown_server_error;
     }
 }
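The next file adds consensus::step_down(), which the new replicate_seq calls whenever a pipelined idempotent batch fails. The reason is the invariant described in rm_stm.h above: batches are replicated conditionally on the term they were admitted under, so bumping the term after a failure guarantees no later pipelined batch of the same session can still land and create a sequence gap. A toy, non-Raft illustration of that effect (types and behaviour are illustrative, not Redpanda's consensus API):

    #include <cassert>
    #include <cstdint>

    // Replication succeeds only while the caller's expected term still matches,
    // and step_down() bumps the term so everything admitted under the old term
    // is rejected from now on.
    struct toy_consensus {
        int64_t term = 7;
        bool healthy = true;
        bool replicate(int64_t expected_term) const {
            return healthy && expected_term == term;
        }
        void step_down() { term += 1; }
    };

    int main() {
        toy_consensus c;
        const int64_t synced_term = c.term; // A and B were both admitted under this term

        c.healthy = false;
        assert(!c.replicate(synced_term));  // A fails
        c.step_down();                      // reaction to A's failure
        c.healthy = true;
        assert(!c.replicate(synced_term));  // B, pipelined under the old term, fails too
    }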
diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h
index 9376da87519b..31326e45cbe2 100644
--- a/src/v/raft/consensus.h
+++ b/src/v/raft/consensus.h
@@ -255,6 +255,16 @@ class consensus {
         });
     }
 
+    ss::future<> step_down() {
+        return _op_lock.with([this] {
+            do_step_down("external_stepdown");
+            if (_leader_id) {
+                _leader_id = std::nullopt;
+                trigger_leadership_notification();
+            }
+        });
+    }
+
     ss::future<std::optional<storage::timequery_result>>
     timequery(storage::timequery_config cfg);
 
diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc
index 1ca85c7a733a..907ab11c386a 100644
--- a/src/v/redpanda/admin_server.cc
+++ b/src/v/redpanda/admin_server.cc
@@ -1142,12 +1142,12 @@ void admin_server::register_raft_routes() {
           shard,
           [group_id, target, this, req = std::move(req)](
             cluster::partition_manager& pm) mutable {
-              auto consensus = pm.consensus_for(group_id);
-              if (!consensus) {
+              auto partition = pm.partition_for(group_id);
+              if (!partition) {
                   throw ss::httpd::not_found_exception();
               }
-              const auto ntp = consensus->ntp();
-              return consensus->do_transfer_leadership(target).then(
+              const auto ntp = partition->ntp();
+              return partition->transfer_leadership(target).then(
                 [this, req = std::move(req), ntp](std::error_code err)
                   -> ss::future<ss::json::json_return_type> {
                     co_await throw_on_error(*req, err, ntp);
diff --git a/src/v/utils/available_promise.h b/src/v/utils/available_promise.h
index f994b07ea141..a69f20241b1c 100644
--- a/src/v/utils/available_promise.h
+++ b/src/v/utils/available_promise.h
@@ -14,19 +14,20 @@
 
 #include <seastar/core/future.hh>
 
-template<class T>
+template<class... T>
 class available_promise {
 public:
-    ss::future<T> get_future() { return _promise.get_future(); }
+    ss::future<T...> get_future() { return _promise.get_future(); }
 
-    void set_value(T&& value) {
+    template<class... A>
+    void set_value(A&&... a) {
         _available = true;
-        _promise.set_value(std::move(value));
+        _promise.set_value(std::forward<A>(a)...);
     }
 
     bool available() { return _available; }
 
 private:
     bool _available{false};
-    ss::promise<T> _promise;
+    ss::promise<T...> _promise;
 };
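The available_promise change above is what allows the "enqueued" stage to be a value-less promise: the class used to require a single T and a set_value(T&&), while the variadic form also admits available_promise<> with a bare set_value(), mirroring ss::promise<>. A small usage sketch against the modified header (include path as in the diff; assumes the usual seastar ss alias is in scope):

    #include "utils/available_promise.h"

    #include <cassert>

    // Value-less form, as used for the "request enqueued" stage.
    ss::future<> enqueued_stage_example() {
        available_promise<> enqueued;
        auto fut = enqueued.get_future(); // ss::future<>
        enqueued.set_value();             // now legal: empty parameter pack
        assert(enqueued.available());
        return fut;
    }

    // Single-value form keeps working exactly as before.
    ss::future<int> valued_example() {
        available_promise<int> p;
        auto fut = p.get_future();        // ss::future<int>
        p.set_value(42);
        return fut;
    }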
diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py
index 1d8a47bc148b..80e02a13d204 100644
--- a/tests/rptest/services/admin.py
+++ b/tests/rptest/services/admin.py
@@ -14,7 +14,6 @@
 from requests.adapters import HTTPAdapter
 from requests.exceptions import RequestException
 from requests.packages.urllib3.util.retry import Retry
-from ducktape.utils.util import wait_until
 from ducktape.cluster.cluster import ClusterNode
 from typing import Optional, Callable, NamedTuple
 from rptest.util import wait_until_result
@@ -187,17 +186,19 @@ def _get_stable_configuration(
                 return None
             info = PartitionDetails()
             info.status = status
-            info.leader = last_leader
+            info.leader = int(last_leader)
             info.replicas = replicas
             return info
 
     def wait_stable_configuration(
             self,
             topic,
+            *,
             partition=0,
             namespace="kafka",
             replication=None,
             timeout_s=10,
+            backoff_s=1,
             hosts: Optional[list[str]] = None) -> PartitionDetails:
         """
        Method waits for timeout_s until the configuration is stable and returns it.
@@ -231,7 +232,7 @@ def get_stable_configuration():
         return wait_until_result(
             get_stable_configuration,
             timeout_sec=timeout_s,
-            backoff_sec=1,
+            backoff_sec=backoff_s,
             err_msg=
             f"can't fetch stable replicas for {namespace}/{topic}/{partition} within {timeout_s} sec"
         )
@@ -242,6 +243,7 @@ def await_stable_leader(self,
                             namespace="kafka",
                             replication=None,
                             timeout_s=10,
+                            backoff_s=1,
                             hosts: Optional[list[str]] = None,
                             check: Callable[[int], bool] = lambda node_id: True):
@@ -252,9 +254,13 @@ def await_stable_leader(self,
        When the timeout is exhaust it throws TimeoutException
        """
        def is_leader_stable():
-            info = self.wait_stable_configuration(topic, partition, namespace,
-                                                  replication, timeout_s,
-                                                  hosts)
+            info = self.wait_stable_configuration(topic,
+                                                  partition=partition,
+                                                  namespace=namespace,
+                                                  replication=replication,
+                                                  timeout_s=timeout_s,
+                                                  hosts=hosts,
+                                                  backoff_s=backoff_s)
             if check(info.leader):
                 return True, info.leader
             return False
@@ -262,7 +268,7 @@ def is_leader_stable():
         return wait_until_result(
             is_leader_stable,
             timeout_sec=timeout_s,
-            backoff_sec=1,
+            backoff_sec=backoff_s,
             err_msg=
             f"can't get stable leader of {namespace}/{topic}/{partition} within {timeout_s} sec"
         )
@@ -542,18 +548,19 @@ def get_partition_leader(self, *, namespace, topic, partition, node=None):
 
         return partition_info['leader_id']
 
-    def transfer_leadership_to(self, *, namespace, topic, partition, target):
+    def transfer_leadership_to(self,
+                               *,
+                               namespace,
+                               topic,
+                               partition,
+                               target_id=None,
+                               leader_id=None):
         """
         Looks up current ntp leader and transfer leadership to target node,
         this operations is NOP when current leader is the same as target.
         If user pass None for target this function will choose next replica for new leader.
         If leadership transfer was performed this function return True
         """
-        target_id = self.redpanda.idx(target) if isinstance(
-            target, ClusterNode) else target
-
-        # check which node is current leader
-
         def _get_details():
             p = self.get_partitions(topic=topic,
                                     partition=partition,
@@ -562,25 +569,25 @@ def _get_details():
                 f"ntp {namespace}/{topic}/{partition} details: {p}")
             return p
 
-        def _has_leader():
-            return self.get_partition_leader(
-                namespace=namespace, topic=topic, partition=partition) != -1
+        # check which node is current leader
 
-        wait_until(_has_leader,
-                   timeout_sec=30,
-                   backoff_sec=2,
-                   err_msg="Failed to establish current leader")
+        if leader_id is None:
+            leader_id = self.await_stable_leader(topic,
+                                                 partition=partition,
+                                                 namespace=namespace,
+                                                 timeout_s=30,
+                                                 backoff_s=2)
 
         details = _get_details()
 
         if target_id is not None:
-            if details['leader_id'] == target_id:
+            if leader_id == target_id:
                 return False
             path = f"raft/{details['raft_group_id']}/transfer_leadership?target={target_id}"
         else:
             path = f"raft/{details['raft_group_id']}/transfer_leadership"
 
-        leader = self.redpanda.get_node(details['leader_id'])
+        leader = self.redpanda.get_node(leader_id)
         ret = self._request('post', path=path, node=leader)
         return ret.status_code == 200
diff --git a/tests/rptest/tests/group_membership_test.py b/tests/rptest/tests/group_membership_test.py
index 46b161962afd..389bc1cf3a6b 100644
--- a/tests/rptest/tests/group_membership_test.py
+++ b/tests/rptest/tests/group_membership_test.py
@@ -66,7 +66,7 @@ def _transfer_with_retry(self, namespace, topic, partition, target_id):
             admin.transfer_leadership_to(namespace=namespace,
                                          topic=topic,
                                          partition=partition,
-                                         target=target_id)
+                                         target_id=target_id)
         except requests.exceptions.HTTPError as e:
             if e.response.status_code == 503:
                 time.sleep(1)
@@ -321,10 +321,11 @@ def transfer_leadership(new_leader):
             """
             self.logger.debug(
                 f"Transferring leadership to {new_leader.account.hostname}")
-            admin.transfer_leadership_to(namespace="kafka",
-                                         topic="__consumer_offsets",
-                                         partition=0,
-                                         target=self.redpanda.idx(new_leader))
+            admin.transfer_leadership_to(
+                namespace="kafka",
+                topic="__consumer_offsets",
+                partition=0,
+                target_id=self.redpanda.idx(new_leader))
             for _ in range(3):  # re-check a few times
                 leader = get_group_leader()
                 self.logger.debug(f"Current leader: {leader}")
diff --git a/tests/rptest/tests/prefix_truncate_recovery_test.py b/tests/rptest/tests/prefix_truncate_recovery_test.py
index 818d6dcb9504..7d9e559a6835 100644
--- a/tests/rptest/tests/prefix_truncate_recovery_test.py
+++ b/tests/rptest/tests/prefix_truncate_recovery_test.py
@@ -139,7 +139,7 @@ def verify_offsets(self):
         admin.transfer_leadership_to(namespace="kafka",
                                      topic=self.topic,
                                      partition=0,
-                                     target=node)
+                                     target_id=self.redpanda.idx(node))
         # % ERROR: offsets_for_times failed: Local: Unknown partition
         # may occur here presumably because there is an interaction
         # with leadership transfer. the built-in retries in list_offsets
diff --git a/tests/rptest/tests/raft_availability_test.py b/tests/rptest/tests/raft_availability_test.py
index 10d3943b54a5..fc7c05a22142 100644
--- a/tests/rptest/tests/raft_availability_test.py
+++ b/tests/rptest/tests/raft_availability_test.py
@@ -13,9 +13,10 @@
 import random
 import ducktape.errors
 from typing import Optional
+import requests
 
 from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
+from rptest.util import wait_until
 from rptest.clients.kafka_cat import KafkaCat
 from rptest.clients.rpk import RpkTool, RpkException
@@ -123,41 +124,48 @@ def _expect_available(self):
         self.logger.info("Cluster is available as expected")
 
     def _transfer_leadership(self, admin: Admin, namespace: str, topic: str,
-                             target_node_id: int) -> None:
-
-        last_log_msg = ""  # avoid spamming log
-
-        def leader_predicate(l: Optional[int]) -> bool:
-            nonlocal last_log_msg, target_node_id
-            if not l:
-                return False
-            if l != target_node_id:  # type: ignore
-                log_msg = f'Still waiting for leader {target_node_id}, got {l}'
-                if log_msg != last_log_msg:  # type: ignore # "unbound"
-                    self.logger.info(log_msg)
-                    last_log_msg = log_msg
-                return False
-            return True
+                             target_id: int) -> None:
 
-        retry_once = True
+        count = 0
         while True:
-            self.logger.info(f"Starting transfer to {target_node_id}")
-            admin.partition_transfer_leadership("kafka", topic, 0,
-                                                target_node_id)
+            count += 1
+            self.logger.info(f"Waiting for a leader")
+            leader_id = admin.await_stable_leader(topic,
+                                                  partition=0,
+                                                  namespace=namespace,
+                                                  timeout_s=30,
+                                                  backoff_s=2)
+            self.logger.info(f"Current leader {leader_id}")
+
+            if leader_id == target_id:
+                return
+
+            self.logger.info(f"Starting transfer to {target_id}")
             try:
-                self._wait_for_leader(leader_predicate,
-                                      timeout=ELECTION_TIMEOUT * 2)
-            except ducktape.errors.TimeoutError as e:
-                if retry_once:
+                admin.transfer_leadership_to(topic=topic,
+                                             namespace=namespace,
+                                             partition=0,
+                                             target_id=target_id,
+                                             leader_id=leader_id)
+            except requests.exceptions.HTTPError as e:
+                if count <= 10 and e.response.status_code == 503:
                     self.logger.info(
-                        f'Failed to get desired leader, retrying once.')
-                    retry_once = False
+                        f"Got 503: {leader_id}'s metadata hasn't been updated yet"
+                    )
+                    time.sleep(1)
                     continue
-                else:
-                    raise e
-            break  # no exception -> success, we can return now
+                raise
+            break
 
-        self.logger.info(f"Completed transfer to {target_node_id}")
+        admin.await_stable_leader(topic,
+                                  partition=0,
+                                  namespace=namespace,
+                                  timeout_s=ELECTION_TIMEOUT * 4,
+                                  backoff_s=2,
+                                  check=lambda node_id: node_id != leader_id)
+
+        self.logger.info(f"Completed transfer to {target_id}")
@@ -404,7 +412,7 @@ def test_leader_transfers_recovery(self, acks):
         # is tripping up the cluster when we have so many leadership transfers.
         # https://github.com/redpanda-data/redpanda/issues/2623
 
-        admin = Admin(self.redpanda)
+        admin = Admin(self.redpanda, retry_codes=[])
 
         initial_leader_id = leader_node_id
         for n in range(0, transfer_count):
diff --git a/tests/rptest/tests/tx_admin_api_test.py b/tests/rptest/tests/tx_admin_api_test.py
index a33ab5f631a6..1661250c5e22 100644
--- a/tests/rptest/tests/tx_admin_api_test.py
+++ b/tests/rptest/tests/tx_admin_api_test.py
@@ -125,7 +125,7 @@ def test_expired_transaction(self):
         self.admin.transfer_leadership_to(namespace="kafka",
                                           topic=topic,
                                           partition=partition,
-                                          target=None)
+                                          target_id=None)
 
         def leader_is_changed():
             new_leader = self.admin.get_partition_leader(
diff --git a/tests/rptest/util.py b/tests/rptest/util.py
index e7b22ba055ba..0049cda0cc86 100644
--- a/tests/rptest/util.py
+++ b/tests/rptest/util.py
@@ -11,8 +11,9 @@
 from contextlib import contextmanager
 
 from requests.exceptions import HTTPError
-from ducktape.utils.util import wait_until
 from rptest.clients.kafka_cli_tools import KafkaCliTools
+from ducktape.errors import TimeoutError
+import time
 
 
 class Scale:
@@ -47,6 +48,31 @@ def release(self):
         return self._scale == Scale.RELEASE
 
 
+def wait_until(condition,
+               timeout_sec,
+               backoff_sec=.1,
+               err_msg="",
+               retry_on_exc=False):
+    start = time.time()
+    stop = start + timeout_sec
+    last_exception = None
+    while time.time() < stop:
+        try:
+            if condition():
+                return
+            else:
+                last_exception = None
+        except BaseException as e:
+            last_exception = e
+            if not retry_on_exc:
+                raise e
+        time.sleep(backoff_sec)
+
+    # it is safe to raise from None - it is just treated as a normal exception
+    raise TimeoutError(
+        err_msg() if callable(err_msg) else err_msg) from last_exception
+
+
 def wait_until_result(condition, *args, **kwargs):
     """
     a near drop-in replacement for ducktape's wait_util except that when