Skip to content

Commit

Permalink
Make _scheduling_groups public in application
Browse files Browse the repository at this point in the history
By analogy with smp_groups which is already public we need to
access this outside of application.cc to pass the right scheduling
groups through to instantiations of services in our redpanda fixture.
  • Loading branch information
travisdowns committed Apr 26, 2023
1 parent 1ceb333 commit d505693
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
53 changes: 27 additions & 26 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,15 @@ void application::initialize(
[this] { smp_service_groups.destroy_groups().get(); });

if (groups) {
_scheduling_groups = *groups;
sched_groups = *groups;
return;
}

_scheduling_groups.create_groups().get();
_scheduling_groups_probe.wire_up(_scheduling_groups);
sched_groups.create_groups().get();
_scheduling_groups_probe.wire_up(sched_groups);
_deferred.emplace_back([this] {
_scheduling_groups_probe.clear();
_scheduling_groups.destroy_groups().get();
sched_groups.destroy_groups().get();
});

if (proxy_cfg) {
Expand Down Expand Up @@ -817,7 +817,7 @@ void application::configure_admin_server() {
syschecks::systemd_message("constructing http server").get();
construct_service(
_admin,
admin_server_cfg_from_global_cfg(_scheduling_groups),
admin_server_cfg_from_global_cfg(sched_groups),
std::ref(partition_manager),
std::ref(cp_partition_manager),
controller.get(),
Expand Down Expand Up @@ -1014,7 +1014,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
raft_group_manager
.start(
node_id,
_scheduling_groups.raft_sg(),
sched_groups.raft_sg(),
[] {
return raft::group_manager::configuration{
.heartbeat_interval
Expand Down Expand Up @@ -1087,7 +1087,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
_archival_upload_housekeeping,
std::ref(cloud_storage_api),
ss::sharded_parameter(
[sg = _scheduling_groups.archival_upload()] { return sg; }))
[sg = sched_groups.archival_upload()] { return sg; }))
.get();
_archival_upload_housekeeping
.invoke_on_all(&archival::upload_housekeeping_service::start)
Expand All @@ -1107,7 +1107,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
std::ref(cloud_storage_api),
std::ref(shadow_index_cache),
ss::sharded_parameter(
[sg = _scheduling_groups.archival_upload(),
[sg = sched_groups.archival_upload(),
p = archival_priority(),
enabled = archival_storage_enabled()]()
-> ss::lw_shared_ptr<archival::configuration> {
Expand Down Expand Up @@ -1150,7 +1150,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
node_id,
std::ref(local_monitor),
std::ref(_connection_cache),
_scheduling_groups.self_test_sg())
sched_groups.self_test_sg())
.get();

construct_single_service_sharded(
Expand Down Expand Up @@ -1244,7 +1244,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
construct_service(
_archival_upload_controller,
std::ref(partition_manager),
make_upload_controller_config(_scheduling_groups.archival_upload()))
make_upload_controller_config(sched_groups.archival_upload()))
.get();

construct_service(
Expand Down Expand Up @@ -1293,7 +1293,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
syschecks::systemd_message("Creating kafka group router").get();
construct_service(
group_router,
_scheduling_groups.kafka_sg(),
sched_groups.kafka_sg(),
smp_service_groups.kafka_smp_sg(),
std::ref(_group_manager),
std::ref(shard_table),
Expand Down Expand Up @@ -1503,6 +1503,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
.start(
&kafka_cfg,
smp_service_groups.kafka_smp_sg(),
sched_groups.fetch_sg(),
std::ref(metadata_cache),
std::ref(controller->get_topics_frontend()),
std::ref(controller->get_config_frontend()),
Expand Down Expand Up @@ -1532,7 +1533,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
_compaction_controller,
std::ref(storage),
compaction_controller_config(
_scheduling_groups.compaction_sg(),
sched_groups.compaction_sg(),
priority_manager::local().compaction_priority()))
.get();
}
Expand Down Expand Up @@ -1574,9 +1575,9 @@ void application::wire_up_bootstrap_services() {
storage,
[]() { return kvstore_config_from_global_config(); },
[this]() {
auto log_cfg = manager_config_from_global_config(_scheduling_groups);
auto log_cfg = manager_config_from_global_config(sched_groups);
log_cfg.reclaim_opts.background_reclaimer_sg
= _scheduling_groups.cache_background_reclaim_sg();
= sched_groups.cache_background_reclaim_sg();
return log_cfg;
},
std::ref(feature_table))
Expand Down Expand Up @@ -1787,7 +1788,7 @@ void application::start_bootstrap_services() {
std::vector<std::unique_ptr<rpc::service>> bootstrap_service;
bootstrap_service.push_back(
std::make_unique<cluster::bootstrap_service>(
_scheduling_groups.cluster_sg(),
sched_groups.cluster_sg(),
smp_service_groups.cluster_smp_sg(),
std::ref(storage)));
s.add_services(std::move(bootstrap_service));
Expand Down Expand Up @@ -1925,7 +1926,7 @@ void application::start_runtime_services(
runtime_services.push_back(std::make_unique<raft::service<
cluster::partition_manager,
cluster::shard_table>>(
_scheduling_groups.raft_sg(),
sched_groups.raft_sg(),
smp_service_groups.raft_smp_sg(),
partition_manager,
shard_table.local(),
Expand All @@ -1949,13 +1950,13 @@ void application::start_runtime_services(
.invoke_on_all([this, start_raft_rpc_early](rpc::rpc_server& s) {
std::vector<std::unique_ptr<rpc::service>> runtime_services;
runtime_services.push_back(std::make_unique<cluster::id_allocator>(
_scheduling_groups.raft_sg(),
sched_groups.raft_sg(),
smp_service_groups.raft_smp_sg(),
std::ref(id_allocator_frontend)));
// _rm_group_proxy is wrap around a sharded service with only
// `.local()' access so it's ok to share without foreign_ptr
runtime_services.push_back(std::make_unique<cluster::tx_gateway>(
_scheduling_groups.raft_sg(),
sched_groups.raft_sg(),
smp_service_groups.raft_smp_sg(),
std::ref(tx_gateway_frontend),
_rm_group_proxy.get(),
Expand All @@ -1965,15 +1966,15 @@ void application::start_runtime_services(
runtime_services.push_back(std::make_unique<raft::service<
cluster::partition_manager,
cluster::shard_table>>(
_scheduling_groups.raft_sg(),
sched_groups.raft_sg(),
smp_service_groups.raft_smp_sg(),
partition_manager,
shard_table.local(),
config::shard_local_cfg().raft_heartbeat_interval_ms()));
}

runtime_services.push_back(std::make_unique<cluster::service>(
_scheduling_groups.cluster_sg(),
sched_groups.cluster_sg(),
smp_service_groups.cluster_smp_sg(),
std::ref(controller->get_topics_frontend()),
std::ref(controller->get_members_manager()),
Expand All @@ -1991,37 +1992,37 @@ void application::start_runtime_services(

runtime_services.push_back(
std::make_unique<cluster::metadata_dissemination_handler>(
_scheduling_groups.cluster_sg(),
sched_groups.cluster_sg(),
smp_service_groups.cluster_smp_sg(),
std::ref(controller->get_partition_leaders())));

runtime_services.push_back(
std::make_unique<cluster::node_status_rpc_handler>(
_scheduling_groups.node_status(),
sched_groups.node_status(),
smp_service_groups.cluster_smp_sg(),
std::ref(node_status_backend)));

runtime_services.push_back(
std::make_unique<cluster::self_test_rpc_handler>(
_scheduling_groups.node_status(),
sched_groups.node_status(),
smp_service_groups.cluster_smp_sg(),
std::ref(self_test_backend)));

runtime_services.push_back(
std::make_unique<cluster::partition_balancer_rpc_handler>(
_scheduling_groups.cluster_sg(),
sched_groups.cluster_sg(),
smp_service_groups.cluster_smp_sg(),
std::ref(controller->get_partition_balancer())));

runtime_services.push_back(
std::make_unique<cluster::ephemeral_credential_service>(
_scheduling_groups.cluster_sg(),
sched_groups.cluster_sg(),
smp_service_groups.cluster_smp_sg(),
std::ref(controller->get_ephemeral_credential_frontend())));

runtime_services.push_back(
std::make_unique<cluster::topic_recovery_status_rpc_handler>(
_scheduling_groups.cluster_sg(),
sched_groups.cluster_sg(),
smp_service_groups.cluster_smp_sg(),
std::ref(topic_recovery_service)));
s.add_services(std::move(runtime_services));
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class application {
ss::future<> set_proxy_client_config(ss::sstring name, std::any val);

smp_groups smp_service_groups;
scheduling_groups sched_groups;

// Sorted list of services (public members)
ss::sharded<cloud_storage::cache> shadow_index_cache;
Expand Down Expand Up @@ -230,7 +231,6 @@ class application {
std::optional<pandaproxy::schema_registry::configuration>
_schema_reg_config;
std::optional<kafka::client::configuration> _schema_reg_client_config;
scheduling_groups _scheduling_groups;
scheduling_groups_probe _scheduling_groups_probe;
ss::logger _log;

Expand Down

0 comments on commit d505693

Please sign in to comment.