rm_stm: add leadership transfer coordination
Leadership transfer and rm_stm should be coordinated to avoid the
possibility of a live lock:

  - leadership transfer sets _transferring_leadership
  - it causes a write to fail
  - the failing write initiates a leader step down
  - the step down causes the transfer to fail
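
The coordination added below serializes the two paths on rm_stm's state lock:
the new rm_stm::transfer_leadership() takes the write side of _state_lock
before delegating to raft, while writes are assumed to run under the read
side, so a transfer drains in-flight writes and excludes new ones instead of
racing with them. A minimal standalone sketch of that pattern, using
std::shared_mutex in place of Seastar's ss::basic_rwlock (names and the
synchronous structure are illustrative only):

// Minimal sketch of the locking pattern (assumption: std::shared_mutex
// stands in for Seastar's ss::basic_rwlock; names are illustrative).
#include <iostream>
#include <mutex>
#include <shared_mutex>

struct stm {
    std::shared_mutex state_lock;

    // Writes hold the read (shared) side, so they may run concurrently
    // with each other but not with a leadership transfer.
    void replicate() {
        std::shared_lock guard(state_lock);
        // ... replicate the batch through raft ...
    }

    // Leadership transfer holds the write (exclusive) side: it waits for
    // in-flight writes to finish and blocks new ones, so no write can fail
    // on _transferring_leadership and request a step down mid-transfer.
    void transfer_leadership() {
        std::unique_lock guard(state_lock);
        // ... raft do_transfer_leadership(target) ...
    }
};

int main() {
    stm s;
    s.replicate();
    s.transfer_leadership();
    std::cout << "writes and transfer are serialized\n";
    return 0;
}

In the actual change both paths are asynchronous; transfer_leadership() keeps
the write-lock holder alive across the raft call by moving it into
.finally([u = std::move(unit)] {}).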
rystsov committed Jun 23, 2022
1 parent 88423d7 commit 68848c1
Showing 5 changed files with 24 additions and 8 deletions.
6 changes: 5 additions & 1 deletion src/v/cluster/partition.h
@@ -150,7 +150,11 @@ class partition {

     ss::future<std::error_code>
     transfer_leadership(std::optional<model::node_id> target) {
-        return _raft->do_transfer_leadership(target);
+        if (_rm_stm) {
+            return _rm_stm->transfer_leadership(target);
+        } else {
+            return _raft->do_transfer_leadership(target);
+        }
     }

     ss::future<std::error_code>
9 changes: 9 additions & 0 deletions src/v/cluster/rm_stm.cc
@@ -790,6 +790,15 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate(
     return do_replicate(bid, std::move(r), opts, enqueued);
 }

+ss::future<std::error_code>
+rm_stm::transfer_leadership(std::optional<model::node_id> target) {
+    return _state_lock.hold_write_lock().then(
+      [this, target](ss::basic_rwlock<>::holder unit) {
+          return _c->do_transfer_leadership(target).finally(
+            [u = std::move(unit)] {});
+      });
+}
+
 ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
   model::batch_identity bid,
   model::record_batch_reader b,
3 changes: 3 additions & 0 deletions src/v/cluster/rm_stm.h
@@ -179,6 +179,9 @@ class rm_stm final : public persisted_stm {
       model::record_batch_reader,
       raft::replicate_options);

+    ss::future<std::error_code>
+    transfer_leadership(std::optional<model::node_id>);
+
     ss::future<> stop() override;

     void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; }
6 changes: 3 additions & 3 deletions src/v/cluster/scheduling/leader_balancer.cc
@@ -626,8 +626,8 @@ leader_balancer::do_transfer_local(reassignment transfer) const {
     }
     auto shard = _shard_table.local().shard_for(transfer.group);
     auto func = [transfer, shard](cluster::partition_manager& pm) {
-        auto consensus = pm.consensus_for(transfer.group);
-        if (!consensus) {
+        auto partition = pm.partition_for(transfer.group);
+        if (!partition) {
             vlog(
               clusterlog.info,
               "Cannot complete group {} leader transfer: group instance "
@@ -636,7 +636,7 @@ leader_balancer::do_transfer_local(reassignment transfer) const {
               shard);
             return ss::make_ready_future<bool>(false);
         }
-        return consensus->do_transfer_leadership(transfer.to.node_id)
+        return partition->transfer_leadership(transfer.to.node_id)
           .then([group = transfer.group](std::error_code err) {
               if (err) {
                   vlog(
8 changes: 4 additions & 4 deletions src/v/redpanda/admin_server.cc
@@ -1142,12 +1142,12 @@ void admin_server::register_raft_routes() {
           shard,
           [group_id, target, this, req = std::move(req)](
             cluster::partition_manager& pm) mutable {
-              auto consensus = pm.consensus_for(group_id);
-              if (!consensus) {
+              auto partition = pm.partition_for(group_id);
+              if (!partition) {
                   throw ss::httpd::not_found_exception();
               }
-              const auto ntp = consensus->ntp();
-              return consensus->do_transfer_leadership(target).then(
+              const auto ntp = partition->ntp();
+              return partition->transfer_leadership(target).then(
                 [this, req = std::move(req), ntp](std::error_code err)
                   -> ss::future<ss::json::json_return_type> {
                     co_await throw_on_error(*req, err, ntp);
