Merge pull request #5323 from mmaslankaprv/nodes-recommissioning
Implemented nodes recommissioning
mmaslankaprv committed Jul 14, 2022
2 parents f600964 + c95cbcc commit 48d5057
Showing 21 changed files with 971 additions and 95 deletions.
10 changes: 10 additions & 0 deletions src/v/cluster/cluster_utils.h
@@ -271,4 +271,14 @@ inline std::vector<model::broker_shard> subtract_replica_sets(
});
return ret;
}

// check if replica set contains a node
inline bool contains_node(
const std::vector<model::broker_shard>& replicas, model::node_id id) {
return std::find_if(
replicas.begin(),
replicas.end(),
[id](const model::broker_shard& bs) { return bs.node_id == id; })
!= replicas.end();
}
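// Illustrative usage (hypothetical values, not part of this change):
//
//   std::vector<model::broker_shard> replicas{
//     {model::node_id(0), 0}, {model::node_id(1), 0}};
//   bool on_node_1 = contains_node(replicas, model::node_id(1)); // true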
} // namespace cluster
117 changes: 95 additions & 22 deletions src/v/cluster/controller_backend.cc
@@ -90,6 +90,31 @@ std::vector<model::broker> create_brokers_set(
return brokers;
}

std::vector<raft::broker_revision> create_brokers_set(
const std::vector<model::broker_shard>& replicas,
const absl::flat_hash_map<model::node_id, model::revision_id>&
replica_revisions,
cluster::members_table& members) {
std::vector<raft::broker_revision> brokers;
brokers.reserve(replicas.size());

std::transform(
std::cbegin(replicas),
std::cend(replicas),
std::back_inserter(brokers),
[&members, &replica_revisions](const model::broker_shard& bs) {
auto br = members.get_broker(bs.node_id);
if (!br) {
throw std::logic_error(
fmt::format("Replica node {} is not available", bs.node_id));
}
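// note: assumes replica_revisions contains an entry for every replica in
// the set, as the revision is required to build the broker_revision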
return raft::broker_revision{
.broker = *br->get(),
.rev = replica_revisions.find(bs.node_id)->second};
});
return brokers;
}

std::optional<ss::shard_id> get_target_shard(
model::node_id id, const std::vector<model::broker_shard>& replicas) {
auto it = std::find_if(
@@ -519,6 +544,29 @@ find_interrupting_operation(deltas_t::iterator current_it, deltas_t& deltas) {
}
});
}
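// Moves a partition's replica set back to its pre-reconfiguration state. Used
// when cancelling an update for which raft has already finished its part of
// the reconfiguration.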
ss::future<std::error_code> revert_configuration_update(
const model::ntp& ntp,
const std::vector<model::broker_shard>& replicas,
model::revision_id rev,
ss::lw_shared_ptr<partition> p,
members_table& members,
topic_table& topics) {
auto in_progress_it = topics.in_progress_updates().find(ntp);
// no longer in progress
if (in_progress_it == topics.in_progress_updates().end()) {
co_return errc::success;
}
auto brokers = create_brokers_set(
replicas, in_progress_it->second.replicas_revisions, members);
vlog(
clusterlog.debug,
"reverting already finished reconfiguration of {}, revision: {}. Replica "
"set: {} ",
ntp,
rev,
replicas);
co_return co_await p->update_replica_set(std::move(brokers), rev);
}
} // namespace

ss::future<> controller_backend::reconcile_ntp(deltas_t& deltas) {
@@ -1197,21 +1245,25 @@ ss::future<std::error_code> controller_backend::cancel_replica_set_update(
      replicas,
      rev,
      [this, &ntp, rev, replicas](ss::lw_shared_ptr<partition> p) {
-         const auto raft_cfg_update_finished
-           = are_configuration_replicas_up_to_date(
-             p->group_configuration(), replicas);
+         const auto current_cfg = p->group_configuration();
+         // we do not have to request update/cancellation twice
+         if (current_cfg.revision_id() == rev) {
+             return ss::make_ready_future<std::error_code>(
+               errc::waiting_for_recovery);
+         }
+
+         const auto raft_cfg_update_finished
+           = current_cfg.type() == raft::configuration_type::simple;
+
          // raft already finished its part, we need to move replica back
          if (raft_cfg_update_finished) {
-             auto brokers = create_brokers_set(
-               replicas, _members_table.local());
-             vlog(
-               clusterlog.debug,
-               "raft reconfiguration finished, moving partition {} "
-               "configuration back to requested state: {}",
-               ntp,
-               replicas);
-             return p->update_replica_set(std::move(brokers), rev);
+             return revert_configuration_update(
+               ntp,
+               replicas,
+               rev,
+               std::move(p),
+               _members_table.local(),
+               _topics.local());
          } else {
              vlog(
                clusterlog.debug,
@@ -1233,21 +1285,42 @@ ss::future<std::error_code> controller_backend::force_abort_replica_set_update(
      if (!partition) {
          co_return errc::partition_not_exists;
      }
-     const auto raft_cfg_update_finished = are_configuration_replicas_up_to_date(
-       partition->group_configuration(), replicas);
-     if (raft_cfg_update_finished) {
-         co_return co_await update_partition_replica_set(ntp, replicas, rev);
-     } else {
-         // wait for configuration update, only declare success
-         // when configuration was actually updated
-         auto update_ec = check_configuration_update(
-           _self, partition, replicas, rev);
-
-         if (!update_ec) {
-             co_return errc::success;
-         }
+     const auto current_cfg = partition->group_configuration();
+
+     // wait for configuration update, only declare success
+     // when configuration was actually updated
+     auto update_ec = check_configuration_update(
+       _self, partition, replicas, rev);
+
+     if (!update_ec) {
+         co_return errc::success;
+     }
+
+     // we do not have to request update/cancellation twice
+     if (current_cfg.revision_id() == rev) {
+         co_return errc::waiting_for_recovery;
+     }
+
+     const auto raft_cfg_update_finished = current_cfg.type()
+       == raft::configuration_type::simple;
+
+     if (raft_cfg_update_finished) {
+         co_return co_await apply_configuration_change_on_leader(
+           ntp,
+           replicas,
+           rev,
+           [this, rev, &replicas, &ntp](
+             ss::lw_shared_ptr<cluster::partition> p) {
+               return revert_configuration_update(
+                 ntp,
+                 replicas,
+                 rev,
+                 std::move(p),
+                 _members_table.local(),
+                 _topics.local());
+           });
+
      } else {
          auto ec = co_await partition->force_abort_replica_set_update(rev);

          if (ec) {
186 changes: 184 additions & 2 deletions src/v/cluster/controller_backend.h
@@ -30,8 +30,190 @@

namespace cluster {

/// on every core, sharded

/**
*
* # Reconciliation
*
 * The controller backend is responsible for making sure that the cluster state
 * is aligned with the topic and partition state gathered in the topic_table.
*
 * The controller backend lives on each core of every node in the cluster. Each
 * instance of the controller backend is responsible for the partition replicas
 * local to its core and node (instances of the `cluster::partition` object that
 * are supposed to be instantiated on a given core of a given node). The
 * controller backend manages the partition replica lifecycle: it
 * instantiates/deletes `cluster::partition` instances and registers them in the
 * shard table.
*
 * Controller backend operations are driven by deltas generated in the topic
 * table. The backend waits for new deltas using a condition variable. Each
 * delta represents an operation that must be executed for an NTP, e.g. create,
 * update properties, move, etc.
*
 * Each controller backend instance in the cluster (on each node and each core)
 * processes all the deltas and, depending on the situation, either executes an
 * operation or ignores it (command pattern).
*
 * The delta vector for each NTP is processed in a separate fiber; in other
 * words, deltas for different NTPs are executed concurrently, while deltas for
 * the same NTP are executed sequentially.
*
 * Each delta has a revision assigned. The revision is derived from the raft0
 * log offset of the command that the delta relates to, so a given delta has the
 * same revision on every node.
 *
 * Deltas are executed in order, from the oldest revision to the newest.
*
*
* NTP_1
* Loop until finished or cancelled
*
* ┌──────────────────┐
* │ │
* │ │
* ┌────────────┐ ┌────────────┐ ┌────────────┐ │ ┌────────────┐ │
* │ delta │ │ delta │ │ delta │ │ │ delta │ │
* │ │ │ │ │ ├──► └─►│ ├──┘
* │ revision: 3│ │ revision: 2│ │ revision: 1│ │ revision: 0│
* └────────────┘ └────────────┘ └────────────┘ └────────────┘
*
* .
* .
* .
* NTP_N
* Loop until finished or cancelled
*
* ┌──────────────────┐
* │ │
* │ │
* ┌────────────┐ ┌────────────┐ ┌────────────┐ │ ┌────────────┐ │
* │ delta │ │ delta │ │ delta │ │ │ delta │ │
* │ │ │ │ │ ├──► └─►│ ├──┘
* │ revision: 3│ │ revision: 2│ │ revision: 1│ │ revision: 0│
* └────────────┘ └────────────┘ └────────────┘ └────────────┘
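 *
 * A rough sketch of the per-NTP processing fiber (illustrative only; the
 * helper `execute_delta` is hypothetical, not the actual interface):
 *
 *   ss::future<> reconcile_ntp_fiber(deltas_t& deltas) {
 *       // deltas are ordered by revision, oldest first
 *       auto it = deltas.begin();
 *       while (it != deltas.end()) {
 *           std::error_code ec = co_await execute_delta(*it);
 *           if (ec) {
 *               // stop here and retry this delta on the next pass
 *               break;
 *           }
 *           it = deltas.erase(it);
 *       }
 *   }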
*
* # Revisions
*
 * Since the reconciliation loops are not coordinated, we must be able to
 * recognize epochs. Consider a situation in which a stream of deltas executed
 * by the backend leads to a state that is identical from the end user
 * perspective, e.g. a topic with the same name and configuration was deleted
 * and then created again. We must be able to tell whether a partition replica
 * instance created for that topic belongs to the original topic or to the
 * re-created one. To differentiate between these two otherwise
 * indistinguishable states we use revision_id as an epoch. The revision is used
 * whenever a partition is created or its replicas are moved. This way the
 * controller backend is able to recognize whether partition replicas have
 * already been updated or an action is still required.
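 *
 * A minimal sketch of such an epoch check (assumption: the replica exposes the
 * revision it was created/updated with, here through a hypothetical
 * `get_revision_id()` accessor):
 *
 *   bool already_applied(
 *     const ss::lw_shared_ptr<partition>& p, model::revision_id delta_rev) {
 *       // a replica carrying a revision >= the delta revision already
 *       // reflects that epoch, so the delta can be skipped
 *       return p && p->get_revision_id() >= delta_rev;
 *   }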
*
* ## Revisions and raft vnode
*
 * Whenever a new replica is added to the raft configuration it gets a new
 * revision assigned. In raft, each raft group participant is described by a
 * tuple of model::node_id and model::revision_id. This way, every time a node
 * is re-added to the configuration (consider a situation in which a partition
 * with a single replica is moved back and forth between two nodes, e.g.
 * 1 -> 2 -> 1 -> 2...), it is recognized as a new participant. This fencing
 * mechanism prevents up-to-date raft group replicas from communicating with
 * replicas from a previous epoch.
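 *
 * For illustration only (variable names are made up; the pairing of node id
 * and revision is what matters):
 *
 *   // node 2 first added at revision 15
 *   raft::vnode original{model::node_id(2), model::revision_id(15)};
 *   // node 2 re-added later at revision 42 is a different participant,
 *   // so a replica from the old epoch cannot impersonate the new one
 *   raft::vnode re_added{model::node_id(2), model::revision_id(42)};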
*
* # Partition movement
*
 * Partition movement in Redpanda is based on the Raft protocol mechanism called
 * Joint Consensus. When requested, the Raft implementation is able to move data
 * between nodes in a safe and consistent way. However, requesting Raft to
 * reconfigure a raft group is not enough to complete a partition move. When a
 * partition move is requested, depending on the current situation, some
 * controller backend instances may have to create new partition replica
 * instances while others have to delete the ones that are no longer part of the
 * raft group. Additionally, a partition instance may have to be moved between
 * cores on the same node.
 *
 * Every time a partition move is requested, each reconciliation loop executes
 * an operation based on the current and requested state and polls for its
 * completion.
 * Completion of a partition move is coordinated using a designated finish
 * command.
 *
 * The partition movement finish command is replicated from one of the replicas
 * that were changed during the reconfiguration process.
 *
 * IMPORTANT:
 * Partition replicas are only deleted when executing the delta for the finish
 * command. This way, when a partition replica is deleted, it is guaranteed to
 * no longer be needed.
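 *
 * A simplified sketch of the per-node decision, using the `contains_node`
 * helper from cluster_utils.h (`previous` and `result` stand for the pre- and
 * post-move replica sets; the variable names are illustrative):
 *
 *   if (!contains_node(previous, _self) && contains_node(result, _self)) {
 *       // this node gained a replica -> create a cluster::partition instance
 *   } else if (
 *     contains_node(previous, _self) && !contains_node(result, _self)) {
 *       // the replica is no longer hosted here -> delete it, but only when
 *       // the finish command for this move is processed
 *   } else {
 *       // the replica stays on this node -> wait for / request reconfiguration
 *   }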
*
* Example:
*
* Consider moving partition between a set of nodes:
*
* replicas on nodes (1,2,3) -> replicas on nodes (2,3,4)
*
* (for simplicity we ignore core assignment in this example)
*
* Assumptions:
* - node 1 is a leader for the partition.
*
 * Operations that have to be executed on each node:
*
 * Node 1:
 * - node 1 is the leader; the leader is the only replica that can replicate
 * data, so it will be asked for reconfiguration
 * - once the partition replica is no longer needed on this node it may be
 * removed
*
 * Node 2 & 3:
 * - nodes 2 and 3 will wait until the configuration is up to date with the
 * requested one. If leadership moved away from node 1, the new leader will be
 * asked for reconfiguration
*
 * Node 4:
 * - node 4 will create a new partition replica instance and wait for the
 * configuration to be up to date.
 * - after a successful reconfiguration node 4 will dispatch the finish update
 * command
*
*
 * When the finish update command is received by node 1 it will remove its
 * partition replica instance.
*
*
* ## Interrupting partition movement
*
 * A partition movement interruption may only be accepted after the topic table
 * has processed the move command but before the finish update command has been
 * processed. We use the topic table as the single source of truth to decide
 * whether the update may still be cancelled or has already finished. Because of
 * this we must be able to revert the configuration change even if raft has
 * already finished the reconfiguration.
*
 * A partition move interruption does not mark the reconfiguration process as
 * finished, i.e. it will still be reported as in progress when queried from the
 * topic table. The interruption only finishes when the reconfiguration is
 * finished in raft and the finish move command is issued by the controller
 * backend.
*
 * In general the interruption may happen in the following situations:
*
* 1) before raft reconfiguration was requested
* 2) when raft reconfiguration is in progress
 * 3) after raft reconfiguration has already finished but before the finish
 * command was replicated
*
 * In all of these situations we must move back to the raft group configuration
 * that was active before the move was scheduled. The set of actions required to
 * finish the interruption depends on the situation in which the interruption
 * happened.
*
 * For 1) the controller backend must simply update the raft configuration
 * revision to be able to decide whether the action related to a given
 * revision_id has already been executed.
*
 * For 2) the controller backend will request a reconfiguration cancellation on
 * the leader and will wait until the raft configuration is up to date with what
 * was observed before the move. Any replicas that were created for the purpose
 * of the move will be removed when the finish move command is processed.
*
 * For 3) the controller backend must request a reconfiguration with exactly the
 * same replica set as before the move was requested. It is important to note
 * that no partition replicas have been removed yet, as the finish command has
 * not been processed. Since cancelling a partition move does not create new
 * partition replica instances (instances of `cluster::partition`) but reuses
 * the existing ones, we must reuse the revision id of the currently existing
 * replica instances.
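 *
 * For case 3) the revert boils down to asking raft to reconfigure back to the
 * previous replica set while keeping the existing revisions, roughly what
 * `revert_configuration_update` in controller_backend.cc does (the variable
 * names below are illustrative):
 *
 *   auto brokers = create_brokers_set(
 *     previous_replicas, in_progress_update.replicas_revisions, members);
 *   co_await partition->update_replica_set(std::move(brokers), rev);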
*
*/
class controller_backend
: public ss::peering_sharded_service<controller_backend> {
public: