diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 05853767a97ec..d04a8b2c73a7a 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -133,18 +133,20 @@ ss::future<> partition_balancer_backend::do_tick() { vlog( clusterlog.info, "violations: {} unavailable nodes, {} full nodes; planned {} " - "reassignments", + "reassignments; cancelled {} reassignments", plan_data.violations.unavailable_nodes.size(), plan_data.violations.full_nodes.size(), - plan_data.reassignments.size()); + plan_data.reassignments.size(), + plan_data.cancellations.size()); } _last_leader_term = _raft0->term(); _last_tick_time = ss::lowres_clock::now(); _last_violations = std::move(plan_data.violations); + if ( - _topic_table.has_updates_in_progress() - || !plan_data.reassignments.empty()) { + _topic_table.has_updates_in_progress() || !plan_data.reassignments.empty() + || !plan_data.cancellations.empty()) { _last_status = partition_balancer_status::in_progress; } else if (plan_data.failed_reassignments_count > 0) { _last_status = partition_balancer_status::stalled; @@ -152,6 +154,23 @@ ss::future<> partition_balancer_backend::do_tick() { _last_status = partition_balancer_status::ready; } + co_await ss::max_concurrent_for_each( + plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) { + vlog(clusterlog.info, "cancel movement for ntp {}", ntp); + return _topics_frontend + .cancel_moving_partition_replicas( + ntp, + model::timeout_clock::now() + add_move_cmd_timeout, + current_term) + .then([ntp = std::move(ntp)](auto errc) { + vlog( + clusterlog.info, + "{} movement cancellation submitted, errc: {}", + ntp, + errc); + }); + }); + co_await ss::max_concurrent_for_each( plan_data.reassignments, 32, diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index cf77d6117eb05..b008ed4738faa 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -405,6 +405,45 @@ void partition_balancer_planner::get_full_node_reassignments( } } +/* + * Cancel movement if new assignments contains unavailble node + * and previous replica set doesn't contain this node + */ +void partition_balancer_planner::get_unavailable_node_movement_cancellations( + std::vector& cancellations, + const reallocation_request_state& rrs) { + for (auto& update : _topic_table.updates_in_progress()) { + if ( + update.second.state + != topic_table::in_progress_state::update_requested) { + continue; + } + + auto previous_replicas = _topic_table.get_previous_replica_set( + update.first); + absl::flat_hash_set previous_replicas_set; + if (previous_replicas.has_value()) { + for (auto& r : previous_replicas.value()) { + previous_replicas_set.insert(r.node_id); + } + } + + auto current_assignments = _topic_table.get_partition_assignment( + update.first); + if (!current_assignments.has_value()) { + continue; + } + for (auto& r : current_assignments->replicas) { + if ( + rrs.unavailable_nodes.contains(r.node_id) + && !previous_replicas_set.contains(r.node_id)) { + cancellations.push_back(update.first); + continue; + } + } + } +} + partition_balancer_planner::plan_data partition_balancer_planner::plan_reassignments( const cluster_health_report& health_report, @@ -416,6 +455,10 @@ partition_balancer_planner::plan_reassignments( calculate_unavailable_nodes(follower_metrics, rrs, result); calculate_nodes_with_disk_constraints_violation(rrs, result); + if (_topic_table.has_updates_in_progress()) { + get_unavailable_node_movement_cancellations(result.cancellations, rrs); + } + if ( !_topic_table.has_updates_in_progress() && !result.violations.is_empty()) { diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index b5818cea1b2a4..121ba081f9077 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -41,6 +41,7 @@ class partition_balancer_planner { struct plan_data { partition_balancer_violations violations; std::vector reassignments; + std::vector cancellations; size_t failed_reassignments_count = 0; }; @@ -77,6 +78,10 @@ class partition_balancer_planner { void get_full_node_reassignments(plan_data&, reallocation_request_state&); + void get_unavailable_node_movement_cancellations( + std::vector& cancellations, + const reallocation_request_state&); + void calculate_nodes_with_disk_constraints_violation( reallocation_request_state&, plan_data&);