From f4401b0be2d17c0af77781a2af06d201fe90da00 Mon Sep 17 00:00:00 2001 From: ZeDRoman Date: Mon, 4 Jul 2022 13:49:09 +0300 Subject: [PATCH 1/3] cluster: partition allocator disk usage constraint Constraints for partition autobalancing Hard constraint - if new partition will break specialized disk limits on node Soft constraint - score nodes for free disk spac --- src/v/cluster/partition_balancer_types.h | 40 +++++++ src/v/cluster/scheduling/constraints.cc | 130 +++++++++++++++++++++++ src/v/cluster/scheduling/constraints.h | 29 +++++ 3 files changed, 199 insertions(+) create mode 100644 src/v/cluster/partition_balancer_types.h diff --git a/src/v/cluster/partition_balancer_types.h b/src/v/cluster/partition_balancer_types.h new file mode 100644 index 000000000000..af677c97575e --- /dev/null +++ b/src/v/cluster/partition_balancer_types.h @@ -0,0 +1,40 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "model/fundamental.h" +#include "model/metadata.h" + +namespace cluster { + +struct node_disk_space { + model::node_id node_id; + uint64_t free_space; + uint64_t total_space; + double free_space_rate; + + inline node_disk_space( + model::node_id node_id, uint64_t free_space, uint64_t total_space) + : node_id(node_id) + , free_space(free_space) + , total_space(total_space) + , free_space_rate(double(free_space) / double(total_space)) {} + + bool operator==(const node_disk_space& other) const { + return node_id == other.node_id; + } + + bool operator<(const node_disk_space& other) const { + return free_space_rate < other.free_space_rate; + } +}; + +} // namespace cluster \ No newline at end of file diff --git a/src/v/cluster/scheduling/constraints.cc b/src/v/cluster/scheduling/constraints.cc index 8d5ef5e0aa23..5fd466e400d7 100644 --- a/src/v/cluster/scheduling/constraints.cc +++ b/src/v/cluster/scheduling/constraints.cc @@ -133,6 +133,71 @@ distinct_from(const std::vector& current) { return hard_constraint_evaluator(std::make_unique(current)); } +hard_constraint_evaluator disk_not_overflowed_by_partition( + const double max_disk_usage_ratio, + const size_t partition_size, + const absl::flat_hash_map& + assigned_reallocation_sizes, + const absl::flat_hash_map& + node_disk_reports) { + class impl : public hard_constraint_evaluator::impl { + public: + impl( + const double max_disk_usage_ratio, + const size_t partition_size, + const absl::flat_hash_map& + assigned_reallocation_sizes, + const absl::flat_hash_map& + node_disk_reports) + : _max_disk_usage_ratio(max_disk_usage_ratio) + , _partition_size(partition_size) + , _assigned_reallocation_sizes(assigned_reallocation_sizes) + , _node_disk_reports(node_disk_reports) {} + + bool evaluate(const allocation_node& node) const final { + auto node_disk_report = _node_disk_reports.find(node.id()); + if (node_disk_report == _node_disk_reports.end()) { + return false; + } else { + uint64_t assigned_reallocation_size = 0; + if (const auto& asigned_size + = _assigned_reallocation_sizes.find(node.id()); + asigned_size != _assigned_reallocation_sizes.end()) { + assigned_reallocation_size = asigned_size->second; + } + auto disk_usage = node_disk_report->second.total_space + - node_disk_report->second.free_space + + _partition_size + + assigned_reallocation_size; + return _max_disk_usage_ratio + * double(node_disk_report->second.total_space) + > double(disk_usage); + } + return false; + } + + void print(std::ostream& o) const final { + fmt::print( + o, + "partition with size {} doesn't overfill disk", + _partition_size); + } + + const double _max_disk_usage_ratio; + const size_t _partition_size; + const absl::flat_hash_map& + _assigned_reallocation_sizes; + const absl::flat_hash_map& + _node_disk_reports; + }; + + return hard_constraint_evaluator(std::make_unique( + max_disk_usage_ratio, + partition_size, + assigned_reallocation_sizes, + node_disk_reports)); +} + soft_constraint_evaluator least_allocated() { class impl : public soft_constraint_evaluator::impl { public: @@ -190,4 +255,69 @@ soft_constraint_evaluator distinct_rack( return soft_constraint_evaluator(std::make_unique(replicas, state)); } +soft_constraint_evaluator least_disk_filled( + const double max_disk_usage_ratio, + const absl::flat_hash_map& + assigned_reallocation_sizes, + const absl::flat_hash_map& + node_disk_reports) { + class impl : public soft_constraint_evaluator::impl { + public: + impl( + const double max_disk_usage_ratio, + const absl::flat_hash_map& + assigned_reallocation_sizes, + const absl::flat_hash_map& + node_disk_reports) + : _max_disk_usage_ratio(max_disk_usage_ratio) + , _assigned_reallocation_sizes(assigned_reallocation_sizes) + , _node_disk_reports(node_disk_reports) {} + uint64_t score(const allocation_node& node) const final { + // we return 0 for node filled more or equal to max_disk_usage_ratio + // and 10'000'000 for nodes empty disks + auto node_disk_report = _node_disk_reports.find(node.id()); + if (node_disk_report == _node_disk_reports.end()) { + return 0; + } else { + uint64_t assigned_reallocation_size = 0; + if (const auto& asigned_size + = _assigned_reallocation_sizes.find(node.id()); + asigned_size != _assigned_reallocation_sizes.end()) { + assigned_reallocation_size = asigned_size->second; + } + if (node_disk_report->second.total_space == 0) { + return 0; + } + auto disk_free_space_rate + = double( + node_disk_report->second.free_space + - assigned_reallocation_size) + / double(node_disk_report->second.total_space); + auto min_disk_free_space = double(1) - _max_disk_usage_ratio; + if (disk_free_space_rate < min_disk_free_space) { + return 0; + } else { + return uint64_t( + soft_constraint_evaluator::max_score + * ((disk_free_space_rate - min_disk_free_space) / _max_disk_usage_ratio)); + } + } + return 0; + } + + void print(std::ostream& o) const final { + fmt::print(o, "least filled disk"); + } + + const double _max_disk_usage_ratio; + const absl::flat_hash_map& + _assigned_reallocation_sizes; + const absl::flat_hash_map& + _node_disk_reports; + }; + + return soft_constraint_evaluator(std::make_unique( + max_disk_usage_ratio, assigned_reallocation_sizes, node_disk_reports)); +} + } // namespace cluster diff --git a/src/v/cluster/scheduling/constraints.h b/src/v/cluster/scheduling/constraints.h index 18eacd2ff3f7..24d243a1ce40 100644 --- a/src/v/cluster/scheduling/constraints.h +++ b/src/v/cluster/scheduling/constraints.h @@ -10,9 +10,12 @@ */ #pragma once +#include "cluster/partition_balancer_types.h" #include "cluster/scheduling/types.h" #include "model/metadata.h" +#include + namespace cluster { class allocation_state; @@ -27,8 +30,34 @@ hard_constraint_evaluator on_nodes(const std::vector&); hard_constraint_evaluator distinct_from(const std::vector&); +/* + * constraint checks that new partition won't violate max_disk_usage_ratio + * partition_size is size of partition that is going to be allocated + * assigned_reallocation_sizes is sizes of partitions that are going to be + * allocated on node + */ +hard_constraint_evaluator disk_not_overflowed_by_partition( + const double max_disk_usage_ratio, + const size_t partition_size, + const absl::flat_hash_map& + assigned_reallocation_sizes, + const absl::flat_hash_map& + node_disk_reports); + soft_constraint_evaluator least_allocated(); +/* + * constraint scores nodes on free disk space + * assigned_reallocation_sizes is sizes of partitions that are going to be + * allocated on node + */ +soft_constraint_evaluator least_disk_filled( + const double max_disk_usage_ratio, + const absl::flat_hash_map& + assigned_reallocation_sizes, + const absl::flat_hash_map& + node_disk_reports); + soft_constraint_evaluator distinct_rack(const std::vector&, const allocation_state&); From ee9ad9096129870fa3a88bc51fabfa37a53d73bc Mon Sep 17 00:00:00 2001 From: ZeDRoman Date: Mon, 4 Jul 2022 14:45:06 +0300 Subject: [PATCH 2/3] c/partition_balancer: planner calculate movement calculate movement for partitions on unavalible and disk-full nodes. For unavalible node it moves all partition For full node it takes partition until available size of node will satisfy constraint --- src/v/cluster/CMakeLists.txt | 1 + src/v/cluster/partition_balancer_planner.cc | 420 ++++++++++++++++++++ src/v/cluster/partition_balancer_planner.h | 105 +++++ 3 files changed, 526 insertions(+) create mode 100644 src/v/cluster/partition_balancer_planner.cc create mode 100644 src/v/cluster/partition_balancer_planner.h diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index bda98b0e7a5c..2176c5f2e2b9 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -93,6 +93,7 @@ v_cc_library( feature_table.cc drain_manager.cc read_replica_manager.cc + partition_balancer_planner.cc DEPS Seastar::seastar controller_rpc diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc new file mode 100644 index 000000000000..f3d9cd3509db --- /dev/null +++ b/src/v/cluster/partition_balancer_planner.cc @@ -0,0 +1,420 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cluster/partition_balancer_planner.h" + +#include "cluster/partition_balancer_types.h" +#include "cluster/scheduling/constraints.h" +#include "cluster/scheduling/types.h" + +#include + +namespace cluster { + +partition_balancer_planner::partition_balancer_planner( + planner_config config, + topic_table& topic_table, + partition_allocator& partition_allocator) + : _config(config) + , _topic_table(topic_table) + , _partition_allocator(partition_allocator) {} + +void partition_balancer_planner:: + init_ntp_sizes_and_node_disk_reports_from_health_report( + const cluster_health_report& health_report, + reallocation_request_state& rrs) { + for (const auto& node_report : health_report.node_reports) { + uint64_t total = 0; + uint64_t free = 0; + for (const auto& disk : node_report.local_state.disks) { + total += disk.total; + free += disk.free; + } + rrs.node_disk_reports.insert( + {node_report.id, node_disk_space(node_report.id, free, total)}); + for (const auto& tp_ns : node_report.topics) { + for (const auto& partition : tp_ns.partitions) { + rrs.ntp_sizes[model::ntp( + tp_ns.tp_ns.ns, tp_ns.tp_ns.tp, partition.id)] + = partition.size_bytes; + } + } + } + return; +} + +void partition_balancer_planner:: + calculate_nodes_with_disk_constraints_violation( + reallocation_request_state& rrs) { + for (const auto& n : rrs.node_disk_reports) { + if ( + n.second.free_space_rate < double(1) - _config.max_disk_usage_ratio) { + rrs.nodes_with_disk_constraints_violation.insert(n.first); + rrs.full_nodes_disk_sizes.insert(n.second); + } + } +} + +void partition_balancer_planner::calculate_unavailable_nodes( + const std::vector& follower_metrics, + reallocation_request_state& rrs) { + const auto now = raft::clock_type::now(); + for (const auto& follower : follower_metrics) { + if ( + now - follower.last_heartbeat + > _config.node_availability_timeout_sec) { + rrs.unavailable_nodes.insert(follower.id); + } + } +} + +std::vector +partition_balancer_planner::get_stable_replicas( + const std::vector& replicas, + const reallocation_request_state& rrs, + size_t full_nodes_leave_amount) { + std::vector stable_replicas; + absl::flat_hash_map + replicas_breaking_disk_constraints; + + // add stable nodes + for (const auto& r : replicas) { + if (!rrs.unavailable_nodes.contains(r.node_id)) { + if (!rrs.nodes_with_disk_constraints_violation.contains( + r.node_id)) { + stable_replicas.push_back(r); + } else { + replicas_breaking_disk_constraints[r.node_id] = r.shard; + } + } + } + + // add 'full_nodes_leave_amount' of nodes with disk constraints violation + // ordered by free_space rate + for (auto node = rrs.full_nodes_disk_sizes.rbegin(); + node != rrs.full_nodes_disk_sizes.rend(); + node++) { + if (full_nodes_leave_amount == 0) { + return stable_replicas; + } + if (const auto& r = replicas_breaking_disk_constraints.find( + node->node_id); + r != replicas_breaking_disk_constraints.end()) { + stable_replicas.push_back(model::broker_shard{r->first, r->second}); + full_nodes_leave_amount--; + } + } + + return stable_replicas; +} + +size_t partition_balancer_planner::get_full_nodes_amount( + const std::vector& replicas, + const reallocation_request_state& rrs) { + size_t full_nodes_amount = 0; + for (const auto& r : replicas) { + if (rrs.nodes_with_disk_constraints_violation.contains(r.node_id)) { + full_nodes_amount += 1; + } + } + return full_nodes_amount; +} + +bool partition_balancer_planner::is_partition_movement_possible( + const std::vector& current_replicas, + const reallocation_request_state& rrs) { + // Check that nodes quorum is available + size_t available_nodes_amount = std::count_if( + current_replicas.begin(), + current_replicas.end(), + [&rrs](const model::broker_shard& bs) { + return rrs.unavailable_nodes.find(bs.node_id) + == rrs.unavailable_nodes.end(); + }); + if (available_nodes_amount * 2 < current_replicas.size()) { + return false; + } + return true; +} + +std::optional partition_balancer_planner::get_partition_size( + const model::ntp& ntp, const reallocation_request_state& rrs) { + const auto ntp_data = rrs.ntp_sizes.find(ntp); + if (ntp_data == rrs.ntp_sizes.end()) { + vlog( + clusterlog.error, + "Partition {} status was not found in cluster health " + "report", + ntp); + } else { + return ntp_data->second; + } + return std::nullopt; +} + +/* + * Function tries to find max reallocation + * It tries to move ntp out of 'bad' nodes + * Initially it considers as 'bad' all unavailable nodes + * and all nodes that are breaking max_disk_usage_ratio + * If reallocation request fails, it removes least filled node that + * is available but is breaking max_disk_usage_ratio + * It repeats until it doesn't work out to calculate reallocation + * or all nodes that are breaking max_disk_usage_ratio are removed out of 'bad' + */ +result partition_balancer_planner::get_reallocation( + const partition_assignment& assignments, + const topic_metadata& topic_metadata, + size_t partition_size, + bool use_max_disk_constraint, + reallocation_request_state& rrs) { + allocation_constraints allocation_constraints; + + // Add constraint on least disk usage + allocation_constraints.soft_constraints.push_back( + ss::make_lw_shared(least_disk_filled( + _config.max_disk_usage_ratio, + rrs.assigned_reallocation_sizes, + rrs.node_disk_reports))); + + if (use_max_disk_constraint) { + // Add constraint on partition max_disk_usage_ratio overfill + allocation_constraints.hard_constraints.push_back( + ss::make_lw_shared( + disk_not_overflowed_by_partition( + _config.max_disk_usage_ratio, + partition_size, + rrs.assigned_reallocation_sizes, + rrs.node_disk_reports))); + } else { + // Add constraint on partition disk overfill + allocation_constraints.hard_constraints.push_back( + ss::make_lw_shared( + disk_not_overflowed_by_partition( + 1, + partition_size, + rrs.assigned_reallocation_sizes, + rrs.node_disk_reports))); + } + + // Add constraint on unavailable nodes + std::vector unavailable_nodes; + unavailable_nodes.reserve(unavailable_nodes.size()); + for (auto node_id : rrs.unavailable_nodes) { + unavailable_nodes.push_back(model::broker_shard{node_id, 0}); + } + allocation_constraints.hard_constraints.push_back( + ss::make_lw_shared( + distinct_from(unavailable_nodes))); + + auto constraints = partition_constraints( + assignments.id, + topic_metadata.get_configuration().replication_factor, + allocation_constraints); + + size_t full_node_amount = get_full_nodes_amount(assignments.replicas, rrs); + for (size_t i = 0; i <= full_node_amount; ++i) { + auto stable_replicas = get_stable_replicas( + assignments.replicas, rrs, i); + + if (stable_replicas.size() == assignments.replicas.size()) { + break; + } + + auto stable_assigments = partition_assignment( + assignments.group, assignments.id, stable_replicas); + + auto reallocation = _partition_allocator.reallocate_partition( + constraints, stable_assigments); + + auto ntp = model::ntp( + topic_metadata.get_configuration().tp_ns.ns, + topic_metadata.get_configuration().tp_ns.tp, + assignments.id); + if (reallocation.has_value()) { + rrs.moving_partitions.insert(ntp); + rrs.planned_movement_disk_size += partition_size; + for (const auto r : + reallocation.value().get_assignments().front().replicas) { + if ( + std::find(stable_replicas.begin(), stable_replicas.end(), r) + == stable_replicas.end()) { + rrs.assigned_reallocation_sizes[r.node_id] + += partition_size; + } + if ( + std::find( + assignments.replicas.begin(), assignments.replicas.end(), r) + == assignments.replicas.end()) { + rrs.node_released_disk_size[r.node_id] += partition_size; + } + } + return reallocation; + } else { + vlog( + clusterlog.info, + "Can't find movement for ntp {} with stable_replicas: {}, " + "Error: {}", + ntp, + stable_replicas, + reallocation.error().message()); + } + } + return errc::no_eligible_allocation_nodes; +} + +/* + * Function is trying to move ntp out of unavailable nodes + * It can move to nodes that are violating max_disk_usage_ratio constraint + */ +void partition_balancer_planner::get_unavailable_nodes_reassignments( + std::vector& reassignments, + reallocation_request_state& rrs) { + for (const auto& t : _topic_table.topics_map()) { + for (const auto& a : t.second.get_assignments()) { + auto ntp = model::ntp(t.first.ns, t.first.tp, a.id); + if (rrs.moving_partitions.contains(ntp)) { + continue; + } + // Get all available nodes + auto available_replicas = get_stable_replicas( + a.replicas, rrs, SIZE_MAX); + if (available_replicas.size() != a.replicas.size()) { + auto partition_size = get_partition_size(ntp, rrs); + if ( + !partition_size.has_value() + || !is_partition_movement_possible(a.replicas, rrs)) { + continue; + } + auto new_allocation_units = get_reallocation( + a, t.second, partition_size.value(), false, rrs); + if (!new_allocation_units) { + vlog( + clusterlog.info, + "Can't reallocate {} from down node", + ntp); + } else { + reassignments.emplace_back(ntp_reassignments{ + .ntp = ntp, + .allocation_units = std::move( + new_allocation_units.value())}); + } + } + + // End adding movements if batch is collected + if ( + rrs.planned_movement_disk_size + >= _config.movement_disk_size_batch) { + return; + } + } + } +} + +/* + * Function is trying to move ntps out of node that are violating + * max_disk_usage_ratio It takes nodes in free space rate order For each node it + * is trying to collect set of partitions to move Partitions are selected in + * ascending order of their size. + */ +void partition_balancer_planner::get_full_node_reassignments( + std::vector& reassignments, + reallocation_request_state& rrs) { + absl::flat_hash_map> ntp_on_nodes; + for (const auto& t : _topic_table.topics_map()) { + for (const auto& a : t.second.get_assignments()) { + for (const auto& r : a.replicas) { + ntp_on_nodes[r.node_id].emplace_back( + t.first.ns, t.first.tp, a.id); + } + } + } + + for (const auto& node_disk_space : rrs.full_nodes_disk_sizes) { + if ( + rrs.planned_movement_disk_size >= _config.movement_disk_size_batch) { + return; + } + absl::btree_multimap ntp_on_node_sizes; + for (const auto& ntp : ntp_on_nodes[node_disk_space.node_id]) { + auto partition_size_opt = get_partition_size(ntp, rrs); + if (partition_size_opt.has_value()) { + ntp_on_node_sizes.emplace(partition_size_opt.value(), ntp); + } + } + auto ntp_size_it = ntp_on_node_sizes.begin(); + while (double( + rrs.node_released_disk_size[node_disk_space.node_id] + + node_disk_space.free_space) + / double(node_disk_space.total_space) + < double(1) - _config.max_disk_usage_ratio + && ntp_size_it != ntp_on_node_sizes.end()) { + const auto& partition_to_move = ntp_size_it->second; + if (rrs.moving_partitions.contains(partition_to_move)) { + ntp_size_it++; + continue; + } + + const auto& topic_metadata = _topic_table.topics_map().at( + model::topic_namespace_view(partition_to_move)); + const auto& current_assignments + = topic_metadata.get_assignments().find( + partition_to_move.tp.partition); + if (!is_partition_movement_possible( + current_assignments->replicas, rrs)) { + ntp_size_it++; + continue; + } + auto new_allocation_units = get_reallocation( + *current_assignments, + topic_metadata, + ntp_size_it->first, + true, + rrs); + if (!new_allocation_units) { + vlog(clusterlog.info, "Can't reallocate {}", partition_to_move); + } else { + reassignments.emplace_back(ntp_reassignments{ + .ntp = partition_to_move, + .allocation_units = std::move(new_allocation_units.value())}); + } + ntp_size_it++; + if ( + rrs.planned_movement_disk_size + >= _config.movement_disk_size_batch) { + return; + } + } + } +} + +std::vector +partition_balancer_planner::get_ntp_reassignments( + const cluster_health_report& health_report, + const std::vector& follower_metrics) { + if (_topic_table.has_updates_in_progress()) { + return {}; + } + + reallocation_request_state rrs; + std::vector reassignments; + + init_ntp_sizes_and_node_disk_reports_from_health_report(health_report, rrs); + + calculate_unavailable_nodes(follower_metrics, rrs); + calculate_nodes_with_disk_constraints_violation(rrs); + + get_unavailable_nodes_reassignments(reassignments, rrs); + get_full_node_reassignments(reassignments, rrs); + + return reassignments; +} + +} // namespace cluster \ No newline at end of file diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h new file mode 100644 index 000000000000..3f5c7e5a0947 --- /dev/null +++ b/src/v/cluster/partition_balancer_planner.h @@ -0,0 +1,105 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "cluster/health_monitor_types.h" +#include "cluster/partition_balancer_types.h" +#include "cluster/scheduling/partition_allocator.h" +#include "cluster/topic_table.h" + +#include + +namespace cluster { + +struct ntp_reassignments { + model::ntp ntp; + allocation_units allocation_units; +}; + +struct planner_config { + double max_disk_usage_ratio; + // Size of partitions that can be planned to move in one request + size_t movement_disk_size_batch; + std::chrono::seconds node_availability_timeout_sec; +}; + +class partition_balancer_planner { +public: + partition_balancer_planner( + planner_config config, + topic_table& topic_table, + partition_allocator& partition_allocator); + + std::vector get_ntp_reassignments( + const cluster_health_report&, const std::vector&); + +private: + struct reallocation_request_state { + uint64_t planned_movement_disk_size = 0; + absl::flat_hash_map ntp_sizes; + absl::flat_hash_map node_disk_reports; + absl::flat_hash_set unavailable_nodes; + absl::flat_hash_set + nodes_with_disk_constraints_violation; + absl::btree_set full_nodes_disk_sizes; + // Partitions that are planned to move in current planner request + absl::flat_hash_set moving_partitions; + // Size of partitions that are planned to move from node + absl::flat_hash_map node_released_disk_size; + // Size of partitions that are planned to move on node + absl::flat_hash_map + assigned_reallocation_sizes; + }; + + result get_reallocation( + const partition_assignment& assignments, + const topic_metadata& topic_metadata, + size_t partition_size, + bool use_max_disk_constraint, + reallocation_request_state&); + + void get_unavailable_nodes_reassignments( + std::vector&, reallocation_request_state&); + + void get_full_node_reassignments( + std::vector&, reallocation_request_state&); + + void calculate_nodes_with_disk_constraints_violation( + reallocation_request_state&); + + void calculate_unavailable_nodes( + const std::vector&, reallocation_request_state&); + + size_t get_full_nodes_amount( + const std::vector& replicas, + const reallocation_request_state& rrs); + + std::vector get_stable_replicas( + const std::vector& replicas, + const reallocation_request_state&, + size_t full_nodes_leave_amount); + + bool is_partition_movement_possible( + const std::vector& current_replicas, + const reallocation_request_state&); + + void init_ntp_sizes_and_node_disk_reports_from_health_report( + const cluster_health_report& health_report, reallocation_request_state&); + + std::optional get_partition_size( + const model::ntp& ntp, const reallocation_request_state&); + + planner_config _config; + topic_table& _topic_table; + partition_allocator& _partition_allocator; +}; + +} // namespace cluster From dd82c36c8aec0c7002defb4f4ba172923c6ac598 Mon Sep 17 00:00:00 2001 From: ZeDRoman Date: Mon, 4 Jul 2022 14:45:14 +0300 Subject: [PATCH 3/3] utest: partition autorebalancing planner tests --- src/v/cluster/tests/CMakeLists.txt | 1 + .../partition_balancer_planner_fixture.h | 195 ++++++ .../tests/partition_balancer_planner_test.cc | 568 ++++++++++++++++++ 3 files changed, 764 insertions(+) create mode 100644 src/v/cluster/tests/partition_balancer_planner_fixture.h create mode 100644 src/v/cluster/tests/partition_balancer_planner_test.cc diff --git a/src/v/cluster/tests/CMakeLists.txt b/src/v/cluster/tests/CMakeLists.txt index d999ee693c7a..e8ffe54429f3 100644 --- a/src/v/cluster/tests/CMakeLists.txt +++ b/src/v/cluster/tests/CMakeLists.txt @@ -17,6 +17,7 @@ rp_test( set(srcs partition_allocator_tests.cc + partition_balancer_planner_test.cc simple_batch_builder_test.cc serialization_rt_test.cc cluster_utils_tests.cc diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h new file mode 100644 index 000000000000..bd505b71e7e7 --- /dev/null +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -0,0 +1,195 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "cluster/members_table.h" +#include "cluster/partition_balancer_planner.h" +#include "cluster/tests/utils.h" +#include "cluster/topic_updates_dispatcher.h" +#include "model/metadata.h" +#include "test_utils/fixture.h" +#include "units.h" + +#include + +constexpr uint64_t node_size = 200_MiB; +constexpr uint64_t full_node_free_size = 5_MiB; +constexpr uint64_t nearly_full_node_free_size = 41_MiB; +constexpr uint64_t default_partition_size = 10_MiB; +constexpr uint64_t not_full_node_free_size = 150_MiB; +constexpr uint64_t reallocation_batch_size = default_partition_size * 2 - 1_MiB; +constexpr std::chrono::seconds node_unavailable_timeout = std::chrono::minutes( + 5); + +static constexpr uint32_t partitions_per_shard = 7000; +static constexpr uint32_t partitions_reserve_shard0 = 2; + +static std::unique_ptr +create_allocation_node(model::node_id nid, uint32_t cores) { + return std::make_unique( + nid, + cores, + absl::node_hash_map{}, + std::nullopt, + config::mock_binding(uint32_t{partitions_per_shard}), + config::mock_binding(uint32_t{partitions_reserve_shard0})); +} + +struct controller_workers { +public: + controller_workers() + : dispatcher(allocator, table, leaders) { + table.start().get(); + members.start_single().get(); + allocator + .start_single( + std::ref(members), + config::mock_binding>(std::nullopt), + config::mock_binding>(std::nullopt), + config::mock_binding(uint32_t{partitions_per_shard}), + config::mock_binding(uint32_t{partitions_reserve_shard0}), + config::mock_binding(16_KiB), + config::mock_binding(false)) + .get(); + } + + ~controller_workers() { + table.stop().get(); + allocator.stop().get(); + members.stop().get(); + } + + ss::sharded members; + ss::sharded allocator; + ss::sharded table; + ss::sharded leaders; + cluster::topic_updates_dispatcher dispatcher; +}; + +struct partition_balancer_planner_fixture { + partition_balancer_planner_fixture() + : planner( + cluster::planner_config{ + .max_disk_usage_ratio = 0.8, + .movement_disk_size_batch = reallocation_batch_size, + .node_availability_timeout_sec = std::chrono::minutes(1)}, + workers.table.local(), + workers.allocator.local()) + , workers() {} + + cluster::topic_configuration_assignment make_tp_configuration( + const ss::sstring& topic, int partitions, int16_t replication_factor) { + cluster::topic_configuration cfg( + test_ns, model::topic(topic), partitions, replication_factor); + + cluster::allocation_request req; + req.partitions.reserve(partitions); + for (auto p = 0; p < partitions; ++p) { + req.partitions.emplace_back( + model::partition_id(p), replication_factor); + } + + auto pas = workers.allocator.local() + .allocate(std::move(req)) + .value() + .get_assignments(); + + return {cfg, std::move(pas)}; + } + + cluster::create_topic_cmd make_create_topic_cmd( + const ss::sstring& name, int partitions, int16_t replication_factor) { + return { + make_tp_ns(name), + make_tp_configuration(name, partitions, replication_factor)}; + } + + model::topic_namespace make_tp_ns(const ss::sstring& tp) { + return {test_ns, model::topic(tp)}; + } + + void create_topic( + const ss::sstring& name, int partitions, int16_t replication_factor) { + auto cmd = make_create_topic_cmd(name, partitions, replication_factor); + auto res = workers.dispatcher + .apply_update(serialize_cmd(std::move(cmd)).get()) + .get(); + BOOST_REQUIRE_EQUAL(res, cluster::errc::success); + } + + void allocator_register_nodes(size_t nodes_amount) { + for (size_t i = 0; i < nodes_amount; ++i) { + workers.allocator.local().register_node( + create_allocation_node(model::node_id(last_node_idx), 4)); + last_node_idx++; + } + } + + std::vector + create_follower_metrics(const std::set& unavailable_nodes = {}) { + std::vector metrics; + metrics.reserve(last_node_idx); + for (size_t i = 0; i < last_node_idx; ++i) { + if (unavailable_nodes.contains(i)) { + metrics.push_back(raft::follower_metrics{ + .id = model::node_id(i), + .last_heartbeat = raft::clock_type::now() + - node_unavailable_timeout}); + } else { + metrics.push_back(raft::follower_metrics{ + .id = model::node_id(i), + .last_heartbeat = raft::clock_type::now()}); + } + } + return metrics; + } + + cluster::cluster_health_report create_health_report( + const std::set& full_nodes = {}, + const std::set& nearly_full_nodes = {}, + uint64_t partition_size = default_partition_size) { + cluster::cluster_health_report health_report; + std::vector topics; + for (const auto& topic : workers.table.local().topics_map()) { + cluster::topic_status ts; + ts.tp_ns = topic.second.get_configuration().tp_ns; + for (size_t i = 0; + i < topic.second.get_configuration().partition_count; + ++i) { + cluster::partition_status ps; + ps.id = model::partition_id(i); + ps.size_bytes = partition_size; + ts.partitions.push_back(ps); + } + topics.push_back(ts); + } + for (int i = 0; i < last_node_idx; ++i) { + cluster::node_health_report node_report; + storage::disk node_disk{ + .free = not_full_node_free_size, .total = node_size}; + if (full_nodes.contains(i)) { + node_disk.free = full_node_free_size; + } else if (nearly_full_nodes.contains(i)) { + node_disk.free = nearly_full_node_free_size; + } + node_report.id = model::node_id(i); + node_report.local_state.disks.push_back(node_disk); + health_report.node_reports.push_back(node_report); + } + health_report.node_reports[0].topics = topics; + return health_report; + } + + controller_workers workers; + cluster::partition_balancer_planner planner; + int last_node_idx{}; +}; diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc new file mode 100644 index 000000000000..5d62123af4e9 --- /dev/null +++ b/src/v/cluster/tests/partition_balancer_planner_test.cc @@ -0,0 +1,568 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/tests/partition_balancer_planner_fixture.h" +#include "vlog.h" + +#include + +static ss::logger logger("partition_balancer_planner"); + +using namespace std::chrono_literals; + +void check_expected_assignments( + const std::vector& replicas, + const std::unordered_set& expected_nodes) { + BOOST_REQUIRE(replicas.size() == expected_nodes.size()); + for (const auto r : replicas) { + BOOST_REQUIRE(expected_nodes.contains(r.node_id)); + } +} + +/* + * 4 nodes; 1 topic; + * Actual + * node_0: partitions: 1; down: False; disk: unfilled; + * node_1: partitions: 1; down: False; disk: unfilled; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 1; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 0; + */ +FIXTURE_TEST(test_stable, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_stable"); + allocator_register_nodes(4); + create_topic("topic-1", 1, 3); + + auto hr = create_health_report(); + auto fm = create_follower_metrics(); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + BOOST_REQUIRE(reassignments.empty()); +} + +/* + * 4 nodes; 1 topic; 1 node down + * Actual + * node_0: partitions: 1; down: True; disk: unfilled; + * node_1: partitions: 1; down: False; disk: unfilled; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 0; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 1; + */ +FIXTURE_TEST(test_node_down, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_node_down"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(1); + + auto hr = create_health_report(); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 1); + + std::unordered_set expected_nodes( + {model::node_id(1), model::node_id(2), model::node_id(3)}); + + auto new_replicas = reassignments.front() + .allocation_units.get_assignments() + .front() + .replicas; + check_expected_assignments(new_replicas, expected_nodes); +} + +/* + * 4 nodes; 1 topic; 2 nodes down + * Actual + * node_0: partitions: 1; down: True; disk: unfilled; + * node_1: partitions: 1; down: True; disk: unfilled; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 1; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 0; + */ +FIXTURE_TEST(test_no_quorum_for_partition, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_no_quorum_for_partition"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(1); + + auto hr = create_health_report(); + + std::set unavailable_nodes = {0, 1}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + BOOST_REQUIRE_EQUAL(reassignments.size(), 0); +} + +/* + * 5 nodes; 1 topic; 1 node down; 1 node nearly full + * Actual + * node_0: partitions: 1; down: True; disk: unfilled; + * node_1: partitions: 1; down: False; disk: unfilled; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: nearly filled; + * node_4: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 0; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 0; + * node_4: partitions: 1; + */ +FIXTURE_TEST( + test_node_down_and_node_is_full_but_there_is_empty, + partition_balancer_planner_fixture) { + vlog(logger.debug, "test_node_down_and_node_is_full_but_there_is_empty"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(2); + + std::set nearly_full_nodes = {3}; + auto hr = create_health_report({}, nearly_full_nodes); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 1); + + std::unordered_set expected_nodes( + {model::node_id(1), model::node_id(2), model::node_id(4)}); + + auto new_replicas = reassignments.front() + .allocation_units.get_assignments() + .front() + .replicas; + check_expected_assignments(new_replicas, expected_nodes); +} +/* + * 5 nodes; 1 topic; 1 node down; 1 node nearly full + * Actual + * node_0: partitions: 1; down: True; disk: unfilled; + * node_1: partitions: 1; down: False; disk: unfilled; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: nearly filled; + * Expected + * node_0: partitions: 0; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 1; + */ +FIXTURE_TEST( + test_node_down_and_node_is_nearly_full, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_node_down_and_node_is_nearly_full"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(1); + + std::set nearly_full_nodes = {3}; + auto hr = create_health_report({}, nearly_full_nodes); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 1); + + std::unordered_set expected_nodes( + {model::node_id(1), model::node_id(2), model::node_id(3)}); + auto new_replicas = reassignments.front() + .allocation_units.get_assignments() + .front() + .replicas; + check_expected_assignments(new_replicas, expected_nodes); +} + +/* + * 4 nodes; 1 topic; 1 node down; 1 node full + * Actual + * node_0: partitions: 1; down: True; disk: unfilled; + * node_1: partitions: 1; down: False; disk: unfilled; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: full; + * Expected + * node_0: partitions: 1; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 0; + */ +FIXTURE_TEST( + test_node_down_and_node_is_full, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_node_down_and_node_is_full"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(1); + + std::set full_nodes = {3}; + auto hr = create_health_report(full_nodes); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 0); +} + +/* + * 4 nodes; 1 topic; 1 node full + * Actual + * node_0: partitions: 1; down: False; disk: full; + * node_1: partitions: 1; down: False; disk: unfilled; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 0; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 1; + */ +FIXTURE_TEST(test_move_from_full_node, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_move_from_full_node"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(1); + + std::set full_nodes = {0}; + auto hr = create_health_report(full_nodes); + + auto fm = create_follower_metrics(); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 1); + + std::unordered_set expected_nodes( + {model::node_id(1), model::node_id(2), model::node_id(3)}); + + auto new_replicas = reassignments.front() + .allocation_units.get_assignments() + .front() + .replicas; + check_expected_assignments(new_replicas, expected_nodes); +} + +/* + * 4 nodes; 3 topic; 1 node down; + * Can move only 2 topics by one operation + * Actual + * node_0: partitions: 3; down: True; disk: unfilled; + * node_1: partitions: 3; down: False; disk: unfilled; + * node_2: partitions: 3; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 1; + * node_1: partitions: 3; + * node_2: partitions: 3; + * node_3: partitions: 2; + */ +FIXTURE_TEST( + test_move_multiple_partitions_batched_node_down, + partition_balancer_planner_fixture) { + vlog(logger.debug, "test_move_multiple_partitions_with_batch"); + allocator_register_nodes(3); + create_topic("topic-1", 3, 3); + allocator_register_nodes(1); + + auto hr = create_health_report(); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 2); + + std::unordered_set expected_nodes( + {model::node_id(1), model::node_id(2), model::node_id(3)}); + + auto new_replicas_1 + = reassignments[0].allocation_units.get_assignments().front().replicas; + check_expected_assignments(new_replicas_1, expected_nodes); + + auto new_replicas_2 + = reassignments[1].allocation_units.get_assignments().front().replicas; + check_expected_assignments(new_replicas_2, expected_nodes); +} + +/* + * 6 nodes; 3 topic; 3 node full; + * Can move only 2 topics by one operation + * Actual + * node_0: partitions: 3; down: False; disk: full; + * node_1: partitions: 3; down: False; disk: full; + * node_2: partitions: 3; down: False; disk: full; + * node_3: partitions: 0; down: False; disk: unfilled; + * node_4: partitions: 0; down: False; disk: unfilled; + * node_5: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 1; + * node_1: partitions: 1; + * node_2: partitions: 1; + * node_3: partitions: 2; + * node_4: partitions: 2; + * node_5: partitions: 2; + */ +FIXTURE_TEST( + test_move_multiple_partitions_batched_node_overfill, + partition_balancer_planner_fixture) { + vlog(logger.debug, "test_move_multiple_partitions_batched_node_overfill"); + allocator_register_nodes(3); + create_topic("topic-1", 3, 3); + allocator_register_nodes(3); + + std::set full_nodes = {0, 1, 2}; + auto hr = create_health_report(full_nodes); + auto fm = create_follower_metrics(); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 2); + + std::unordered_set expected_nodes( + {model::node_id(3), model::node_id(4), model::node_id(5)}); + + auto new_replicas_1 + = reassignments[0].allocation_units.get_assignments().front().replicas; + BOOST_REQUIRE_EQUAL(new_replicas_1.size(), expected_nodes.size()); + check_expected_assignments(new_replicas_1, expected_nodes); + + auto new_replicas_2 + = reassignments[1].allocation_units.get_assignments().front().replicas; + check_expected_assignments(new_replicas_2, expected_nodes); +} + +/* + * 5 nodes; 1 topic; 1 node down; 1 node full; + * Can move only 2 topics by one operation + * Actual + * node_0: partitions: 1; down: True; disk: unfilled; + * node_1: partitions: 1; down: False; disk: full; + * node_2: partitions: 1; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: unfilled; + * node_4: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 0; + * node_1: partitions: 0; + * node_2: partitions: 1; + * node_3: partitions: 1; + * node_4: partitions: 1; + */ +FIXTURE_TEST( + test_one_node_down_one_node_full, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_one_node_down_one_node_full"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(2); + + std::set full_nodes = {1}; + auto hr = create_health_report(full_nodes); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 1); + + std::unordered_set expected_nodes( + {model::node_id(2), model::node_id(3), model::node_id(4)}); + + auto new_replicas + = reassignments[0].allocation_units.get_assignments().front().replicas; + check_expected_assignments(new_replicas, expected_nodes); +} + +/* + * 5 nodes; 1 topic; 3 node full; + * Can move part of replicas + * Actual + * node_0: partitions: 1; down: False; disk: full; + * node_1: partitions: 1; down: False; disk: full; + * node_2: partitions: 1; down: False; disk: full; + * node_3: partitions: 0; down: False; disk: unfilled; + * node_4: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 1; + * node_1: partitions: 0; + * node_2: partitions: 0; + * node_3: partitions: 1; + * node_4: partitions: 1; + */ +FIXTURE_TEST(test_move_part_of_replicas, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_move_part_of_replicas"); + allocator_register_nodes(3); + create_topic("topic-1", 1, 3); + allocator_register_nodes(2); + + std::set full_nodes = {0, 1, 2}; + auto hr = create_health_report(full_nodes); + + auto fm = create_follower_metrics(); + + // Set order of full nodes + hr.node_reports[1].local_state.disks[0].free -= 1_MiB; + hr.node_reports[2].local_state.disks[0].free -= 2_MiB; + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 1); + + std::unordered_set expected_nodes( + {model::node_id(0), model::node_id(3), model::node_id(4)}); + + auto new_replicas + = reassignments[0].allocation_units.get_assignments().front().replicas; + check_expected_assignments(new_replicas, expected_nodes); +} + +/* + * 3 nodes; 2 topic; 2 node full; + * Movement should be from more filled node + * Actual + * node_0: topics: topic-1; partitions: 3; down: False; disk: full; + * node_1: topics: topic-2; partitions: 1; down: False; disk: full; + * node_2: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: topics: topic-1; partitions: 1; + * node_1: topics: topic-2; partitions: 1; + * node_2: topics: topic-1; partitions: 2; + */ +FIXTURE_TEST( + test_movement_from_more_filled_node, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_movement_from_more_filled_node"); + allocator_register_nodes(1); + create_topic("topic-1", 3, 1); + allocator_register_nodes(1); + create_topic("topic-2", 1, 1); + allocator_register_nodes(1); + + std::set full_nodes = {0, 1}; + auto hr = create_health_report(full_nodes); + auto fm = create_follower_metrics(); + + // Set order of full nodes + hr.node_reports[0].local_state.disks[0].free -= 1_MiB; + + // Set partition sizes + for (auto& topic : hr.node_reports[0].topics) { + if (topic.tp_ns.tp == "topic-1") { + for (auto& partition : topic.partitions) { + if (partition.id == 1) { + partition.size_bytes = default_partition_size - 1_KiB; + } + if (partition.id == 2) { + partition.size_bytes = default_partition_size - 2_KiB; + } + } + } + } + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + + BOOST_REQUIRE_EQUAL(reassignments.size(), 2); + std::unordered_set expected_nodes({model::node_id(2)}); + + auto new_replicas_1 + = reassignments[0].allocation_units.get_assignments().front().replicas; + + check_expected_assignments(new_replicas_1, expected_nodes); + // First move less size node + BOOST_REQUIRE_EQUAL(reassignments[0].ntp.tp.topic, "topic-1"); + BOOST_REQUIRE_EQUAL(reassignments[0].ntp.tp.partition, 2); + + auto new_replicas_2 + = reassignments[1].allocation_units.get_assignments().front().replicas; + check_expected_assignments(new_replicas_2, expected_nodes); + BOOST_REQUIRE_EQUAL(reassignments[1].ntp.tp.topic, "topic-1"); + BOOST_REQUIRE_EQUAL(reassignments[1].ntp.tp.partition, 1); +} + +/* + * 5 nodes; 1 topic; 1000 partitions; 1 node down; + * Partition size 10_KiB + * Batch size 19_MiB + * Can move 1900 partitions + * Actual + * node_0: partitions: 2000; down: True; disk: unfilled; + * node_1: partitions: 2000; down: False; disk: unfilled; + * node_2: partitions: 2000; down: False; disk: unfilled; + * node_3: partitions: 0; down: False; disk: unfilled; + * node_4: partitions: 0; down: False; disk: unfilled; + * Expected + * node_0: partitions: 100; + * node_1: partitions: 2000; + * node_2: partitions: 2000; + * node_3: partitions: 950; + * node_4: partitions: 950; + */ +FIXTURE_TEST(test_lot_of_partitions, partition_balancer_planner_fixture) { + vlog(logger.debug, "test_lot_of_partitions"); + allocator_register_nodes(3); + create_topic("topic-1", 2000, 3); + allocator_register_nodes(2); + + uint64_t local_partition_size = 10_KiB; + uint64_t movement_batch_partitions_amount = (reallocation_batch_size + + local_partition_size - 1) + / local_partition_size; + + auto hr = create_health_report({}, {}, local_partition_size); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + auto reassignments = planner.get_ntp_reassignments(hr, fm); + BOOST_REQUIRE_EQUAL(reassignments.size(), movement_batch_partitions_amount); + + std::unordered_set expected_nodes( + {model::node_id(1), + model::node_id(2), + model::node_id(3), + model::node_id(4)}); + + size_t node_3_counter = 0; + size_t node_4_counter = 0; + + for (auto& reassignment : reassignments) { + auto new_replicas + = reassignment.allocation_units.get_assignments().front().replicas; + BOOST_REQUIRE(new_replicas.size() == 3); + for (const auto r : new_replicas) { + BOOST_REQUIRE(expected_nodes.contains(r.node_id)); + if (r.node_id == model::node_id(3)) { + node_3_counter += 1; + } + if (r.node_id == model::node_id(4)) { + node_4_counter += 1; + } + } + } + + BOOST_REQUIRE_EQUAL(node_3_counter, node_4_counter); + BOOST_REQUIRE_EQUAL(node_4_counter, movement_batch_partitions_amount / 2); +} \ No newline at end of file