diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index cf60f98142e5..230e42acc1fb 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -428,15 +428,15 @@ void application::initialize( [this] { smp_service_groups.destroy_groups().get(); }); if (groups) { - _scheduling_groups = *groups; + sched_groups = *groups; return; } - _scheduling_groups.create_groups().get(); - _scheduling_groups_probe.wire_up(_scheduling_groups); + sched_groups.create_groups().get(); + _scheduling_groups_probe.wire_up(sched_groups); _deferred.emplace_back([this] { _scheduling_groups_probe.clear(); - _scheduling_groups.destroy_groups().get(); + sched_groups.destroy_groups().get(); }); if (proxy_cfg) { @@ -817,7 +817,7 @@ void application::configure_admin_server() { syschecks::systemd_message("constructing http server").get(); construct_service( _admin, - admin_server_cfg_from_global_cfg(_scheduling_groups), + admin_server_cfg_from_global_cfg(sched_groups), std::ref(partition_manager), std::ref(cp_partition_manager), controller.get(), @@ -1014,7 +1014,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { raft_group_manager .start( node_id, - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), [] { return raft::group_manager::configuration{ .heartbeat_interval @@ -1087,7 +1087,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { _archival_upload_housekeeping, std::ref(cloud_storage_api), ss::sharded_parameter( - [sg = _scheduling_groups.archival_upload()] { return sg; })) + [sg = sched_groups.archival_upload()] { return sg; })) .get(); _archival_upload_housekeeping .invoke_on_all(&archival::upload_housekeeping_service::start) @@ -1107,7 +1107,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { std::ref(cloud_storage_api), std::ref(shadow_index_cache), ss::sharded_parameter( - [sg = _scheduling_groups.archival_upload(), + [sg = sched_groups.archival_upload(), p = archival_priority(), enabled = archival_storage_enabled()]() -> ss::lw_shared_ptr { @@ -1150,7 +1150,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { node_id, std::ref(local_monitor), std::ref(_connection_cache), - _scheduling_groups.self_test_sg()) + sched_groups.self_test_sg()) .get(); construct_single_service_sharded( @@ -1244,7 +1244,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { construct_service( _archival_upload_controller, std::ref(partition_manager), - make_upload_controller_config(_scheduling_groups.archival_upload())) + make_upload_controller_config(sched_groups.archival_upload())) .get(); construct_service( @@ -1293,7 +1293,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { syschecks::systemd_message("Creating kafka group router").get(); construct_service( group_router, - _scheduling_groups.kafka_sg(), + sched_groups.kafka_sg(), smp_service_groups.kafka_smp_sg(), std::ref(_group_manager), std::ref(shard_table), @@ -1503,6 +1503,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { .start( &kafka_cfg, smp_service_groups.kafka_smp_sg(), + sched_groups.fetch_sg(), std::ref(metadata_cache), std::ref(controller->get_topics_frontend()), std::ref(controller->get_config_frontend()), @@ -1532,7 +1533,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { _compaction_controller, std::ref(storage), compaction_controller_config( - _scheduling_groups.compaction_sg(), + sched_groups.compaction_sg(), priority_manager::local().compaction_priority())) .get(); } @@ -1574,9 +1575,9 @@ void application::wire_up_bootstrap_services() { storage, []() { return kvstore_config_from_global_config(); }, [this]() { - auto log_cfg = manager_config_from_global_config(_scheduling_groups); + auto log_cfg = manager_config_from_global_config(sched_groups); log_cfg.reclaim_opts.background_reclaimer_sg - = _scheduling_groups.cache_background_reclaim_sg(); + = sched_groups.cache_background_reclaim_sg(); return log_cfg; }, std::ref(feature_table)) @@ -1787,7 +1788,7 @@ void application::start_bootstrap_services() { std::vector> bootstrap_service; bootstrap_service.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(storage))); s.add_services(std::move(bootstrap_service)); @@ -1925,7 +1926,7 @@ void application::start_runtime_services( runtime_services.push_back(std::make_unique>( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), partition_manager, shard_table.local(), @@ -1949,13 +1950,13 @@ void application::start_runtime_services( .invoke_on_all([this, start_raft_rpc_early](rpc::rpc_server& s) { std::vector> runtime_services; runtime_services.push_back(std::make_unique( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), std::ref(id_allocator_frontend))); // _rm_group_proxy is wrap around a sharded service with only // `.local()' access so it's ok to share without foreign_ptr runtime_services.push_back(std::make_unique( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), std::ref(tx_gateway_frontend), _rm_group_proxy.get(), @@ -1965,7 +1966,7 @@ void application::start_runtime_services( runtime_services.push_back(std::make_unique>( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), partition_manager, shard_table.local(), @@ -1973,7 +1974,7 @@ void application::start_runtime_services( } runtime_services.push_back(std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_topics_frontend()), std::ref(controller->get_members_manager()), @@ -1991,37 +1992,37 @@ void application::start_runtime_services( runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_partition_leaders()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.node_status(), + sched_groups.node_status(), smp_service_groups.cluster_smp_sg(), std::ref(node_status_backend))); runtime_services.push_back( std::make_unique( - _scheduling_groups.node_status(), + sched_groups.node_status(), smp_service_groups.cluster_smp_sg(), std::ref(self_test_backend))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_partition_balancer()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_ephemeral_credential_frontend()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(topic_recovery_service))); s.add_services(std::move(runtime_services)); diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 54004fd675b9..cd7e01c50a21 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -92,6 +92,7 @@ class application { ss::future<> set_proxy_client_config(ss::sstring name, std::any val); smp_groups smp_service_groups; + scheduling_groups sched_groups; // Sorted list of services (public members) ss::sharded shadow_index_cache; @@ -230,7 +231,6 @@ class application { std::optional _schema_reg_config; std::optional _schema_reg_client_config; - scheduling_groups _scheduling_groups; scheduling_groups_probe _scheduling_groups_probe; ss::logger _log;