diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 5b9a461d2bc2..f5cdfef3e6f1 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -517,6 +517,12 @@ configuration::configuration() "Maximum number of bytes returned in fetch request", {.needs_restart = needs_restart::no, .visibility = visibility::user}, 55_MiB) + , use_fetch_scheduler_group( + *this, + "use_fetch_scheduler_group", + "Use a separate scheduler group for fetch processing", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + true) , metadata_status_wait_timeout_ms( *this, "metadata_status_wait_timeout_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index d8005941a050..31c21906606f 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -120,6 +120,7 @@ struct configuration final : public config_store { enum_property log_message_timestamp_type; enum_property log_compression_type; property fetch_max_bytes; + property use_fetch_scheduler_group; property metadata_status_wait_timeout_ms; bounded_property> kafka_connection_rate_limit; property> kafka_connection_rate_limit_overrides; diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 3d6f428d5ad1..9aecf7d9f1df 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -35,8 +35,10 @@ #include #include +#include #include #include +#include #include #include @@ -555,22 +557,27 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) { return ss::do_with( std::make_unique(std::move(rctx), ssg), [](std::unique_ptr& octx_ptr) { - auto& octx = *octx_ptr; - log_request(octx.rctx.header(), octx.request); - // top-level error is used for session-level errors - if (octx.session_ctx.has_error()) { - octx.response.data.error_code = octx.session_ctx.error(); - return std::move(octx).send_response(); - } - octx.response.data.error_code = error_code::none; - // first fetch, do not wait - return fetch_topic_partitions(octx) - .then([&octx] { - return ss::do_until( - [&octx] { return octx.should_stop_fetch(); }, - [&octx] { return fetch_topic_partitions(octx); }); - }) - .then([&octx] { return std::move(octx).send_response(); }); + auto sg + = octx_ptr->rctx.connection()->server().fetch_scheduling_group(); + return ss::with_scheduling_group(sg, [&octx_ptr] { + auto& octx = *octx_ptr; + + log_request(octx.rctx.header(), octx.request); + // top-level error is used for session-level errors + if (octx.session_ctx.has_error()) { + octx.response.data.error_code = octx.session_ctx.error(); + return std::move(octx).send_response(); + } + octx.response.data.error_code = error_code::none; + // first fetch, do not wait + return fetch_topic_partitions(octx) + .then([&octx] { + return ss::do_until( + [&octx] { return octx.should_stop_fetch(); }, + [&octx] { return fetch_topic_partitions(octx); }); + }) + .then([&octx] { return std::move(octx).send_response(); }); + }); }); } diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index a343bca391ab..c130676ddfa4 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -77,6 +77,7 @@ namespace kafka { server::server( ss::sharded* cfg, ss::smp_service_group smp, + ss::scheduling_group fetch_sg, ss::sharded& meta, ss::sharded& tf, ss::sharded& cf, @@ -99,6 +100,7 @@ server::server( ssx::thread_worker& tw) noexcept : net::server(cfg, klog) , _smp_group(smp) + , _fetch_scheduling_group(fetch_sg) , _topics_frontend(tf) , _config_frontend(cf) , _feature_table(ft) @@ -134,6 +136,12 @@ server::server( _probe.setup_public_metrics(); } +ss::scheduling_group server::fetch_scheduling_group() const { + return config::shard_local_cfg().use_fetch_scheduler_group() + ? _fetch_scheduling_group + : ss::default_scheduling_group(); +} + coordinator_ntp_mapper& server::coordinator_mapper() { return _group_router.local().coordinator_mapper().local(); } diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 934bb7fd786b..e36c2cdf828e 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -28,6 +28,7 @@ #include "utils/ema.h" #include +#include #include #include @@ -38,6 +39,7 @@ class server final : public net::server { server( ss::sharded*, ss::smp_service_group, + ss::scheduling_group, ss::sharded&, ss::sharded&, ss::sharded&, @@ -71,6 +73,16 @@ class server final : public net::server { ss::future<> apply(ss::lw_shared_ptr) final; ss::smp_service_group smp_group() const { return _smp_group; } + + /** + * @brief Return the scheduling group to use for fetch requests. + * + * By default, fetches use a dedicated scheduling group, but this may + * be changed by configuration to restore the old behavior of lumping + * them with most other tasks in the default scheduling group. + */ + ss::scheduling_group fetch_scheduling_group() const; + cluster::topics_frontend& topics_frontend() { return _topics_frontend.local(); } @@ -148,6 +160,7 @@ class server final : public net::server { private: ss::smp_service_group _smp_group; + ss::scheduling_group _fetch_scheduling_group; ss::sharded& _topics_frontend; ss::sharded& _config_frontend; ss::sharded& _feature_table; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index cf60f98142e5..230e42acc1fb 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -428,15 +428,15 @@ void application::initialize( [this] { smp_service_groups.destroy_groups().get(); }); if (groups) { - _scheduling_groups = *groups; + sched_groups = *groups; return; } - _scheduling_groups.create_groups().get(); - _scheduling_groups_probe.wire_up(_scheduling_groups); + sched_groups.create_groups().get(); + _scheduling_groups_probe.wire_up(sched_groups); _deferred.emplace_back([this] { _scheduling_groups_probe.clear(); - _scheduling_groups.destroy_groups().get(); + sched_groups.destroy_groups().get(); }); if (proxy_cfg) { @@ -817,7 +817,7 @@ void application::configure_admin_server() { syschecks::systemd_message("constructing http server").get(); construct_service( _admin, - admin_server_cfg_from_global_cfg(_scheduling_groups), + admin_server_cfg_from_global_cfg(sched_groups), std::ref(partition_manager), std::ref(cp_partition_manager), controller.get(), @@ -1014,7 +1014,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { raft_group_manager .start( node_id, - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), [] { return raft::group_manager::configuration{ .heartbeat_interval @@ -1087,7 +1087,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { _archival_upload_housekeeping, std::ref(cloud_storage_api), ss::sharded_parameter( - [sg = _scheduling_groups.archival_upload()] { return sg; })) + [sg = sched_groups.archival_upload()] { return sg; })) .get(); _archival_upload_housekeeping .invoke_on_all(&archival::upload_housekeeping_service::start) @@ -1107,7 +1107,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { std::ref(cloud_storage_api), std::ref(shadow_index_cache), ss::sharded_parameter( - [sg = _scheduling_groups.archival_upload(), + [sg = sched_groups.archival_upload(), p = archival_priority(), enabled = archival_storage_enabled()]() -> ss::lw_shared_ptr { @@ -1150,7 +1150,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { node_id, std::ref(local_monitor), std::ref(_connection_cache), - _scheduling_groups.self_test_sg()) + sched_groups.self_test_sg()) .get(); construct_single_service_sharded( @@ -1244,7 +1244,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { construct_service( _archival_upload_controller, std::ref(partition_manager), - make_upload_controller_config(_scheduling_groups.archival_upload())) + make_upload_controller_config(sched_groups.archival_upload())) .get(); construct_service( @@ -1293,7 +1293,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { syschecks::systemd_message("Creating kafka group router").get(); construct_service( group_router, - _scheduling_groups.kafka_sg(), + sched_groups.kafka_sg(), smp_service_groups.kafka_smp_sg(), std::ref(_group_manager), std::ref(shard_table), @@ -1503,6 +1503,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { .start( &kafka_cfg, smp_service_groups.kafka_smp_sg(), + sched_groups.fetch_sg(), std::ref(metadata_cache), std::ref(controller->get_topics_frontend()), std::ref(controller->get_config_frontend()), @@ -1532,7 +1533,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) { _compaction_controller, std::ref(storage), compaction_controller_config( - _scheduling_groups.compaction_sg(), + sched_groups.compaction_sg(), priority_manager::local().compaction_priority())) .get(); } @@ -1574,9 +1575,9 @@ void application::wire_up_bootstrap_services() { storage, []() { return kvstore_config_from_global_config(); }, [this]() { - auto log_cfg = manager_config_from_global_config(_scheduling_groups); + auto log_cfg = manager_config_from_global_config(sched_groups); log_cfg.reclaim_opts.background_reclaimer_sg - = _scheduling_groups.cache_background_reclaim_sg(); + = sched_groups.cache_background_reclaim_sg(); return log_cfg; }, std::ref(feature_table)) @@ -1787,7 +1788,7 @@ void application::start_bootstrap_services() { std::vector> bootstrap_service; bootstrap_service.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(storage))); s.add_services(std::move(bootstrap_service)); @@ -1925,7 +1926,7 @@ void application::start_runtime_services( runtime_services.push_back(std::make_unique>( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), partition_manager, shard_table.local(), @@ -1949,13 +1950,13 @@ void application::start_runtime_services( .invoke_on_all([this, start_raft_rpc_early](rpc::rpc_server& s) { std::vector> runtime_services; runtime_services.push_back(std::make_unique( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), std::ref(id_allocator_frontend))); // _rm_group_proxy is wrap around a sharded service with only // `.local()' access so it's ok to share without foreign_ptr runtime_services.push_back(std::make_unique( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), std::ref(tx_gateway_frontend), _rm_group_proxy.get(), @@ -1965,7 +1966,7 @@ void application::start_runtime_services( runtime_services.push_back(std::make_unique>( - _scheduling_groups.raft_sg(), + sched_groups.raft_sg(), smp_service_groups.raft_smp_sg(), partition_manager, shard_table.local(), @@ -1973,7 +1974,7 @@ void application::start_runtime_services( } runtime_services.push_back(std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_topics_frontend()), std::ref(controller->get_members_manager()), @@ -1991,37 +1992,37 @@ void application::start_runtime_services( runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_partition_leaders()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.node_status(), + sched_groups.node_status(), smp_service_groups.cluster_smp_sg(), std::ref(node_status_backend))); runtime_services.push_back( std::make_unique( - _scheduling_groups.node_status(), + sched_groups.node_status(), smp_service_groups.cluster_smp_sg(), std::ref(self_test_backend))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_partition_balancer()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_ephemeral_credential_frontend()))); runtime_services.push_back( std::make_unique( - _scheduling_groups.cluster_sg(), + sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(topic_recovery_service))); s.add_services(std::move(runtime_services)); diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 54004fd675b9..cd7e01c50a21 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -92,6 +92,7 @@ class application { ss::future<> set_proxy_client_config(ss::sstring name, std::any val); smp_groups smp_service_groups; + scheduling_groups sched_groups; // Sorted list of services (public members) ss::sharded shadow_index_cache; @@ -230,7 +231,6 @@ class application { std::optional _schema_reg_config; std::optional _schema_reg_client_config; - scheduling_groups _scheduling_groups; scheduling_groups_probe _scheduling_groups_probe; ss::logger _log; diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 488628b92ef1..282457fae386 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -115,6 +115,7 @@ class redpanda_thread_fixture { proto = std::make_unique( &configs, app.smp_service_groups.kafka_smp_sg(), + app.sched_groups.fetch_sg(), app.metadata_cache, app.controller->get_topics_frontend(), app.controller->get_config_frontend(), diff --git a/src/v/resource_mgmt/cpu_scheduling.h b/src/v/resource_mgmt/cpu_scheduling.h index 19f0a99be150..6a1432c05f69 100644 --- a/src/v/resource_mgmt/cpu_scheduling.h +++ b/src/v/resource_mgmt/cpu_scheduling.h @@ -38,6 +38,7 @@ class scheduling_groups final { "archival_upload", 100); _node_status = co_await ss::create_scheduling_group("node_status", 50); _self_test = co_await ss::create_scheduling_group("self_test", 100); + _fetch = co_await ss::create_scheduling_group("fetch", 1000); } ss::future<> destroy_groups() { @@ -52,6 +53,7 @@ class scheduling_groups final { co_await destroy_scheduling_group(_archival_upload); co_await destroy_scheduling_group(_node_status); co_await destroy_scheduling_group(_self_test); + co_await destroy_scheduling_group(_fetch); co_return; } @@ -70,6 +72,16 @@ class scheduling_groups final { ss::scheduling_group archival_upload() { return _archival_upload; } ss::scheduling_group node_status() { return _node_status; } ss::scheduling_group self_test_sg() { return _self_test; } + /** + * @brief Scheduling group for fetch requests. + * + * This scheduling group is used for consumer fetch processing. We assign + * it the same priority as the default group (where most other kafka + * handling takes place), but by putting it into its own group we prevent + * non-fetch requests from being significantly delayed when fetch requests + * use all the CPU. + */ + ss::scheduling_group fetch_sg() { return _fetch; } std::vector> all_scheduling_groups() const { @@ -85,7 +97,8 @@ class scheduling_groups final { std::cref(_raft_learner_recovery), std::cref(_archival_upload), std::cref(_node_status), - std::cref(_self_test)}; + std::cref(_self_test), + std::cref(_fetch)}; } private: @@ -102,4 +115,5 @@ class scheduling_groups final { ss::scheduling_group _archival_upload; ss::scheduling_group _node_status; ss::scheduling_group _self_test; + ss::scheduling_group _fetch; };