diff --git a/src/v/cluster/cluster_utils.h b/src/v/cluster/cluster_utils.h index 4aa9380e47b7..371cd2f4527f 100644 --- a/src/v/cluster/cluster_utils.h +++ b/src/v/cluster/cluster_utils.h @@ -271,4 +271,14 @@ inline std::vector subtract_replica_sets( }); return ret; } + +// check if replica set contains a node +inline bool contains_node( + const std::vector& replicas, model::node_id id) { + return std::find_if( + replicas.begin(), + replicas.end(), + [id](const model::broker_shard& bs) { return bs.node_id == id; }) + != replicas.end(); +} } // namespace cluster diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 3d0bce521fde..6635c21c38ea 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -90,6 +90,31 @@ std::vector create_brokers_set( return brokers; } +std::vector create_brokers_set( + const std::vector& replicas, + const absl::flat_hash_map& + replica_revisions, + cluster::members_table& members) { + std::vector brokers; + brokers.reserve(replicas.size()); + + std::transform( + std::cbegin(replicas), + std::cend(replicas), + std::back_inserter(brokers), + [&members, &replica_revisions](const model::broker_shard& bs) { + auto br = members.get_broker(bs.node_id); + if (!br) { + throw std::logic_error( + fmt::format("Replica node {} is not available", bs.node_id)); + } + return raft::broker_revision{ + .broker = *br->get(), + .rev = replica_revisions.find(bs.node_id)->second}; + }); + return brokers; +} + std::optional get_target_shard( model::node_id id, const std::vector& replicas) { auto it = std::find_if( @@ -519,6 +544,29 @@ find_interrupting_operation(deltas_t::iterator current_it, deltas_t& deltas) { } }); } +ss::future revert_configuration_update( + const model::ntp& ntp, + const std::vector& replicas, + model::revision_id rev, + ss::lw_shared_ptr p, + members_table& members, + topic_table& topics) { + auto in_progress_it = topics.in_progress_updates().find(ntp); + // no longer in progress + if (in_progress_it == topics.in_progress_updates().end()) { + co_return errc::success; + } + auto brokers = create_brokers_set( + replicas, in_progress_it->second.replicas_revisions, members); + vlog( + clusterlog.debug, + "reverting already finished reconfiguration of {}, revision: {}. 
Replica " + "set: {} ", + ntp, + rev, + replicas); + co_return co_await p->update_replica_set(std::move(brokers), rev); +} } // namespace ss::future<> controller_backend::reconcile_ntp(deltas_t& deltas) { @@ -1197,21 +1245,25 @@ ss::future controller_backend::cancel_replica_set_update( replicas, rev, [this, &ntp, rev, replicas](ss::lw_shared_ptr p) { + const auto current_cfg = p->group_configuration(); + // we do not have to request update/cancellation twice + if (current_cfg.revision_id() == rev) { + return ss::make_ready_future( + errc::waiting_for_recovery); + } + const auto raft_cfg_update_finished - = are_configuration_replicas_up_to_date( - p->group_configuration(), replicas); + = current_cfg.type() == raft::configuration_type::simple; // raft already finished its part, we need to move replica back if (raft_cfg_update_finished) { - auto brokers = create_brokers_set( - replicas, _members_table.local()); - vlog( - clusterlog.debug, - "raft reconfiguration finished, moving partition {} " - "configuration back to requested state: {}", + return revert_configuration_update( ntp, - replicas); - return p->update_replica_set(std::move(brokers), rev); + replicas, + rev, + std::move(p), + _members_table.local(), + _topics.local()); } else { vlog( clusterlog.debug, @@ -1233,21 +1285,42 @@ ss::future controller_backend::force_abort_replica_set_update( if (!partition) { co_return errc::partition_not_exists; } + const auto current_cfg = partition->group_configuration(); - const auto raft_cfg_update_finished = are_configuration_replicas_up_to_date( - partition->group_configuration(), replicas); - if (raft_cfg_update_finished) { - co_return co_await update_partition_replica_set(ntp, replicas, rev); - } else { - // wait for configuration update, only declare success - // when configuration was actually updated - auto update_ec = check_configuration_update( - _self, partition, replicas, rev); + // wait for configuration update, only declare success + // when configuration was actually updated + auto update_ec = check_configuration_update( + _self, partition, replicas, rev); - if (!update_ec) { - co_return errc::success; - } + if (!update_ec) { + co_return errc::success; + } + + // we do not have to request update/cancellation twice + if (current_cfg.revision_id() == rev) { + co_return errc::waiting_for_recovery; + } + + const auto raft_cfg_update_finished = current_cfg.type() + == raft::configuration_type::simple; + + if (raft_cfg_update_finished) { + co_return co_await apply_configuration_change_on_leader( + ntp, + replicas, + rev, + [this, rev, &replicas, &ntp]( + ss::lw_shared_ptr p) { + return revert_configuration_update( + ntp, + replicas, + rev, + std::move(p), + _members_table.local(), + _topics.local()); + }); + } else { auto ec = co_await partition->force_abort_replica_set_update(rev); if (ec) { diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h index cbdb850eac00..2febe8dd27d5 100644 --- a/src/v/cluster/controller_backend.h +++ b/src/v/cluster/controller_backend.h @@ -30,8 +30,190 @@ namespace cluster { -/// on every core, sharded - +/** + * + * # Reconciliation + * + * Controller backend is responsible for making sure that the cluster state is + * in align with the topic and partition state gathered in topic_table. + * + * Controller backend lives on each core on every node in the cluster. 
+ * Each instance of controller backend is responsible for dealing with core- and
+ * node-local partition replicas (instances of the `cluster::partition` object
+ * that are supposed to be instantiated on a given core of a given node).
+ * Controller backend manages the partition replica lifecycle: it
+ * instantiates/deletes `cluster::partition` instances and registers them in the
+ * shard table.
+ *
+ * Controller backend operations are driven by deltas generated in the topics
+ * table. The backend waits for new deltas using a condition variable. Each
+ * delta represents an operation that must be executed for an ntp, e.g. create,
+ * update properties, move, etc.
+ *
+ * Each controller backend in the cluster (on each node and each core) processes
+ * all the deltas and, based on the situation, either executes an operation or
+ * ignores it (command pattern).
+ *
+ * The deltas vector for each NTP is processed in a separate fiber; in other
+ * words, deltas for different NTPs are executed concurrently, while deltas for
+ * the same NTP are executed sequentially.
+ *
+ * Each delta has a revision assigned. The revision of a delta is derived from
+ * the raft0 log offset of the command that the delta is related to. The same
+ * delta has the same revision globally.
+ *
+ * Deltas are executed in order, from the oldest revision up to the newest.
+ *
+ *
+ *                                                         NTP_1
+ *                                           Loop until finished or cancelled
+ *
+ *                                                   ┌──────────────────┐
+ *                                                   │                  │
+ *                                                   │                  │
+ *  ┌────────────┐ ┌────────────┐ ┌────────────┐     │  ┌────────────┐  │
+ *  │   delta    │ │   delta    │ │   delta    │     │  │   delta    │  │
+ *  │            │ │            │ │            ├──►  └─►│            ├──┘
+ *  │ revision: 3│ │ revision: 2│ │ revision: 1│        │ revision: 0│
+ *  └────────────┘ └────────────┘ └────────────┘        └────────────┘
+ *
+ *                                    .
+ *                                    .
+ *                                    .
+ *                                                         NTP_N
+ *                                           Loop until finished or cancelled
+ *
+ *                                                   ┌──────────────────┐
+ *                                                   │                  │
+ *                                                   │                  │
+ *  ┌────────────┐ ┌────────────┐ ┌────────────┐     │  ┌────────────┐  │
+ *  │   delta    │ │   delta    │ │   delta    │     │  │   delta    │  │
+ *  │            │ │            │ │            ├──►  └─►│            ├──┘
+ *  │ revision: 3│ │ revision: 2│ │ revision: 1│        │ revision: 0│
+ *  └────────────┘ └────────────┘ └────────────┘        └────────────┘
+ *
+ * # Revisions
+ *
+ * Since the reconciliation loops are not coordinated, we must be able to
+ * recognize epochs. Consider a situation in which a stream of deltas executed
+ * by the backend leads to a state that is identical from the end user
+ * perspective, e.g. a topic with the same name and configuration was deleted
+ * and then created again. We must be able to recognize whether a partition
+ * replica instance created for that topic belongs to the original topic or to
+ * the re-created one. To differentiate between these two otherwise
+ * indistinguishable states we use revision_id as an epoch. The revision is used
+ * whenever a partition is created or its replicas are moved. This way the
+ * controller backend is able to recognize whether partition replicas have
+ * already been updated or whether an action is still required.
+ *
+ * ## Revisions and raft vnode
+ *
+ * Whenever a new replica is added to a raft configuration it gets a new
+ * revision assigned. In raft, each group participant is described by a tuple of
+ * model::node_id and model::revision_id. This way, every time a node is
+ * re-added to the configuration (consider a situation in which a partition with
+ * a single replica is moved back and forth between two nodes, e.g. 1 -> 2 -> 1
+ * -> 2 ...), it is recognized as a new node. This fencing mechanism prevents
+ * up-to-date raft group replicas from communicating with ones from a previous
+ * epoch.
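To make the fencing concrete, here is a rough, free-standing illustration. It is not part of this patch; the header paths are assumptions, and only the `raft::vnode` accessors visible in this diff are used:

```cpp
#include "model/fundamental.h"        // model::revision_id (assumed header)
#include "model/metadata.h"           // model::node_id (assumed header)
#include "raft/group_configuration.h" // raft::vnode

// A replica is identified by the (node_id, revision) pair. Removing node 1 and
// adding it back later yields a vnode with the same id but a newer revision,
// so the two vnodes describe different participants and the stale one is
// fenced off.
inline bool same_participant(const raft::vnode& a, const raft::vnode& b) {
    return a.id() == b.id() && a.revision() == b.revision();
}

// e.g. same_participant(raft::vnode(model::node_id(1), model::revision_id(5)),
//                       raft::vnode(model::node_id(1), model::revision_id(9)))
// returns false even though both refer to node 1.
```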
+ *
+ * # Partition movement
+ *
+ * Partition movement in Redpanda is based on the Raft protocol mechanism called
+ * Joint Consensus. When requested, the Raft implementation is able to move data
+ * between nodes in a safe and consistent way. However, asking Raft to
+ * reconfigure a raft group is not enough to complete a partition move. When a
+ * partition move is requested, depending on the current situation, some
+ * controller backend instances may have to create new partition replica
+ * instances while others have to delete the ones that are no longer part of the
+ * raft group. Additionally, a partition instance may have to be moved between
+ * cores on the same node.
+ *
+ * Every time a partition move is requested, each reconciliation loop executes
+ * an operation based on the current and requested state and polls for its
+ * completion.
+ *
+ * Completion of a partition movement is coordinated using a designated finish
+ * command.
+ *
+ * The finish command is replicated from one of the replicas that were changed
+ * during the reconfiguration process.
+ *
+ * IMPORTANT:
+ * Partition replicas are only deleted when executing the delta for the finish
+ * command. This way, when a partition replica is deleted it is guaranteed to no
+ * longer be needed.
+ *
+ * Example:
+ *
+ * Consider moving a partition between two sets of nodes:
+ *
+ *      replicas on nodes (1,2,3) -> replicas on nodes (2,3,4)
+ *
+ * (for simplicity we ignore core assignment in this example)
+ *
+ * Assumptions:
+ *  - node 1 is the leader for the partition.
+ *
+ * Operations that have to be executed on each node:
+ *
+ * Node 1:
+ *  - node 1 is the leader and the leader is the only replica that can replicate
+ *    data, so it will be asked to perform the reconfiguration
+ *  - after the partition replica is no longer needed on this node it may be
+ *    removed
+ *
+ * Nodes 2 & 3:
+ *  - nodes 2 & 3 will wait until the configuration is up to date with the
+ *    requested one; if leadership moved away from node 1 they will ask for the
+ *    reconfiguration
+ *
+ * Node 4:
+ *  - node 4 will create a new partition replica instance and wait for the
+ *    configuration to be up to date
+ *  - after a successful reconfiguration node 4 will dispatch the finish update
+ *    command
+ *
+ * When the finish update command is received by node 1 it will remove its
+ * partition replica instance.
+ *
+ * ## Interrupting partition movement
+ *
+ * A partition movement interruption may only be accepted after the topic table
+ * has processed the move command but before the finish update command was
+ * processed. We use the topic table as the single source of truth to decide
+ * whether the update may still be cancelled or whether it has already finished.
+ * As a consequence we must be able to revert the configuration change even if
+ * raft has already finished the reconfiguration.
+ *
+ * A partition move interruption does not mark the reconfiguration process as
+ * finished, i.e. it will still be reported as in progress when queried from the
+ * topic table. The interruption only finishes when the reconfiguration is
+ * finished in raft and the finish move command is issued by the controller
+ * backend.
+ *
+ * In general, an interruption may happen in the following situations:
+ *
+ * 1) before the raft reconfiguration was requested
+ * 2) while the raft reconfiguration is in progress
+ * 3) after the raft reconfiguration has finished, but before the finish command
+ *    was replicated
+ *
+ * In all of these situations we must move back to the raft group configuration
+ * that was active before the move was scheduled (a sketch of the case 3 revert
+ * follows below).
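As a rough sketch of situation 3), not part of the patch and merely mirroring the `revert_configuration_update()` helper added in controller_backend.cc (local names are illustrative, error handling is omitted), the revert boils down to asking raft to replace the configuration with the original brokers while reusing their original revisions:

```cpp
#include "cluster/partition.h"        // cluster::partition (assumed header)
#include "raft/group_configuration.h" // raft::broker_revision
#include "seastarx.h"                 // ss:: namespace alias

#include <seastar/core/coroutine.hh>

// Move the partition back to the replica set that was active before the move.
// `original_brokers` must carry the revisions the replicas had before the
// move, so the still-existing replica instances are reused instead of being
// treated as brand-new learners.
ss::future<std::error_code> revert_move(
  ss::lw_shared_ptr<cluster::partition> p,
  std::vector<raft::broker_revision> original_brokers,
  model::revision_id cmd_revision) {
    co_return co_await p->update_replica_set(
      std::move(original_brokers), cmd_revision);
}
```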
The set of actions that must + * be taken to finish the interruption is different based on the situation in + * which interruption happened. + * + * For 1) controller backend must simply update raft configuration revision to + * be able to decide if action related with given revision_id has been executed. + * + * For 2) controller backend with request reconfiguration cancellation on a + * leader and will wait until raft configuration is up to date with what was + * observed before the move. Any replicas that were created for the purpose of + * move will be removed when processing finished move command. + * + * For 3) controller backend must request reconfiguration with the same exact + * replica set as before the move was requested. It is important to notice that + * no partition replicas were yet removed as finish command wasn't yet + * processed. Since cancelling partition move does not create new partition + * replica instances (instances of `cluster::partition`) but reuse the existing + * one we must reuse revision id of currently existing replica instances. + * + */ class controller_backend : public ss::peering_sharded_service { public: diff --git a/src/v/cluster/members_backend.cc b/src/v/cluster/members_backend.cc index 65c4f57d03ab..f69505b91d60 100644 --- a/src/v/cluster/members_backend.cc +++ b/src/v/cluster/members_backend.cc @@ -118,9 +118,9 @@ void members_backend::handle_single_update( vlog(clusterlog.debug, "membership update received: {}", update); switch (update.type) { case update_t::recommissioned: - // if node was recommissioned simply remove all decommissioning - // updates handle_recommissioned(update); + _updates.emplace_back(update); + _new_updates.signal(); return; case update_t::reallocation_finished: handle_reallocation_finished(update.id); @@ -132,6 +132,8 @@ void members_backend::handle_single_update( return; case update_t::decommissioned: stop_node_addition(update.id); + _decommission_command_revision.emplace( + update.id, model::revision_id(update.offset)); _updates.emplace_back(update); _new_updates.signal(); return; @@ -171,6 +173,9 @@ void members_backend::calculate_reallocations(update_meta& meta) { case members_manager::node_update_type::added: calculate_reallocations_after_node_added(meta); return; + case members_manager::node_update_type::recommissioned: + calculate_reallocations_after_recommissioned(meta); + return; default: return; } @@ -331,11 +336,80 @@ void members_backend::calculate_reallocations_after_node_added( } } +std::vector members_backend::ntps_moving_from_node_older_than( + model::node_id node, model::revision_id revision) const { + std::vector ret; + + for (const auto& [ntp, state] : _topics.local().in_progress_updates()) { + if (state.update_revision < revision) { + continue; + } + if (!contains_node(state.previous_replicas, node)) { + continue; + } + + auto current_assignment = _topics.local().get_partition_assignment(ntp); + if (unlikely(!current_assignment)) { + continue; + } + + if (!contains_node(current_assignment->replicas, node)) { + ret.push_back(ntp); + } + } + return ret; +} + +void members_backend::calculate_reallocations_after_recommissioned( + update_meta& meta) const { + auto it = _decommission_command_revision.find(meta.update.id); + vassert( + it != _decommission_command_revision.end(), + "members backend should hold a revision of nodes being decommissioned, " + "node_id: {}", + meta.update.id); + auto ntps = ntps_moving_from_node_older_than(meta.update.id, it->second); + // reallocate all partitions for which any of 
replicas is placed on + // decommissioned node + meta.partition_reallocations.reserve(ntps.size()); + for (auto& ntp : ntps) { + partition_reallocation reallocation(ntp); + reallocation.state = reallocation_state::request_cancel; + auto current_assignment = _topics.local().get_partition_assignment(ntp); + auto previous_replica_set = _topics.local().get_previous_replica_set( + ntp); + if ( + !current_assignment.has_value() + || !previous_replica_set.has_value()) { + continue; + } + reallocation.current_replica_set = std::move( + current_assignment->replicas); + reallocation.new_replica_set = std::move(*previous_replica_set); + + meta.partition_reallocations.push_back(std::move(reallocation)); + } +} + ss::future<> members_backend::reconcile() { // if nothing to do, wait co_await _new_updates.wait([this] { return !_updates.empty(); }); auto u = co_await _lock.get_units(); - + // remove stored revisions of previous decommissioning nodes, this will only + // happen when update is finished and it is either decommissioning or + // recommissioning of a node + for (const auto& meta : _updates) { + const bool is_decommission + = meta.update.type + == members_manager::node_update_type::decommissioned; + const bool is_recommission + = meta.update.type + == members_manager::node_update_type::recommissioned; + + if (meta.finished && (is_decommission || is_recommission)) { + _decommission_command_revision.erase(meta.update.id); + } + } // remove finished updates std::erase_if( _updates, [](const update_meta& meta) { return meta.finished; }); @@ -450,7 +524,6 @@ ss::future<> members_backend::reconcile() { const auto allocator_empty = _allocator.local().is_empty( meta.update.id); - if ( is_draining && all_reallocations_finished && allocator_empty && !updates_in_progress) { @@ -478,6 +551,15 @@ ss::future<> members_backend::reconcile() { all_reallocations_finished, allocator_empty, updates_in_progress); + if (!allocator_empty && all_reallocations_finished) { + // recalculate reallocations + vlog( + clusterlog.info, + "[update: {}] decommissioning in progress. 
recalculating " + "reallocations", + meta.update); + calculate_reallocations(meta); + } } } } @@ -641,6 +723,11 @@ ss::future<> members_backend::reallocate_replica_set( meta.current_replica_set, meta.new_replica_set, error.message()); + if (error == errc::no_update_in_progress) { + // mark reallocation as finished, reallocations will be + // recalculated if required + meta.state = reallocation_state::finished; + } co_return; } // success, update state and move on diff --git a/src/v/cluster/members_backend.h b/src/v/cluster/members_backend.h index aaae37e8b834..918ac7742361 100644 --- a/src/v/cluster/members_backend.h +++ b/src/v/cluster/members_backend.h @@ -10,6 +10,7 @@ #include +#include #include #include @@ -98,6 +99,9 @@ class members_backend { void reassign_replicas(partition_assignment&, partition_reallocation&); void calculate_reallocations_after_node_added(update_meta&) const; void calculate_reallocations_after_decommissioned(update_meta&) const; + void calculate_reallocations_after_recommissioned(update_meta&) const; + std::vector ntps_moving_from_node_older_than( + model::node_id, model::revision_id) const; void setup_metrics(); ss::sharded& _topics_frontend; ss::sharded& _topics; @@ -118,6 +122,14 @@ class members_backend { ss::timer<> _retry_timer; ss::condition_variable _new_updates; ss::metrics::metric_groups _metrics; + /** + * store revision of node decommissioning update, decommissioning command + * revision is stored when node is being decommissioned, it is used to + * determine which partition movements were scheduled before the node was + * decommissioned, recommissioning process will not abort those movements. + */ + absl::flat_hash_map + _decommission_command_revision; }; std::ostream& operator<<(std::ostream&, const members_backend::reallocation_state&); diff --git a/src/v/cluster/members_manager.cc b/src/v/cluster/members_manager.cc index ba0d8df25051..ac09f17569bc 100644 --- a/src/v/cluster/members_manager.cc +++ b/src/v/cluster/members_manager.cc @@ -226,12 +226,15 @@ members_manager::apply_update(model::record_batch b) { [this, update_offset](decommission_node_cmd cmd) mutable { auto id = cmd.key; return dispatch_updates_to_cores(update_offset, cmd) - .then([this, id](std::error_code error) { + .then([this, id, update_offset](std::error_code error) { auto f = ss::now(); if (!error) { _allocator.local().decommission_node(id); f = _update_queue.push_eventually(node_update{ - .id = id, .type = node_update_type::decommissioned}); + .id = id, + .type = node_update_type::decommissioned, + .offset = update_offset, + }); } return f.then([error] { return error; }); }); @@ -239,23 +242,27 @@ members_manager::apply_update(model::record_batch b) { [this, update_offset](recommission_node_cmd cmd) mutable { auto id = cmd.key; return dispatch_updates_to_cores(update_offset, cmd) - .then([this, id](std::error_code error) { + .then([this, id, update_offset](std::error_code error) { auto f = ss::now(); if (!error) { _allocator.local().recommission_node(id); f = _update_queue.push_eventually(node_update{ - .id = id, .type = node_update_type::recommissioned}); + .id = id, + .type = node_update_type::recommissioned, + .offset = update_offset}); } return f.then([error] { return error; }); }); }, - [this](finish_reallocations_cmd cmd) mutable { + [this, update_offset](finish_reallocations_cmd cmd) mutable { // we do not have to dispatch this command to members table since this // command is only used by a backend to signal successfully finished // node reallocations return 
_update_queue .push_eventually(node_update{ - .id = cmd.key, .type = node_update_type::reallocation_finished}) + .id = cmd.key, + .type = node_update_type::reallocation_finished, + .offset = update_offset}) .then([] { return make_error_code(errc::success); }); }, [this, update_offset](maintenance_mode_cmd cmd) { diff --git a/src/v/cluster/members_manager.h b/src/v/cluster/members_manager.h index 37c019dce75a..6743f709e686 100644 --- a/src/v/cluster/members_manager.h +++ b/src/v/cluster/members_manager.h @@ -53,6 +53,12 @@ class members_manager { model::node_id id; node_update_type type; model::offset offset; + + bool is_commissioning() const { + return type == members_manager::node_update_type::decommissioned + || type == members_manager::node_update_type::recommissioned; + } + friend std::ostream& operator<<(std::ostream&, const node_update&); }; diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 0b3dc9ad47e3..8263b9751761 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -159,6 +159,13 @@ class partition { std::move(brokers), new_revision_id); } + ss::future update_replica_set( + std::vector brokers, + model::revision_id new_revision_id) { + return _raft->replace_configuration( + std::move(brokers), new_revision_id); + } + raft::group_configuration group_configuration() const { return _raft->config(); } diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index cf77d6117eb0..bdbb73a28036 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -305,7 +305,7 @@ void partition_balancer_planner::get_unavailable_nodes_reassignments( continue; } auto new_allocation_units = get_reallocation( - a, t.second, partition_size.value(), false, rrs); + a, t.second.metadata, partition_size.value(), false, rrs); if (new_allocation_units) { result.reassignments.emplace_back(ntp_reassignments{ .ntp = ntp, @@ -384,7 +384,7 @@ void partition_balancer_planner::get_full_node_reassignments( } auto new_allocation_units = get_reallocation( *current_assignments, - topic_metadata, + topic_metadata.metadata, ntp_size_it->first, true, rrs); diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 9a2f1558bfed..9ad44ca5efce 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -25,16 +25,16 @@ namespace cluster { template -std::vector> +std::vector> topic_table::transform_topics(Func&& f) const { - std::vector> ret; + std::vector> ret; ret.reserve(_topics.size()); std::transform( std::cbegin(_topics), std::cend(_topics), std::back_inserter(ret), [f = std::forward(f)]( - const std::pair& p) { + const std::pair& p) { return f(p.second); }); return ret; @@ -47,22 +47,30 @@ topic_table::apply(create_topic_cmd cmd, model::offset offset) { return ss::make_ready_future( errc::topic_already_exists); } - // calculate delta - for (auto& pas : cmd.value.assignments) { - auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, pas.id); - _pending_deltas.emplace_back( - std::move(ntp), pas, offset, delta::op_type::add); - } std::optional remote_revision = cmd.value.cfg.properties.remote_topic_properties ? 
std::make_optional( cmd.value.cfg.properties.remote_topic_properties->remote_revision) : std::nullopt; + auto md = topic_metadata_item{ + .metadata = topic_metadata( + std::move(cmd.value), model::revision_id(offset()), remote_revision)}; + // calculate delta + md.replica_revisions.reserve(cmd.value.assignments.size()); + for (auto& pas : md.get_assignments()) { + auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, pas.id); + for (auto& r : pas.replicas) { + md.replica_revisions[pas.id][r.node_id] = model::revision_id( + offset); + } + _pending_deltas.emplace_back( + std::move(ntp), pas, offset, delta::op_type::add); + } - _topics.insert( - {cmd.key, - topic_metadata( - std::move(cmd.value), model::revision_id(offset()), remote_revision)}); + _topics.insert({ + cmd.key, + std::move(md), + }); notify_waiters(); return ss::make_ready_future(errc::success); } @@ -132,6 +140,10 @@ topic_table::apply(create_partition_cmd cmd, model::offset offset) { tp->second.get_assignments().emplace(p_as); // propagate deltas auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, p_as.id); + for (auto& bs : p_as.replicas) { + tp->second.replica_revisions[p_as.id][bs.node_id] + = model::revision_id(offset); + } _pending_deltas.emplace_back( std::move(ntp), std::move(p_as), offset, delta::op_type::add); } @@ -169,16 +181,41 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) { if (are_replica_sets_equal(current_assignment_it->replicas, cmd.value)) { return ss::make_ready_future(errc::success); } + auto revisions_it = tp->second.replica_revisions.find(cmd.key.tp.partition); + vassert( + revisions_it != tp->second.replica_revisions.end(), + "partition {}, replica revisions map must exists as partition is present", + cmd.key); _updates_in_progress.emplace( cmd.key, in_progress_update{ .previous_replicas = current_assignment_it->replicas, .state = in_progress_state::update_requested, + .update_revision = model::revision_id(o), + // snapshot replicas revisions + .replicas_revisions = revisions_it->second, }); auto previous_assignment = *current_assignment_it; // replace partition replica set current_assignment_it->replicas = cmd.value; + /** + * Update partition replica revisions. 
Assign new revision to added replicas + * and erase replicas which are removed from replica set + */ + auto added_replicas = subtract_replica_sets( + current_assignment_it->replicas, previous_assignment.replicas); + + for (auto& r : added_replicas) { + revisions_it->second[r.node_id] = model::revision_id(o); + } + + auto removed_replicas = subtract_replica_sets( + previous_assignment.replicas, current_assignment_it->replicas); + + for (auto& removed : removed_replicas) { + revisions_it->second.erase(removed.node_id); + } /// Update all non_replicable topics to have the same 'in-progress' state auto found = _topics_hierarchy.find(model::topic_namespace_view(cmd.key)); @@ -190,6 +227,7 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) { in_progress_update{ .previous_replicas = current_assignment_it->replicas, .state = in_progress_state::update_requested, + .update_revision = model::revision_id(o), }); vassert( success, @@ -350,6 +388,13 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) { auto replicas = current_assignment_it->replicas; // replace replica set with set from in progress operation current_assignment_it->replicas = in_progress_it->second.previous_replicas; + auto revisions_it = tp->second.replica_revisions.find(cmd.key.tp.partition); + vassert( + revisions_it != tp->second.replica_revisions.end(), + "partition {} replica revisions map must exists", + cmd.key); + + revisions_it->second = in_progress_it->second.replicas_revisions; /// Update all non_replicable topics to have the same 'in-progress' state auto found = _topics_hierarchy.find(model::topic_namespace_view(cmd.key)); @@ -553,10 +598,14 @@ topic_table::apply(create_non_replicable_topic_cmd cmd, model::offset o) { success, "Duplicate non_replicable_topic detected when it shouldn't exist"); } + auto md = topic_metadata( + std::move(cfg), std::move(p_as), model::revision_id(o()), source.tp); + _topics.insert( {new_non_rep_topic, - topic_metadata( - std::move(cfg), std::move(p_as), model::revision_id(o()), source.tp)}); + topic_metadata_item{ + .metadata = std::move(md), + }}); notify_waiters(); co_return make_error_code(errc::success); } @@ -624,8 +673,9 @@ topic_table::wait_for_changes(ss::abort_source& as) { } std::vector topic_table::all_topics() const { - return transform_topics( - [](const topic_metadata& tp) { return tp.get_configuration().tp_ns; }); + return transform_topics([](const topic_metadata_item& tp) { + return tp.get_configuration().tp_ns; + }); } size_t topic_table::all_topics_count() const { return _topics.size(); } @@ -633,14 +683,14 @@ size_t topic_table::all_topics_count() const { return _topics.size(); } std::optional topic_table::get_topic_metadata(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second; + return it->second.metadata; } return {}; } std::optional> topic_table::get_topic_metadata_ref(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second; + return it->second.metadata; } return {}; } @@ -724,6 +774,58 @@ topic_table::get_previous_replica_set(const model::ntp& ntp) const { return std::nullopt; } +std::vector +topic_table::ntps_moving_to_node(model::node_id node) const { + std::vector ret; + + for (const auto& [ntp, state] : _updates_in_progress) { + if (contains_node(state.previous_replicas, node)) { + continue; + } + + auto current_assignment = get_partition_assignment(ntp); + if (unlikely(!current_assignment)) { + 
continue; + } + + if (contains_node(current_assignment->replicas, node)) { + ret.push_back(ntp); + } + } + return ret; +} + +std::vector +topic_table::ntps_moving_from_node(model::node_id node) const { + std::vector ret; + + for (const auto& [ntp, state] : _updates_in_progress) { + if (!contains_node(state.previous_replicas, node)) { + continue; + } + + auto current_assignment = get_partition_assignment(ntp); + if (unlikely(!current_assignment)) { + continue; + } + + if (!contains_node(current_assignment->replicas, node)) { + ret.push_back(ntp); + } + } + return ret; +} + +std::vector topic_table::all_updates_in_progress() const { + std::vector ret; + ret.reserve(_updates_in_progress.size()); + for (const auto& [ntp, _] : _updates_in_progress) { + ret.push_back(ntp); + } + + return ret; +} + std::ostream& operator<<(std::ostream& o, topic_table::in_progress_state update) { switch (update) { @@ -736,4 +838,5 @@ operator<<(std::ostream& o, topic_table::in_progress_state update) { } __builtin_unreachable(); } + } // namespace cluster diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 1ea2ac237c78..0e6f7dd9c869 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -41,17 +41,60 @@ class topic_table { cancel_requested, force_cancel_requested }; + /** + * Replicas revision map is used to track revision of brokers in a replica + * set. When a node is added into replica set its gets the revision assigned + */ + using replicas_revision_map + = absl::flat_hash_map; struct in_progress_update { std::vector previous_replicas; in_progress_state state; model::revision_id update_revision; + replicas_revision_map replicas_revisions; + }; + + struct topic_metadata_item { + topic_metadata metadata; + // replicas revisions for each partition + absl::node_hash_map + replica_revisions; + + bool is_topic_replicable() const { + return metadata.is_topic_replicable(); + } + + assignments_set& get_assignments() { + return metadata.get_assignments(); + } + + const assignments_set& get_assignments() const { + return metadata.get_assignments(); + } + model::revision_id get_revision() const { + return metadata.get_revision(); + } + std::optional get_remote_revision() const { + return metadata.get_remote_revision(); + } + const model::topic& get_source_topic() const { + return metadata.get_source_topic(); + } + + const topic_configuration& get_configuration() const { + return metadata.get_configuration(); + } + topic_configuration& get_configuration() { + return metadata.get_configuration(); + } }; + using delta = topic_table_delta; - using underlying_t = absl::flat_hash_map< + using underlying_t = absl::node_hash_map< model::topic_namespace, - topic_metadata, + topic_metadata_item, model::topic_namespace_hash, model::topic_namespace_eq>; using hierarchy_t = absl::node_hash_map< @@ -208,6 +251,23 @@ class topic_table { std::optional> get_previous_replica_set(const model::ntp&) const; + const absl::node_hash_map& + in_progress_updates() const { + return _updates_in_progress; + } + + /** + * Lists all NTPs that replicas are being move to a node + */ + std::vector ntps_moving_to_node(model::node_id) const; + + /** + * Lists all NTPs that replicas are being move from a node + */ + std::vector ntps_moving_from_node(model::node_id) const; + + std::vector all_updates_in_progress() const; + private: struct waiter { explicit waiter(uint64_t id) @@ -221,7 +281,7 @@ class topic_table { void notify_waiters(); template - std::vector> + std::vector> transform_topics(Func&&) const; 
underlying_t _topics; diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index 93f752f96928..2ee7fa3754bb 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -12,9 +12,13 @@ #include "cluster/cluster_utils.h" #include "cluster/commands.h" #include "cluster/partition_leaders_table.h" +#include "cluster/topic_table.h" +#include "model/fundamental.h" #include "model/metadata.h" #include "raft/types.h" +#include + #include #include #include @@ -37,19 +41,29 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { return ss::visit( std::move(cmd), [this, base_offset](delete_topic_cmd del_cmd) { - // delete case - we need state copy to - auto tp_md = _topic_table.local().get_topic_metadata( - del_cmd.value); + auto tp_ns = del_cmd.key; + auto topic_assignments + = _topic_table.local().get_topic_assignments(del_cmd.value); + in_progress_map in_progress; + + if (topic_assignments) { + in_progress = collect_in_progress( + del_cmd.key, *topic_assignments); + } return dispatch_updates_to_cores(del_cmd, base_offset) - .then([this, tp_md = std::move(tp_md)](std::error_code ec) { - if (ec == errc::success) { - vassert( - tp_md.has_value(), - "Topic had to exist before successful delete"); - deallocate_topic(*tp_md); - } - return ec; - }); + .then( + [this, + topic_assignments = std::move(topic_assignments), + in_progress = std::move(in_progress)](std::error_code ec) { + if (ec == errc::success) { + vassert( + topic_assignments.has_value(), + "Topic had to exist before successful delete"); + deallocate_topic(*topic_assignments, in_progress); + } + + return ec; + }); }, [this, base_offset](create_topic_cmd create_cmd) { return dispatch_updates_to_cores(create_cmd, base_offset) @@ -187,6 +201,22 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { }); }); } +topic_updates_dispatcher::in_progress_map +topic_updates_dispatcher::collect_in_progress( + const model::topic_namespace& tp_ns, + const assignments_set& current_assignments) { + in_progress_map in_progress; + in_progress.reserve(current_assignments.size()); + // collect in progress assignments + for (auto& p : current_assignments) { + auto previous = _topic_table.local().get_previous_replica_set( + model::ntp(tp_ns.ns, tp_ns.tp, p.id)); + if (previous) { + in_progress.emplace(p.id, std::move(previous.value())); + } + } + return in_progress; +} ss::future<> topic_updates_dispatcher::update_leaders_with_estimates( std::vector leaders) { @@ -250,10 +280,19 @@ topic_updates_dispatcher::dispatch_updates_to_cores(Cmd cmd, model::offset o) { }); } -void topic_updates_dispatcher::deallocate_topic(const topic_metadata& tp_md) { - // we have to deallocate topics - for (auto& p : tp_md.get_assignments()) { - _partition_allocator.local().deallocate(p.replicas); +void topic_updates_dispatcher::deallocate_topic( + const assignments_set& topic_assignments, + const in_progress_map& in_progress) { + for (auto& p_as : topic_assignments) { + _partition_allocator.local().deallocate(p_as.replicas); + auto it = in_progress.find(p_as.id); + + // we must remove the allocation that would normally + // be removed with update_finished request + if (it != in_progress.end()) { + auto to_delete = subtract_replica_sets(it->second, p_as.replicas); + _partition_allocator.local().remove_allocations(to_delete); + } } } diff --git a/src/v/cluster/topic_updates_dispatcher.h b/src/v/cluster/topic_updates_dispatcher.h index e383e7c59bd4..0c5f3a7cc44f 100644 --- 
a/src/v/cluster/topic_updates_dispatcher.h +++ b/src/v/cluster/topic_updates_dispatcher.h @@ -13,6 +13,8 @@ #include "cluster/commands.h" #include "cluster/scheduling/partition_allocator.h" #include "cluster/topic_table.h" +#include "cluster/types.h" +#include "model/fundamental.h" #include "model/record.h" #include @@ -71,6 +73,8 @@ class topic_updates_dispatcher { } private: + using in_progress_map = absl:: + node_hash_map>; template ss::future dispatch_updates_to_cores(Cmd, model::offset); @@ -78,7 +82,11 @@ class topic_updates_dispatcher { ss::future<> update_leaders_with_estimates(std::vector leaders); void update_allocations(std::vector); - void deallocate_topic(const topic_metadata&); + + void deallocate_topic(const assignments_set&, const in_progress_map&); + + in_progress_map + collect_in_progress(const model::topic_namespace&, const assignments_set&); ss::sharded& _partition_allocator; ss::sharded& _topic_table; diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index a372d5b6a4fe..43813d0e5931 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -229,7 +229,7 @@ get_topic_metadata(request_context& ctx, metadata_request& request) { authz_quiet{true})) { continue; } - res.push_back(make_topic_response(ctx, request, md)); + res.push_back(make_topic_response(ctx, request, md.metadata)); } return ss::make_ready_future>( diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index b30bff1c0977..453231188032 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -45,6 +45,7 @@ #include #include +#include template<> struct fmt::formatter final @@ -994,6 +995,18 @@ ss::future consensus::replace_configuration( }); } +ss::future consensus::replace_configuration( + std::vector new_brokers, + model::revision_id new_revision) { + return change_configuration( + [new_brokers = std::move(new_brokers), + new_revision](group_configuration current) mutable { + current.replace(std::move(new_brokers), new_revision); + current.set_revision(new_revision); + return result(std::move(current)); + }); +} + template ss::future consensus::interrupt_configuration_change(model::revision_id revision, Func f) { @@ -1020,12 +1033,23 @@ ss::future consensus::cancel_configuration_change(model::revision_id revision) { vlog( _ctxlog.info, - "requested revert of current configuration change - {}", + "requested cancellation of current configuration change - {}", config()); return interrupt_configuration_change( - revision, [revision](raft::group_configuration cfg) { - cfg.cancel_configuration_change(revision); - return cfg; + revision, + [revision](raft::group_configuration cfg) { + cfg.cancel_configuration_change(revision); + return cfg; + }) + .then([this](std::error_code ec) -> ss::future { + if (!ec) { + // current leader is not a voter, step down + if (!config().is_voter(_self)) { + auto u = co_await _op_lock.get_units(); + do_step_down("current leader is not voter"); + } + } + co_return ec; }); } diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 87f7323d6072..a852c819bc19 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -123,6 +123,11 @@ class consensus { // Replace configuration of raft group with given set of nodes ss::future replace_configuration(std::vector, model::revision_id); + /** + * Replace configuration, uses revision provided with brokers + */ + ss::future + replace_configuration(std::vector, model::revision_id); // Abort ongoing configuration change - 
may cause data loss ss::future abort_configuration_change(model::revision_id); // Revert current configuration change - this is safe and will never cause diff --git a/src/v/raft/group_configuration.cc b/src/v/raft/group_configuration.cc index 0f8790024b19..76fa0635dad6 100644 --- a/src/v/raft/group_configuration.cc +++ b/src/v/raft/group_configuration.cc @@ -313,6 +313,88 @@ void group_configuration::replace( } } +void group_configuration::replace( + std::vector brokers, model::revision_id rev) { + vassert(!_old, "can not replace joint configuration - {}", *this); + _revision = rev; + + /** + * If configurations are identical do nothing. For identical configuration + * we assume that brokers list hasn't changed (1) and current configuration + * contains all brokers in either voters of learners (2). + */ + // check list of brokers (1) + + // check if all brokers are assigned to current configuration (2) + bool brokers_are_equal + = brokers.size() == _brokers.size() + && std::all_of( + brokers.begin(), brokers.end(), [this](const broker_revision& b) { + // we may do linear lookup in _brokers collection as number of + // brokers is usually very small f.e. 3 or 5 + auto it = std::find_if( + _brokers.begin(), + _brokers.end(), + [&b](const model::broker& existing) { + return b.broker == existing; + }); + + return _current.contains(vnode(b.broker.id(), b.rev)) + && it != _brokers.end(); + }); + + // configurations are identical, do nothing + if (brokers_are_equal) { + return; + } + + _old = _current; + _current.learners.clear(); + _current.voters.clear(); + + for (auto& br : brokers) { + // check if broker is already a voter. voter will stay a voter + auto v_it = std::find_if( + _old->voters.cbegin(), _old->voters.cend(), [&br](const vnode& rni) { + return rni.id() == br.broker.id() && rni.revision() == br.rev; + }); + + if (v_it != _old->voters.cend()) { + _current.voters.push_back(*v_it); + continue; + } + + // check if broker was a learner. learner will stay a learner + auto l_it = std::find_if( + _old->learners.cbegin(), + _old->learners.cend(), + [&br](const vnode& rni) { + return rni.id() == br.broker.id() && rni.revision() == br.rev; + }); + + if (l_it != _old->learners.cend()) { + _current.learners.push_back(*l_it); + continue; + } + + // new broker, use broker revision + _current.learners.emplace_back(br.broker.id(), br.rev); + } + + // if both current and previous configurations are exactly the same, we do + // not need to enter joint consensus + if ( + _current.voters == _old->voters && _current.learners == _old->learners) { + _old.reset(); + } + + for (auto& b : brokers) { + if (!contains_broker(b.broker.id())) { + _brokers.push_back(std::move(b.broker)); + } + } +} + void group_configuration::promote_to_voter(vnode id) { auto it = std::find( _current.learners.cbegin(), _current.learners.cend(), id); diff --git a/src/v/raft/group_configuration.h b/src/v/raft/group_configuration.h index fda463826067..f62fdddb21ad 100644 --- a/src/v/raft/group_configuration.h +++ b/src/v/raft/group_configuration.h @@ -23,6 +23,11 @@ namespace raft { +struct broker_revision { + model::broker broker; + model::revision_id rev; +}; + static constexpr model::revision_id no_revision{}; class vnode : public serde::envelope> { public: @@ -118,6 +123,7 @@ class group_configuration final { void add(std::vector, model::revision_id); void remove(const std::vector&); void replace(std::vector, model::revision_id); + void replace(std::vector, model::revision_id); /** * Updating broker configuration. 
This operation does not require entering diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index bb75946a1927..59e7348e4036 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -2465,10 +2465,9 @@ void admin_server::register_partition_routes() { replica.core = bs.shard; r.previous_replicas.push(replica); } - co_await ss::coroutine::maybe_yield(); ret.push_back(std::move(r)); } - co_return std::move(ret); + co_return ret; }); } diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index 2c620bcb05a4..23ca749d8b20 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -439,6 +439,14 @@ def decommission_broker(self, id, node=None): self.redpanda.logger.debug(f"decommissioning {path}") return self._request('put', path, node=node) + def recommission_broker(self, id, node=None): + """ + Recommission broker i.e. abort ongoing decommissioning + """ + path = f"brokers/{id}/recommission" + self.redpanda.logger.debug(f"recommissioning {id}") + return self._request('put', path, node=node) + def list_reconfigurations(self, node=None): """ List pending reconfigurations diff --git a/tests/rptest/tests/nodes_decommissioning_test.py b/tests/rptest/tests/nodes_decommissioning_test.py index 2256f2cbf70c..d38d0d451100 100644 --- a/tests/rptest/tests/nodes_decommissioning_test.py +++ b/tests/rptest/tests/nodes_decommissioning_test.py @@ -44,7 +44,7 @@ def _partitions_moving(self): def _partitions_not_moving(self): admin = Admin(self.redpanda) reconfigurations = admin.list_reconfigurations() - return len(reconfigurations) > 0 + return len(reconfigurations) == 0 def _partition_to_move(self, predicate): rpk = RpkTool(self.redpanda) @@ -69,6 +69,38 @@ def _node_removed(self, removed_id, node_to_query): return False return True + def _find_replacement(self, current_replicas, to_remove): + new_replicas = [] + unique_node_ids = set() + for r in current_replicas: + if r['node_id'] != to_remove: + unique_node_ids.add(r['node_id']) + new_replicas.append(r) + + admin = Admin(self.redpanda) + brokers = admin.get_brokers() + + to_add = None + while len(unique_node_ids) < len(current_replicas): + id = random.choice(brokers)['node_id'] + if id == to_remove: + continue + to_add = id + unique_node_ids.add(to_add) + + new_replicas.append({"node_id": to_add, "core": 0}) + return new_replicas + + def _wait_until_status(self, node_id, status, timeout_sec=15): + def requested_status(): + brokers = Admin(self.redpanda).get_brokers() + for broker in brokers: + if broker['node_id'] == node_id: + return broker['membership_status'] == status + return False + + wait_until(requested_status, timeout_sec=timeout_sec, backoff_sec=1) + @cluster( num_nodes=6, # A decom can look like a restart in terms of logs from peers dropping @@ -174,18 +206,6 @@ def test_decommissioning_cancel_ongoing_movements(self): self.logger.info(f"decommissioning node: {to_decommission}", ) admin.decommission_broker(to_decommission) - def check_status(node_id, status): - brokers = admin.get_brokers() - for broker in brokers: - if broker['node_id'] == node_id: - return broker['membership_status'] == status - - return False - - wait_until(lambda: check_status(to_decommission, 'draining'), - timeout_sec=15, - backoff_sec=1) - survivor_node = self._not_decommissioned_node(to_decommission) # adjust recovery throttle to make sure moves will finish rpk.cluster_config_set("raft_learner_recovery_rate", str(2 << 30)) @@ -198,3 +218,141 @@ def check_status(node_id, 
status): self.redpanda.stop_node(self.redpanda.get_node(to_decommission)) self.run_validation(enable_idempotence=False, consumer_timeout_sec=90) + + @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST) + def test_recommissioning_node(self): + self.start_redpanda(num_nodes=4) + self._create_topics() + + self.start_producer(1) + self.start_consumer(1) + self.await_startup() + admin = Admin(self.redpanda) + + brokers = admin.get_brokers() + to_decommission = random.choice(brokers)['node_id'] + + # throttle recovery + rpk = RpkTool(self.redpanda) + rpk.cluster_config_set("raft_learner_recovery_rate", str(1)) + + self.logger.info(f"decommissioning node: {to_decommission}", ) + admin.decommission_broker(to_decommission) + + self._wait_until_status(to_decommission, 'draining') + + wait_until(lambda: self._partitions_moving(), + timeout_sec=15, + backoff_sec=1) + + # recommission broker + admin.recommission_broker(to_decommission) + self._wait_until_status(to_decommission, 'active') + + wait_until(lambda: self._partitions_not_moving(), + timeout_sec=15, + backoff_sec=1) + + @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST) + def test_recommissioning_do_not_stop_all_moves_node(self): + self.start_redpanda(num_nodes=4) + self._create_topics() + + self.start_producer(1) + self.start_consumer(1) + self.await_startup() + admin = Admin(self.redpanda) + + brokers = admin.get_brokers() + to_decommission = random.choice(brokers)['node_id'] + + # throttle recovery + rpk = RpkTool(self.redpanda) + rpk.cluster_config_set("raft_learner_recovery_rate", str(1)) + + # schedule partition move from the node being decommissioned before actually calling decommission + + to_move_tp, to_move_p, _ = self._partition_to_move( + lambda p: to_decommission in p.replicas) + details = admin.get_partitions(topic=to_move_tp, partition=to_move_p) + + new_replicas = self._find_replacement(details['replicas'], + to_decommission) + self.logger.info( + f"moving partition {to_move_tp}/{to_move_p} - {details['replicas']} -> {new_replicas}" + ) + + admin.set_partition_replicas(topic=to_move_tp, + partition=to_move_p, + replicas=new_replicas) + # moving partition should be present in moving list + wait_until(lambda: self._partitions_moving(), + timeout_sec=15, + backoff_sec=1) + + self.logger.info(f"decommissioning node: {to_decommission}", ) + admin.decommission_broker(to_decommission) + + self._wait_until_status(to_decommission, 'draining') + + wait_until(lambda: self._partitions_moving(), + timeout_sec=15, + backoff_sec=1) + + # recommission broker + admin.recommission_broker(to_decommission) + self._wait_until_status(to_decommission, 'active') + + def one_left_moving(): + reconfigurations = admin.list_reconfigurations() + return len(reconfigurations) == 1 + + wait_until(one_left_moving, timeout_sec=15, backoff_sec=1) + + @cluster(num_nodes=7, log_allow_list=RESTART_LOG_ALLOW_LIST) + def test_recommissioning_one_of_decommissioned_nodes(self): + self.start_redpanda(num_nodes=5) + self._create_topics() + + self.start_producer(1) + self.start_consumer(1) + self.await_startup() + admin = Admin(self.redpanda) + + brokers = admin.get_brokers() + to_decommission_1 = random.choice(brokers)['node_id'] + to_decommission_2 = to_decommission_1 + + while to_decommission_1 == to_decommission_2: + to_decommission_2 = random.choice(brokers)['node_id'] + + # throttle recovery + rpk = RpkTool(self.redpanda) + rpk.cluster_config_set("raft_learner_recovery_rate", str(1)) + + self.logger.info(f"decommissioning node: 
{to_decommission_1}", ) + admin.decommission_broker(to_decommission_1) + self.logger.info(f"decommissioning node: {to_decommission_2}", ) + admin.decommission_broker(to_decommission_2) + + self._wait_until_status(to_decommission_1, 'draining') + self._wait_until_status(to_decommission_2, 'draining') + + wait_until(lambda: self._partitions_moving(), + timeout_sec=15, + backoff_sec=1) + + # recommission broker that was decommissioned first + admin.recommission_broker(to_decommission_1) + self._wait_until_status(to_decommission_1, 'active') + + rpk.cluster_config_set("raft_learner_recovery_rate", str(2 << 30)) + + def node_removed(): + brokers = admin.get_brokers() + for broker in brokers: + if broker['node_id'] == to_decommission_2: + return False + return True + + wait_until(node_removed, 60, 2)
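Finally, a small usage sketch of the new recommission endpoint outside of a full ducktape test. It is not part of this change; it only relies on the `Admin` calls introduced or already used above and assumes a running `redpanda` service object:

```python
from rptest.services.admin import Admin


def recommission_if_draining(redpanda, node_id):
    """Abort an in-flight decommission of `node_id` if it is still draining."""
    admin = Admin(redpanda)
    for broker in admin.get_brokers():
        if broker['node_id'] == node_id:
            if broker['membership_status'] == 'draining':
                admin.recommission_broker(node_id)
                return True
            return False
    return False
```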