Skip to content

Commit

Permalink
Merge pull request redpanda-data#22881 from bashtanov/migrations-infr…
Browse files Browse the repository at this point in the history
…a-abtwi

Migrations infra part 5: wiring it to various components
  • Loading branch information
mmaslankaprv committed Aug 29, 2024
2 parents b17de7f + 28f3482 commit db9275c
Show file tree
Hide file tree
Showing 36 changed files with 969 additions and 461 deletions.
147 changes: 73 additions & 74 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
#include <seastar/util/later.hh>

#include <chrono>
#include <optional>
namespace cluster {

const bytes controller::invariants_key{"configuration_invariants"};
Expand Down Expand Up @@ -300,6 +301,7 @@ ss::future<> controller::start(

co_await _data_migration_frontend.start(
_raft0->self().id(),
_cloud_storage_api.local_is_initialized(),
std::ref(*_data_migration_table),
std::ref(_feature_table),
std::ref(_stm),
Expand All @@ -311,6 +313,8 @@ ss::future<> controller::start(
_raft0->self().id(),
ss::sharded_parameter(
[this] { return std::ref(_partition_leaders.local()); }),
ss::sharded_parameter(
[this] { return std::ref(_partition_manager.local()); }),
ss::sharded_parameter([this] { return std::ref(_as.local()); }));
{
limiter_configuration limiter_conf{
Expand Down Expand Up @@ -765,8 +769,12 @@ ss::future<> controller::start(
std::ref(_data_migration_frontend.local()),
std::ref(_data_migration_worker),
std::ref(_partition_leaders.local()),
std::ref(_tp_frontend.local()),
std::ref(_tp_state.local()),
std::ref(_shard_table.local()),
_cloud_storage_api.local_is_initialized()
? std::make_optional(std::ref(_cloud_storage_api.local()))
: std::nullopt,
std::ref(_as.local()));
co_await _data_migration_backend.invoke_on_instance(
&data_migrations::backend::start);
Expand Down Expand Up @@ -795,85 +803,76 @@ ss::future<> controller::shutdown_input() {
}

ss::future<> controller::stop() {
auto f = ss::now();
_probe.stop();

if (!_as.local().abort_requested()) {
f = shutdown_input();
co_await shutdown_input();
}

_probe.stop();
return f.then([this] {
auto stop_leader_balancer = _leader_balancer ? _leader_balancer->stop()
: ss::now();
return stop_leader_balancer
.then([this] {
return ss::smp::submit_to(controller_stm_shard, [&stm = _stm] {
if (stm.local_is_initialized()) {
return stm.local().shutdown();
}
return ss::now();
});
})
.then([this] {
if (_metadata_uploader) {
return _metadata_uploader->stop_and_wait();
}
return ss::make_ready_future();
})
.then([this] { return _data_migration_irpc_frontend.stop(); })
.then([this] { return _data_migration_backend.stop(); })
.then([this] {
if (_recovery_backend) {
return _recovery_backend->stop_and_wait();
}
return ss::make_ready_future();
})
.then([this] { return _recovery_manager.stop(); })
.then([this] { return _recovery_table.stop(); })
.then([this] { return _partition_balancer.stop(); })
.then([this] { return _metrics_reporter.stop(); })
.then([this] { return _feature_manager.stop(); })
.then([this] { return _hm_frontend.stop(); })
.then([this] { return _hm_backend.stop(); })
.then([this] { return _health_manager.stop(); })
.then([this] { return _members_backend.stop(); })
.then([this] { return _data_migration_worker.stop(); })
.then([this] { return _data_migration_frontend.stop(); })
.then([this] { return _config_manager.stop(); })
.then([this] { return _api.stop(); })
.then([this] { return _shard_balancer.stop(); })
.then([this] { return _backend.stop(); })
.then([this] { return _tp_frontend.stop(); })
.then([this] { return _plugin_frontend.stop(); })
.then([this] { return _quota_frontend.stop(); })
.then([this] { return _ephemeral_credential_frontend.stop(); })
.then([this] { return _security_frontend.stop(); })
.then([this] { return _members_frontend.stop(); })
.then([this] { return _config_frontend.stop(); })
.then([this] { return _feature_backend.stop(); })
.then([this] { return _bootstrap_backend.stop(); })
.then([this] { return _oidc_service.stop(); })
.then([this] { return _authorizer.stop(); })
.then([this] { return _ephemeral_credentials.stop(); })
.then([this] { return _data_migrated_resources.stop(); })
.then([this] { return _roles.stop(); })
.then([this] { return _credentials.stop(); })
.then([this] { return _tp_state.stop(); })
.then([this] { return _members_manager.stop(); })
.then([this] { return _stm.stop(); })
.then([this] { return _quota_backend.stop(); })
.then([this] { return _quota_store.stop(); })
.then([this] { return _plugin_backend.stop(); })
.then([this] { return _plugin_table.stop(); })
.then([this] { return _drain_manager.stop(); })
.then([this] { return _shard_placement.stop(); })
.then([this] { return _partition_balancer_state.stop(); })
.then([this] { return _partition_allocator.stop(); })
.then([this] { return _partition_leaders.stop(); })
.then([this] { return _members_table.stop(); })
.then([this] { return _gate.close(); })
.then([this] { return _as.stop(); });
if (_leader_balancer) {
co_await _leader_balancer->stop();
}

co_await ss::smp::submit_to(controller_stm_shard, [&stm = _stm] {
if (stm.local_is_initialized()) {
return stm.local().shutdown();
}
return ss::now();
});

if (_metadata_uploader) {
co_await _metadata_uploader->stop_and_wait();
}
co_await _data_migration_irpc_frontend.stop();
co_await _data_migration_backend.stop();
if (_recovery_backend) {
co_await _recovery_backend->stop_and_wait();
}
co_await _recovery_manager.stop();
co_await _recovery_table.stop();
co_await _partition_balancer.stop();
co_await _metrics_reporter.stop();
co_await _feature_manager.stop();
co_await _hm_frontend.stop();
co_await _hm_backend.stop();
co_await _health_manager.stop();
co_await _members_backend.stop();
co_await _data_migration_worker.stop();
co_await _data_migration_frontend.stop();
co_await _config_manager.stop();
co_await _api.stop();
co_await _shard_balancer.stop();
co_await _backend.stop();
co_await _tp_frontend.stop();
co_await _plugin_frontend.stop();
co_await _quota_frontend.stop();
co_await _ephemeral_credential_frontend.stop();
co_await _security_frontend.stop();
co_await _members_frontend.stop();
co_await _config_frontend.stop();
co_await _feature_backend.stop();
co_await _bootstrap_backend.stop();
co_await _oidc_service.stop();
co_await _authorizer.stop();
co_await _ephemeral_credentials.stop();
co_await _data_migrated_resources.stop();
co_await _roles.stop();
co_await _credentials.stop();
co_await _tp_state.stop();
co_await _members_manager.stop();
co_await _stm.stop();
co_await _quota_backend.stop();
co_await _quota_store.stop();
co_await _plugin_backend.stop();
co_await _plugin_table.stop();
co_await _drain_manager.stop();
co_await _shard_placement.stop();
co_await _partition_balancer_state.stop();
co_await _partition_allocator.stop();
co_await _partition_leaders.stop();
co_await _members_table.stop();
co_await _gate.close();
co_await _as.stop();
}

ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) {
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ class controller {
}
ss::sharded<controller_stm>& get_controller_stm() { return _stm; }

ss::sharded<data_migrations::migrated_resources>&
get_data_migrated_resources() {
return _data_migrated_resources;
}
ss::sharded<data_migrations::frontend>& get_data_migration_frontend() {
return _data_migration_frontend;
}
Expand Down
Loading

0 comments on commit db9275c

Please sign in to comment.