diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc
index 05853767a97e..d04a8b2c73a7 100644
--- a/src/v/cluster/partition_balancer_backend.cc
+++ b/src/v/cluster/partition_balancer_backend.cc
@@ -133,18 +133,20 @@ ss::future<> partition_balancer_backend::do_tick() {
         vlog(
           clusterlog.info,
           "violations: {} unavailable nodes, {} full nodes; planned {} "
-          "reassignments",
+          "reassignments; cancelled {} reassignments",
           plan_data.violations.unavailable_nodes.size(),
           plan_data.violations.full_nodes.size(),
-          plan_data.reassignments.size());
+          plan_data.reassignments.size(),
+          plan_data.cancellations.size());
     }
 
     _last_leader_term = _raft0->term();
     _last_tick_time = ss::lowres_clock::now();
     _last_violations = std::move(plan_data.violations);
+
     if (
-      _topic_table.has_updates_in_progress()
-      || !plan_data.reassignments.empty()) {
+      _topic_table.has_updates_in_progress() || !plan_data.reassignments.empty()
+      || !plan_data.cancellations.empty()) {
         _last_status = partition_balancer_status::in_progress;
     } else if (plan_data.failed_reassignments_count > 0) {
         _last_status = partition_balancer_status::stalled;
@@ -152,6 +154,23 @@ ss::future<> partition_balancer_backend::do_tick() {
         _last_status = partition_balancer_status::ready;
     }
 
+    co_await ss::max_concurrent_for_each(
+      plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) {
+          vlog(clusterlog.info, "cancel movement for ntp {}", ntp);
+          return _topics_frontend
+            .cancel_moving_partition_replicas(
+              ntp,
+              model::timeout_clock::now() + add_move_cmd_timeout,
+              current_term)
+            .then([ntp = std::move(ntp)](auto errc) {
+                vlog(
+                  clusterlog.info,
+                  "{} movement cancellation submitted, errc: {}",
+                  ntp,
+                  errc);
+            });
+      });
+
     co_await ss::max_concurrent_for_each(
       plan_data.reassignments,
       32,
diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc
index bdbb73a28036..7bd6e5057221 100644
--- a/src/v/cluster/partition_balancer_planner.cc
+++ b/src/v/cluster/partition_balancer_planner.cc
@@ -405,6 +405,41 @@ void partition_balancer_planner::get_full_node_reassignments(
     }
 }
 
+/*
+ * Cancel a movement if the new assignment contains an unavailable node
+ * and the previous replica set doesn't contain this node
+ */
+void partition_balancer_planner::get_unavailable_node_movement_cancellations(
+  std::vector<model::ntp>& cancellations,
+  const reallocation_request_state& rrs) {
+    for (const auto& update : _topic_table.updates_in_progress()) {
+        if (
+          update.second.state
+          != topic_table::in_progress_state::update_requested) {
+            continue;
+        }
+
+        absl::flat_hash_set<model::node_id> previous_replicas_set;
+        for (const auto& r : update.second.previous_replicas) {
+            previous_replicas_set.insert(r.node_id);
+        }
+
+        auto current_assignments = _topic_table.get_partition_assignment(
+          update.first);
+        if (!current_assignments.has_value()) {
+            continue;
+        }
+        for (const auto& r : current_assignments->replicas) {
+            if (
+              rrs.unavailable_nodes.contains(r.node_id)
+              && !previous_replicas_set.contains(r.node_id)) {
+                cancellations.push_back(update.first);
+                continue;
+            }
+        }
+    }
+}
+
 partition_balancer_planner::plan_data
 partition_balancer_planner::plan_reassignments(
   const cluster_health_report& health_report,
@@ -416,6 +451,10 @@ partition_balancer_planner::plan_reassignments(
     calculate_unavailable_nodes(follower_metrics, rrs, result);
     calculate_nodes_with_disk_constraints_violation(rrs, result);
 
+    if (_topic_table.has_updates_in_progress()) {
+        get_unavailable_node_movement_cancellations(result.cancellations, rrs);
+    }
+
     if (
       !_topic_table.has_updates_in_progress()
       && !result.violations.is_empty()) {
diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h
index b5818cea1b2a..121ba081f907 100644
--- a/src/v/cluster/partition_balancer_planner.h
+++ b/src/v/cluster/partition_balancer_planner.h
@@ -41,6 +41,7 @@ class partition_balancer_planner {
     struct plan_data {
         partition_balancer_violations violations;
        std::vector<ntp_reassignments> reassignments;
+        std::vector<model::ntp> cancellations;
         size_t failed_reassignments_count = 0;
     };
 
@@ -77,6 +78,10 @@ class partition_balancer_planner {
     void get_full_node_reassignments(plan_data&, reallocation_request_state&);
 
+    void get_unavailable_node_movement_cancellations(
+      std::vector<model::ntp>& cancellations,
+      const reallocation_request_state&);
+
     void calculate_nodes_with_disk_constraints_violation(
       reallocation_request_state&, plan_data&);
 
diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h
index f5e5c075eec7..87191f47bbd1 100644
--- a/src/v/cluster/tests/partition_balancer_planner_fixture.h
+++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -133,6 +133,22 @@ struct partition_balancer_planner_fixture {
         }
     }
 
+    cluster::move_partition_replicas_cmd make_move_partition_replicas_cmd(
+      model::ntp ntp, std::vector<model::broker_shard> replica_set) {
+        return cluster::move_partition_replicas_cmd(
+          std::move(ntp), std::move(replica_set));
+    }
+
+    void move_partition_replicas(cluster::ntp_reassignments& reassignment) {
+        auto cmd = make_move_partition_replicas_cmd(
+          reassignment.ntp,
+          reassignment.allocation_units.get_assignments().front().replicas);
+        auto res = workers.dispatcher
+                     .apply_update(serialize_cmd(std::move(cmd)).get())
+                     .get();
+        BOOST_REQUIRE_EQUAL(res, cluster::errc::success);
+    }
+
     std::vector<raft::follower_metrics>
     create_follower_metrics(const std::set<size_t>& unavailable_nodes = {}) {
         std::vector<raft::follower_metrics> metrics;
diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc
index a0f853955910..bfc5258f6e1d 100644
--- a/src/v/cluster/tests/partition_balancer_planner_test.cc
+++ b/src/v/cluster/tests/partition_balancer_planner_test.cc
@@ -616,3 +616,58 @@ FIXTURE_TEST(test_lot_of_partitions, partition_balancer_planner_fixture) {
     BOOST_REQUIRE_EQUAL(node_3_counter, node_4_counter);
     BOOST_REQUIRE_EQUAL(node_4_counter, movement_batch_partitions_amount / 2);
 }
+
+/*
+ * 4 nodes; 1 topic; 1+1 node down
+ * Node 3 goes down after the initial planning
+ * Initial
+ * node_0: partitions: 1; down: True; disk: unfilled;
+ * node_1: partitions: 1; down: False; disk: unfilled;
+ * node_2: partitions: 1; down: False; disk: unfilled;
+ * node_3: partitions: 0; down: False; disk: unfilled;
+ * Expected
+ * node_0: partitions: 1;
+ * node_1: partitions: 1;
+ * node_2: partitions: 1;
+ * node_3: partitions: 0;
+ */
+FIXTURE_TEST(test_node_cancelation, partition_balancer_planner_fixture) {
+    vlog(logger.debug, "test_node_cancelation");
+    allocator_register_nodes(3);
+    create_topic("topic-1", 1, 3);
+    allocator_register_nodes(1);
+
+    auto hr = create_health_report();
+
+    std::set<size_t> unavailable_nodes = {0};
+    auto fm = create_follower_metrics(unavailable_nodes);
+
+    auto planner_result = planner.plan_reassignments(hr, fm);
+
+    BOOST_REQUIRE_EQUAL(planner_result.reassignments.size(), 1);
+
+    auto ntp = planner_result.reassignments.front().ntp;
+
+    std::unordered_set<model::node_id> expected_nodes(
+      {model::node_id(1), model::node_id(2), model::node_id(3)});
+
+    auto new_replicas = planner_result.reassignments.front()
+                          .allocation_units.get_assignments()
+                          .front()
+                          .replicas;
+    check_expected_assignments(new_replicas, expected_nodes);
+
+    for (auto& reassignment : planner_result.reassignments) {
+        move_partition_replicas(reassignment);
+    }
+
+    hr = create_health_report();
+
+    unavailable_nodes = {0, 3};
+    fm = create_follower_metrics(unavailable_nodes);
+
+    planner_result = planner.plan_reassignments(hr, fm);
+    BOOST_REQUIRE(planner_result.reassignments.size() == 0);
+    BOOST_REQUIRE(planner_result.cancellations.size() == 1);
+    BOOST_REQUIRE(planner_result.cancellations.front() == ntp);
+}
diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py
index 111d60d23a8a..2d0614ce9769 100644
--- a/tests/rptest/tests/partition_balancer_test.py
+++ b/tests/rptest/tests/partition_balancer_test.py
@@ -27,34 +27,39 @@ def __init__(self, ctx, *args, **kwargs):
             ctx,
             *args,
             extra_rp_conf={
-                'partition_autobalancing_mode': 'continuous',
-                'partition_autobalancing_node_availability_timeout_sec':
+                "partition_autobalancing_mode": "continuous",
+                "partition_autobalancing_node_availability_timeout_sec":
                 self.NODE_AVAILABILITY_TIMEOUT,
-                'partition_autobalancing_tick_interval_ms': 5000,
-                'raft_learner_recovery_rate': 1_000_000,
+                "partition_autobalancing_tick_interval_ms": 5000,
+                "raft_learner_recovery_rate": 1_000_000,
             },
-            **kwargs)
+            **kwargs,
+        )
+
+    def topic_partitions_replicas(self, topic):
 
-    def node2partition_count(self):
-        topics = [self.topic]
         rpk = RpkTool(self.redpanda)
-        ret = {}
-        for topic in topics:
-            num_partitions = topic.partition_count
+        num_partitions = topic.partition_count
 
-            def all_partitions_ready():
-                try:
-                    partitions = list(rpk.describe_topic(topic.name))
-                except RpkException:
-                    return False
-                return (len(partitions) == num_partitions, partitions)
+        def all_partitions_ready():
+            try:
+                partitions = list(rpk.describe_topic(topic.name))
+            except RpkException:
+                return False
+            return (len(partitions) == num_partitions, partitions)
 
-            partitions = wait_until_result(
-                all_partitions_ready,
-                timeout_sec=120,
-                backoff_sec=1,
-                err_msg="failed to wait until all partitions have leaders")
+        return wait_until_result(
+            all_partitions_ready,
+            timeout_sec=120,
+            backoff_sec=1,
+            err_msg="failed to wait until all partitions have leaders",
+        )
 
+    def node2partition_count(self):
+        topics = [self.topic]
+        ret = {}
+        for topic in topics:
+            partitions = self.topic_partitions_replicas(topic)
             for p in partitions:
                 for r in p.replicas:
                     ret[r] = ret.setdefault(r, 0) + 1
@@ -69,22 +74,26 @@ def check():
             req_start = time.time()
             status = admin.get_partition_balancer_status(timeout=1)
-            self.logger.info(f'partition balancer status: {status}')
+            self.logger.info(f"partition balancer status: {status}")
 
-            if 'seconds_since_last_tick' not in status:
+            if "seconds_since_last_tick" not in status:
                 return False
-            return (req_start - status['seconds_since_last_tick'] - 1 > start
-                    and predicate(status), status)
+            return (
+                req_start - status["seconds_since_last_tick"] - 1 > start
+                and predicate(status),
+                status,
+            )
 
         return wait_until_result(
             check,
             timeout_sec=timeout_sec,
             backoff_sec=2,
-            err_msg="failed to wait until status condition")
+            err_msg="failed to wait until status condition",
+        )
 
     def wait_until_ready(self, timeout_sec=120):
         return self.wait_until_status(
-            lambda status: status['status'] == 'ready',
+            lambda status: status["status"] == "ready",
             timeout_sec=timeout_sec)
 
     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
@@ -106,8 +115,9 @@ def test_unavailable_nodes(self):
         for n in range(10):
             node = self.redpanda.nodes[n % 5]
             failure_types = [
-                FailureSpec.FAILURE_KILL, FailureSpec.FAILURE_TERMINATE,
-                FailureSpec.FAILURE_SUSPEND
+                FailureSpec.FAILURE_KILL,
+                FailureSpec.FAILURE_TERMINATE,
+                FailureSpec.FAILURE_SUSPEND,
             ]
             failure = FailureSpec(random.choice(failure_types), node)
             f_injector._start_func(failure.type)(failure.node)
@@ -118,22 +128,77 @@ def test_unavailable_nodes(self):
             time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
 
-            # TODO: enable when cancellation gets implemented
-            # wait_for_quiescent_state = random.random() < 0.5
-            wait_for_quiescent_state = True
+            self.wait_until_ready()
 
-            if wait_for_quiescent_state:
-                self.wait_until_ready()
+            node2pc = self.node2partition_count()
+            self.logger.info(f"partition counts after: {node2pc}")
 
-                node2pc = self.node2partition_count()
-                self.logger.info(f'partition counts after: {node2pc}')
-
-                assert sum(node2pc.values()) == total_replicas
-                assert self.redpanda.idx(node) not in node2pc
-            else:
-                self.wait_until_status(lambda s: s['status'] == 'in_progress'
-                                       or s['status'] == 'ready')
+            assert sum(node2pc.values()) == total_replicas
+            assert self.redpanda.idx(node) not in node2pc
 
             prev_failure = failure
 
         self.run_validation()
+
+    def _throttle_recovery(self, new_value):
+        self.redpanda.set_cluster_config(
+            {"raft_learner_recovery_rate": str(new_value)})
+
+    @cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST)
+    def test_movement_cancellations(self):
+        self.start_redpanda(num_nodes=4)
+
+        self.topic = TopicSpec(partition_count=random.randint(20, 30))
+
+        f_injector = FailureInjector(self.redpanda)
+
+        self.client().create_topic(self.topic)
+
+        self.start_producer(1)
+        self.start_consumer(1)
+        self.await_startup()
+
+        empty_node = self.redpanda.nodes[-1]
+
+        # drain the node so that it ends up with no partitions
+        initial_failure = FailureSpec(FailureSpec.FAILURE_KILL, empty_node)
+        f_injector._start_func(initial_failure.type)(initial_failure.node)
+        time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
+        self.wait_until_ready()
+        f_injector._stop_func(initial_failure.type)(initial_failure.node)
+        node2pc = self.node2partition_count()
+        self.logger.info(f"partition counts after: {node2pc}")
+        assert self.redpanda.idx(empty_node) not in node2pc
+
+        # throttle recovery to prevent partition moves from finishing
+        self._throttle_recovery(10)
+
+        total_replicas = sum(self.node2partition_count().values())
+
+        for n in range(3):
+            node = self.redpanda.nodes[n % 3]
+            failure = FailureSpec(FailureSpec.FAILURE_KILL, node)
+            f_injector._start_func(failure.type)(failure.node)
+
+            time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
+
+            # wait until the movement starts
+            self.wait_until_status(lambda s: s["status"] == "in_progress")
+            f_injector._stop_func(failure.type)(failure.node)
+
+            # stop the empty node
+            f_injector._start_func(initial_failure.type)(initial_failure.node)
+            time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
+
+            # wait until the movements are cancelled
+            self.wait_until_ready()
+
+            node2pc = self.node2partition_count()
+            self.logger.info(f"partition counts after: {node2pc}")
+
+            assert sum(node2pc.values()) == total_replicas
+            assert self.redpanda.idx(empty_node) not in node2pc
+
+            f_injector._stop_func(initial_failure.type)(initial_failure.node)
+
+        self.run_validation()
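
The planner change in this patch reduces to a single rule: cancel an in-flight movement when its target replica set depends on an unavailable node that the previous replica set did not. Below is a minimal standalone sketch of that rule, using plain STL stand-ins for the cluster types; every name in it (node_id, ntp, in_progress_update, movements_to_cancel) is hypothetical and only illustrates the check, not Redpanda's actual API.

// Minimal sketch of the cancellation rule, with simplified stand-in types.
#include <iostream>
#include <set>
#include <string>
#include <vector>

using node_id = int;
using ntp = std::string; // stand-in for model::ntp

struct in_progress_update {
    ntp partition;
    std::vector<node_id> previous_replicas; // replica set before the move
    std::vector<node_id> target_replicas;   // replica set the move is heading to
};

// Cancel a move when the target replica set contains an unavailable node that
// the previous replica set did not: finishing the move would leave the
// partition depending on a dead node, while cancelling restores a healthy set.
std::vector<ntp> movements_to_cancel(
  const std::vector<in_progress_update>& updates,
  const std::set<node_id>& unavailable_nodes) {
    std::vector<ntp> cancellations;
    for (const auto& u : updates) {
        std::set<node_id> previous(
          u.previous_replicas.begin(), u.previous_replicas.end());
        for (node_id n : u.target_replicas) {
            if (unavailable_nodes.count(n) && !previous.count(n)) {
                cancellations.push_back(u.partition);
                break; // one cancellation per partition is enough
            }
        }
    }
    return cancellations;
}

int main() {
    std::vector<in_progress_update> updates = {
      {"kafka/topic-1/0", {0, 1, 2}, {1, 2, 3}}, // moving onto node 3, which died
      {"kafka/topic-1/1", {0, 1, 2}, {0, 2, 4}}, // node 0 is dead but was already a replica
    };
    for (const auto& p : movements_to_cancel(updates, /*unavailable=*/{0, 3})) {
        std::cout << "cancel movement for " << p << "\n"; // prints only kafka/topic-1/0
    }
}

In the patch itself, the planner additionally requires the update to still be in topic_table::in_progress_state::update_requested and reads the current assignment from the topic table before applying this check; partition_balancer_backend then submits cancel_moving_partition_replicas for every ntp the planner returns in plan_data.cancellations.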