Skip to content

Commit

Permalink
c/partition_balancer: planner cancelations
Browse files Browse the repository at this point in the history
planner cancel movement to unavailable nodes
  • Loading branch information
ZeDRoman committed Jul 18, 2022
1 parent 4e67b24 commit 8bb7725
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
27 changes: 23 additions & 4 deletions src/v/cluster/partition_balancer_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,25 +133,44 @@ ss::future<> partition_balancer_backend::do_tick() {
vlog(
clusterlog.info,
"violations: {} unavailable nodes, {} full nodes; planned {} "
"reassignments",
"reassignments; cancelled {} reassignments",
plan_data.violations.unavailable_nodes.size(),
plan_data.violations.full_nodes.size(),
plan_data.reassignments.size());
plan_data.reassignments.size(),
plan_data.cancellations.size());
}

_last_leader_term = _raft0->term();
_last_tick_time = ss::lowres_clock::now();
_last_violations = std::move(plan_data.violations);

if (
_topic_table.has_updates_in_progress()
|| !plan_data.reassignments.empty()) {
_topic_table.has_updates_in_progress() || !plan_data.reassignments.empty()
|| !plan_data.cancellations.empty()) {
_last_status = partition_balancer_status::in_progress;
} else if (plan_data.failed_reassignments_count > 0) {
_last_status = partition_balancer_status::stalled;
} else {
_last_status = partition_balancer_status::ready;
}

co_await ss::max_concurrent_for_each(
plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) {
vlog(clusterlog.info, "cancel movement for ntp {}", ntp);
return _topics_frontend
.cancel_moving_partition_replicas(
ntp,
model::timeout_clock::now() + add_move_cmd_timeout,
current_term)
.then([ntp = std::move(ntp)](auto errc) {
vlog(
clusterlog.info,
"{} movement cancellation submitted, errc: {}",
ntp,
errc);
});
});

co_await ss::max_concurrent_for_each(
plan_data.reassignments,
32,
Expand Down
43 changes: 43 additions & 0 deletions src/v/cluster/partition_balancer_planner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,45 @@ void partition_balancer_planner::get_full_node_reassignments(
}
}

/*
 * Collect in-progress partition movements that should be cancelled.
 *
 * A movement is selected when it is still in the `update_requested` state and
 * its new assignment places a replica on an unavailable node that was not
 * part of the previous replica set.
 *
 * Each selected ntp is appended to `cancellations` exactly once, regardless of
 * how many of its new replicas are on unavailable nodes.
 *
 * cancellations - output vector; matching ntps are appended to it
 * rrs           - planner state; only `unavailable_nodes` is read here
 */
void partition_balancer_planner::get_unavailable_node_movement_cancellations(
  std::vector<model::ntp>& cancellations,
  const reallocation_request_state& rrs) {
    for (const auto& update : _topic_table.updates_in_progress()) {
        // Only movements that are still in the requested state are considered.
        if (
          update.second.state
          != topic_table::in_progress_state::update_requested) {
            continue;
        }

        // Nodes that hosted the partition before the movement started. When
        // the previous replica set is not known the set stays empty, so every
        // replica on an unavailable node counts as newly placed there.
        auto previous_replicas = _topic_table.get_previous_replica_set(
          update.first);
        absl::flat_hash_set<model::node_id> previous_replicas_set;
        if (previous_replicas.has_value()) {
            for (const auto& r : previous_replicas.value()) {
                previous_replicas_set.insert(r.node_id);
            }
        }

        auto current_assignments = _topic_table.get_partition_assignment(
          update.first);
        if (!current_assignments.has_value()) {
            continue;
        }

        for (const auto& r : current_assignments->replicas) {
            if (
              rrs.unavailable_nodes.contains(r.node_id)
              && !previous_replicas_set.contains(r.node_id)) {
                cancellations.push_back(update.first);
                // One offending replica is enough. Stop scanning so the same
                // ntp is not queued for cancellation more than once.
                break;
            }
        }
    }
}

partition_balancer_planner::plan_data
partition_balancer_planner::plan_reassignments(
const cluster_health_report& health_report,
Expand All @@ -416,6 +455,10 @@ partition_balancer_planner::plan_reassignments(
calculate_unavailable_nodes(follower_metrics, rrs, result);
calculate_nodes_with_disk_constraints_violation(rrs, result);

if (_topic_table.has_updates_in_progress()) {
get_unavailable_node_movement_cancellations(result.cancellations, rrs);
}

if (
!_topic_table.has_updates_in_progress()
&& !result.violations.is_empty()) {
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/partition_balancer_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class partition_balancer_planner {
// Result of one planning pass, returned by plan_reassignments().
struct plan_data {
// Violations detected during this pass (unavailable nodes, full nodes).
partition_balancer_violations violations;
// Partition movements the planner wants to start.
std::vector<ntp_reassignments> reassignments;
// Partitions whose in-progress movement should be cancelled; filled by
// get_unavailable_node_movement_cancellations().
std::vector<model::ntp> cancellations;
// Count of failed reassignments; a non-zero value makes the balancer
// backend report the `stalled` status when nothing else is in progress.
size_t failed_reassignments_count = 0;
};

Expand Down Expand Up @@ -77,6 +78,10 @@ class partition_balancer_planner {

void get_full_node_reassignments(plan_data&, reallocation_request_state&);

// Appends to `cancellations` the ntps of in-progress movements (in the
// `update_requested` state) whose new assignment puts a replica on an
// unavailable node that was not in the previous replica set.
void get_unavailable_node_movement_cancellations(
std::vector<model::ntp>& cancellations,
const reallocation_request_state&);

void calculate_nodes_with_disk_constraints_violation(
reallocation_request_state&, plan_data&);

Expand Down

0 comments on commit 8bb7725

Please sign in to comment.