From 68848c1deb6b3b715834fa2a47156ead6e9ef6fb Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 18 Jun 2022 20:37:27 -0700 Subject: [PATCH] rm_stm: add leadership transfer coordination Leadership transfer and rm_stm should be coordinated to avoid the possibility of a livelock: - leadership transfer sets _transferring_leadership - it causes a write to fail - the failing write initiates a leader step down - the step down causes the transfer to fail --- src/v/cluster/partition.h | 6 +++++- src/v/cluster/rm_stm.cc | 9 +++++++++ src/v/cluster/rm_stm.h | 3 +++ src/v/cluster/scheduling/leader_balancer.cc | 6 +++--- src/v/redpanda/admin_server.cc | 8 ++++---- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index a6e0d31dd6f0..31ef78b983b8 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -150,7 +150,11 @@ class partition { ss::future transfer_leadership(std::optional target) { - return _raft->do_transfer_leadership(target); + if (_rm_stm) { + return _rm_stm->transfer_leadership(target); + } else { + return _raft->do_transfer_leadership(target); + } } ss::future diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 2de99fd38774..4766ad986f21 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -790,6 +790,15 @@ ss::future> rm_stm::replicate( return do_replicate(bid, std::move(r), opts, enqueued); } +ss::future +rm_stm::transfer_leadership(std::optional target) { + return _state_lock.hold_write_lock().then( + [this, target](ss::basic_rwlock<>::holder unit) { + return _c->do_transfer_leadership(target).finally( + [u = std::move(unit)] {}); + }); +} + ss::future> rm_stm::do_replicate( model::batch_identity bid, model::record_batch_reader b, diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 40c450f7817c..48f3fa0da964 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -179,6 +179,9 @@ class rm_stm final : public 
persisted_stm { model::record_batch_reader, raft::replicate_options); + ss::future + transfer_leadership(std::optional); + ss::future<> stop() override; void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; } diff --git a/src/v/cluster/scheduling/leader_balancer.cc b/src/v/cluster/scheduling/leader_balancer.cc index 4768f72cb610..f82a3eea62f6 100644 --- a/src/v/cluster/scheduling/leader_balancer.cc +++ b/src/v/cluster/scheduling/leader_balancer.cc @@ -626,8 +626,8 @@ leader_balancer::do_transfer_local(reassignment transfer) const { } auto shard = _shard_table.local().shard_for(transfer.group); auto func = [transfer, shard](cluster::partition_manager& pm) { - auto consensus = pm.consensus_for(transfer.group); - if (!consensus) { + auto partition = pm.partition_for(transfer.group); + if (!partition) { vlog( clusterlog.info, "Cannot complete group {} leader transfer: group instance " @@ -636,7 +636,7 @@ leader_balancer::do_transfer_local(reassignment transfer) const { shard); return ss::make_ready_future(false); } - return consensus->do_transfer_leadership(transfer.to.node_id) + return partition->transfer_leadership(transfer.to.node_id) .then([group = transfer.group](std::error_code err) { if (err) { vlog( diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 1ca85c7a733a..907ab11c386a 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -1142,12 +1142,12 @@ void admin_server::register_raft_routes() { shard, [group_id, target, this, req = std::move(req)]( cluster::partition_manager& pm) mutable { - auto consensus = pm.consensus_for(group_id); - if (!consensus) { + auto partition = pm.partition_for(group_id); + if (!partition) { throw ss::httpd::not_found_exception(); } - const auto ntp = consensus->ntp(); - return consensus->do_transfer_leadership(target).then( + const auto ntp = partition->ntp(); + return partition->transfer_leadership(target).then( [this, req = std::move(req), 
ntp](std::error_code err) -> ss::future { co_await throw_on_error(*req, err, ntp);