From d5056932505428a60e81334b72799408490e7f51 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 26 Apr 2023 18:27:07 -0400 Subject: [PATCH 1/4] Make _scheduling_groups public in application By analogy with smp_groups which is already public we need to access this outside of application.cc to pass the right scheduling groups through to instantiations of services in our redpanda fixture. --- src/v/redpanda/application.cc | 53 ++++++++++++++++++----------------- src/v/redpanda/application.h | 2 +- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index cf60f98142e5..230e42acc1fb 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -428,15 +428,15 @@ void application::initialize( [this] { smp_service_groups.destroy_groups().get(); }); if (groups) { - _scheduling_groups = *groups; + sched_groups = *groups; return; } - _scheduling_groups.create_groups().get(); - _scheduling_groups_probe.wire_up(_scheduling_groups); + sched_groups.create_groups().get(); + _scheduling_groups_probe.wire_up(sched_groups); _deferred.emplace_back([this] { _scheduling_groups_probe.clear(); - _scheduling_groups.destroy_groups().get(); + sched_groups.destroy_groups().get(); }); if (proxy_cfg) { @@ -817,7 +817,7 @@ void application::configure_admin_server() { syschecks::systemd_message("constructing http server").get(); construct_service( _admin, - admin_server_cfg_from_global_cfg(_scheduling_groups), + admin_server_cfg_from_global_cfg(sched_groups), std::ref(partition_manager), std::ref(cp_partition_manager), controller.get(), @@ -1014,7 +1014,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { raft_group_manager .start( node_id, - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), [] { return raft::group_manager::configuration{ .heartbeat_interval @@ -1087,7 +1087,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { _archival_upload_housekeeping, std::ref(cloud_storage_api), ss::sharded_parameter( - [sg = _scheduling_groups.archival_upload()] { return sg; })) + [sg = sched_groups.archival_upload()] { return sg; })) .get(); _archival_upload_housekeeping .invoke_on_all(&archival::upload_housekeeping_service::start) @@ -1107,7 +1107,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { std::ref(cloud_storage_api), std::ref(shadow_index_cache), ss::sharded_parameter( - [sg = _scheduling_groups.archival_upload(), + [sg = sched_groups.archival_upload(), p = archival_priority(), enabled = archival_storage_enabled()]() -> ss::lw_shared_ptr { @@ -1150,7 +1150,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { node_id, std::ref(local_monitor), std::ref(_connection_cache), - _scheduling_groups.self_test_sg()) + sched_groups.self_test_sg()) .get(); construct_single_service_sharded( @@ -1244,7 +1244,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { construct_service( _archival_upload_controller, std::ref(partition_manager), - make_upload_controller_config(_scheduling_groups.archival_upload())) + make_upload_controller_config(sched_groups.archival_upload())) .get(); construct_service( @@ -1293,7 +1293,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { syschecks::systemd_message("Creating kafka group router").get(); construct_service( group_router, - _scheduling_groups.kafka_sg(), + sched_groups.kafka_sg(), smp_service_groups.kafka_smp_sg(), std::ref(_group_manager), std::ref(shard_table), @@ -1503,6 +1503,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { .start( &kafka_cfg, smp_service_groups.kafka_smp_sg(), + sched_groups.fetch_sg(), std::ref(metadata_cache), std::ref(controller->get_topics_frontend()), std::ref(controller->get_config_frontend()), @@ -1532,7 +1533,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { _compaction_controller, std::ref(storage), compaction_controller_config( - _scheduling_groups.compaction_sg(), + sched_groups.compaction_sg(), priority_manager::local().compaction_priority())) .get(); } @@ -1574,9 +1575,9 @@ void application::wire_up_bootstrap_services() { storage, []() { return kvstore_config_from_global_config(); }, [this]() { - auto log_cfg = manager_config_from_global_config(_scheduling_groups); + auto log_cfg = manager_config_from_global_config(sched_groups); log_cfg.reclaim_opts.background_reclaimer_sg - = _scheduling_groups.cache_background_reclaim_sg(); + = sched_groups.cache_background_reclaim_sg(); return log_cfg; }, std::ref(feature_table)) @@ -1787,7 +1788,7 @@ void application::start_bootstrap_services() { std::vector> bootstrap_service; bootstrap_service.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(storage))); s.add_services(std::move(bootstrap_service)); @@ -1925,7 +1926,7 @@ void application::start_runtime_services( runtime_services.push_back(std::make_unique>( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), partition_manager, shard_table.local(), @@ -1949,13 +1950,13 @@ void application::start_runtime_services( .invoke_on_all([this, start_raft_rpc_early](rpc::rpc_server& s) { std::vector> runtime_services; runtime_services.push_back(std::make_unique( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), std::ref(id_allocator_frontend))); // _rm_group_proxy is wrap around a sharded service with only // `.local()' access so it's ok to share without foreign_ptr runtime_services.push_back(std::make_unique( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), std::ref(tx_gateway_frontend), _rm_group_proxy.get(), @@ -1965,7 +1966,7 @@ void application::start_runtime_services( runtime_services.push_back(std::make_unique>( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), partition_manager, shard_table.local(), @@ -1973,7 +1974,7 @@ void application::start_runtime_services( } runtime_services.push_back(std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_topics_frontend()), std::ref(controller->get_members_manager()), @@ -1991,37 +1992,37 @@ void application::start_runtime_services( runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_partition_leaders()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.node_status(), + sched_groups.node_status(), smp_service_groups.cluster_smp_sg(), std::ref(node_status_backend))); runtime_services.push_back( std::make_unique( - _scheduling_groups.node_status(), + sched_groups.node_status(), smp_service_groups.cluster_smp_sg(), std::ref(self_test_backend))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_partition_balancer()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_ephemeral_credential_frontend()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(topic_recovery_service))); s.add_services(std::move(runtime_services)); diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 54004fd675b9..cd7e01c50a21 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -92,6 +92,7 @@ class application { ss::future<> set_proxy_client_config(ss::sstring name, std::any val); smp_groups smp_service_groups; + scheduling_groups sched_groups; // Sorted list of services (public members) ss::sharded shadow_index_cache; @@ -230,7 +231,6 @@ class application { std::optional _schema_reg_config; std::optional _schema_reg_client_config; - scheduling_groups _scheduling_groups; scheduling_groups_probe _scheduling_groups_probe; ss::logger _log; From d133843a574bdebdfea715915922bd7e3cd7ff68 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 26 Apr 2023 15:19:10 -0400 Subject: [PATCH 2/4] Add fetch scheduling group This scheduling group is used for consumer fetch processing. We assign it the same priority as the default group (where most other kafka handling takes place), but by putting it into its own group we prevent non-fetch requests from being significantly delayed when fetch requests use all the CPU. Issue redpanda-data/core-internal#435 --- src/v/resource_mgmt/cpu_scheduling.h | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/v/resource_mgmt/cpu_scheduling.h b/src/v/resource_mgmt/cpu_scheduling.h index 19f0a99be150..6a1432c05f69 100644 --- a/src/v/resource_mgmt/cpu_scheduling.h +++ b/src/v/resource_mgmt/cpu_scheduling.h @@ -38,6 +38,7 @@ class scheduling_groups final { "archival_upload", 100); _node_status = co_await ss::create_scheduling_group("node_status", 50); _self_test = co_await ss::create_scheduling_group("self_test", 100); + _fetch = co_await ss::create_scheduling_group("fetch", 1000); } ss::future<> destroy_groups() { @@ -52,6 +53,7 @@ class scheduling_groups final { co_await destroy_scheduling_group(_archival_upload); co_await destroy_scheduling_group(_node_status); co_await destroy_scheduling_group(_self_test); + co_await destroy_scheduling_group(_fetch); co_return; } @@ -70,6 +72,16 @@ class scheduling_groups final { ss::scheduling_group archival_upload() { return _archival_upload; } ss::scheduling_group node_status() { return _node_status; } ss::scheduling_group self_test_sg() { return _self_test; } + /** + * @brief Scheduling group for fetch requests. + * + * This scheduling group is used for consumer fetch processing. We assign + * it the same priority as the default group (where most other kafka + * handling takes place), but by putting it into its own group we prevent + * non-fetch requests from being significantly delayed when fetch requests + * use all the CPU. + */ + ss::scheduling_group fetch_sg() { return _fetch; } std::vector> all_scheduling_groups() const { @@ -85,7 +97,8 @@ class scheduling_groups final { std::cref(_raft_learner_recovery), std::cref(_archival_upload), std::cref(_node_status), - std::cref(_self_test)}; + std::cref(_self_test), + std::cref(_fetch)}; } private: @@ -102,4 +115,5 @@ class scheduling_groups final { ss::scheduling_group _archival_upload; ss::scheduling_group _node_status; ss::scheduling_group _self_test; + ss::scheduling_group _fetch; }; From 9a93a9c22238e145dfa9d7fe297eb494d7c5f0bf Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 26 Apr 2023 15:26:22 -0400 Subject: [PATCH 3/4] Use fetch scheduling group to handle fetches Add the with_scheduling_group call to switch to the kafka scheduling group in the fetch handling flow. See prior change for context. Issue redpanda-data/core-internal/#435. --- src/v/kafka/server/handlers/fetch.cc | 39 ++++++++++++++++------------ src/v/kafka/server/server.cc | 2 ++ src/v/kafka/server/server.h | 6 +++++ src/v/redpanda/tests/fixture.h | 1 + 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 3d6f428d5ad1..9aecf7d9f1df 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -35,8 +35,10 @@ #include #include +#include #include #include +#include #include #include @@ -555,22 +557,27 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) { return ss::do_with( std::make_unique(std::move(rctx), ssg), [](std::unique_ptr& octx_ptr) { - auto& octx = *octx_ptr; - log_request(octx.rctx.header(), octx.request); - // top-level error is used for session-level errors - if (octx.session_ctx.has_error()) { - octx.response.data.error_code = octx.session_ctx.error(); - return std::move(octx).send_response(); - } - octx.response.data.error_code = error_code::none; - // first fetch, do not wait - return fetch_topic_partitions(octx) - .then([&octx] { - return ss::do_until( - [&octx] { return octx.should_stop_fetch(); }, - [&octx] { return fetch_topic_partitions(octx); }); - }) - .then([&octx] { return std::move(octx).send_response(); }); + auto sg + = octx_ptr->rctx.connection()->server().fetch_scheduling_group(); + return ss::with_scheduling_group(sg, [&octx_ptr] { + auto& octx = *octx_ptr; + + log_request(octx.rctx.header(), octx.request); + // top-level error is used for session-level errors + if (octx.session_ctx.has_error()) { + octx.response.data.error_code = octx.session_ctx.error(); + return std::move(octx).send_response(); + } + octx.response.data.error_code = error_code::none; + // first fetch, do not wait + return fetch_topic_partitions(octx) + .then([&octx] { + return ss::do_until( + [&octx] { return octx.should_stop_fetch(); }, + [&octx] { return fetch_topic_partitions(octx); }); + }) + .then([&octx] { return std::move(octx).send_response(); }); + }); }); } diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index a343bca391ab..a37da6568c2a 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -77,6 +77,7 @@ namespace kafka { server::server( ss::sharded* cfg, ss::smp_service_group smp, + ss::scheduling_group fetch_sg, ss::sharded& meta, ss::sharded& tf, ss::sharded& cf, @@ -99,6 +100,7 @@ server::server( ssx::thread_worker& tw) noexcept : net::server(cfg, klog) , _smp_group(smp) + , _fetch_scheduling_group(fetch_sg) , _topics_frontend(tf) , _config_frontend(cf) , _feature_table(ft) diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 934bb7fd786b..d97bc0eca10d 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -28,6 +28,7 @@ #include "utils/ema.h" #include +#include #include #include @@ -38,6 +39,7 @@ class server final : public net::server { server( ss::sharded*, ss::smp_service_group, + ss::scheduling_group, ss::sharded&, ss::sharded&, ss::sharded&, @@ -71,6 +73,9 @@ class server final : public net::server { ss::future<> apply(ss::lw_shared_ptr) final; ss::smp_service_group smp_group() const { return _smp_group; } + ss::scheduling_group fetch_scheduling_group() const { + return _fetch_scheduling_group; + } cluster::topics_frontend& topics_frontend() { return _topics_frontend.local(); } @@ -148,6 +153,7 @@ class server final : public net::server { private: ss::smp_service_group _smp_group; + ss::scheduling_group _fetch_scheduling_group; ss::sharded& _topics_frontend; ss::sharded& _config_frontend; ss::sharded& _feature_table; diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 488628b92ef1..282457fae386 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -115,6 +115,7 @@ class redpanda_thread_fixture { proto = std::make_unique( &configs, app.smp_service_groups.kafka_smp_sg(), + app.sched_groups.fetch_sg(), app.metadata_cache, app.controller->get_topics_frontend(), app.controller->get_config_frontend(), From bc72f5ebacd20b90be6d34d5ff76e8868a6494a8 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 27 Apr 2023 02:06:31 -0400 Subject: [PATCH 4/4] Make the fetch scheduling group configurable Introduce a config tunable which would let users set the scheduling group to use for kafka fetch request back to the default group. Also useful for quick A/B testing of the perf effect of the change. --- src/v/config/configuration.cc | 6 ++++++ src/v/config/configuration.h | 1 + src/v/kafka/server/server.cc | 6 ++++++ src/v/kafka/server/server.h | 13 ++++++++++--- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 5b9a461d2bc2..f5cdfef3e6f1 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -517,6 +517,12 @@ configuration::configuration() "Maximum number of bytes returned in fetch request", {.needs_restart = needs_restart::no, .visibility = visibility::user}, 55_MiB) + , use_fetch_scheduler_group( + *this, + "use_fetch_scheduler_group", + "Use a separate scheduler group for fetch processing", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + true) , metadata_status_wait_timeout_ms( *this, "metadata_status_wait_timeout_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index d8005941a050..31c21906606f 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -120,6 +120,7 @@ struct configuration final : public config_store { enum_property log_message_timestamp_type; enum_property log_compression_type; property fetch_max_bytes; + property use_fetch_scheduler_group; property metadata_status_wait_timeout_ms; bounded_property> kafka_connection_rate_limit; property> kafka_connection_rate_limit_overrides; diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index a37da6568c2a..c130676ddfa4 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -136,6 +136,12 @@ server::server( _probe.setup_public_metrics(); } +ss::scheduling_group server::fetch_scheduling_group() const { + return config::shard_local_cfg().use_fetch_scheduler_group() + ? _fetch_scheduling_group + : ss::default_scheduling_group(); +} + coordinator_ntp_mapper& server::coordinator_mapper() { return _group_router.local().coordinator_mapper().local(); } diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index d97bc0eca10d..e36c2cdf828e 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -73,9 +73,16 @@ class server final : public net::server { ss::future<> apply(ss::lw_shared_ptr) final; ss::smp_service_group smp_group() const { return _smp_group; } - ss::scheduling_group fetch_scheduling_group() const { - return _fetch_scheduling_group; - } + + /** + * @brief Return the scheduling group to use for fetch requests. + * + * By default, fetches use a dedicated scheduling group, but this may + * be changed by configuration to restore the old behavior of lumping + * them with most other tasks in the default scheduling group. + */ + ss::scheduling_group fetch_scheduling_group() const; + cluster::topics_frontend& topics_frontend() { return _topics_frontend.local(); }