diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 3d6f428d5ad1d..9aecf7d9f1dff 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -35,8 +35,10 @@ #include #include +#include #include #include +#include #include #include @@ -555,22 +557,27 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) { return ss::do_with( std::make_unique(std::move(rctx), ssg), [](std::unique_ptr& octx_ptr) { - auto& octx = *octx_ptr; - log_request(octx.rctx.header(), octx.request); - // top-level error is used for session-level errors - if (octx.session_ctx.has_error()) { - octx.response.data.error_code = octx.session_ctx.error(); - return std::move(octx).send_response(); - } - octx.response.data.error_code = error_code::none; - // first fetch, do not wait - return fetch_topic_partitions(octx) - .then([&octx] { - return ss::do_until( - [&octx] { return octx.should_stop_fetch(); }, - [&octx] { return fetch_topic_partitions(octx); }); - }) - .then([&octx] { return std::move(octx).send_response(); }); + auto sg + = octx_ptr->rctx.connection()->server().fetch_scheduling_group(); + return ss::with_scheduling_group(sg, [&octx_ptr] { + auto& octx = *octx_ptr; + + log_request(octx.rctx.header(), octx.request); + // top-level error is used for session-level errors + if (octx.session_ctx.has_error()) { + octx.response.data.error_code = octx.session_ctx.error(); + return std::move(octx).send_response(); + } + octx.response.data.error_code = error_code::none; + // first fetch, do not wait + return fetch_topic_partitions(octx) + .then([&octx] { + return ss::do_until( + [&octx] { return octx.should_stop_fetch(); }, + [&octx] { return fetch_topic_partitions(octx); }); + }) + .then([&octx] { return std::move(octx).send_response(); }); + }); }); } diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index a343bca391ab1..a37da6568c2af 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -77,6 +77,7 @@ namespace kafka { server::server( ss::sharded* cfg, ss::smp_service_group smp, + ss::scheduling_group fetch_sg, ss::sharded& meta, ss::sharded& tf, ss::sharded& cf, @@ -99,6 +100,7 @@ server::server( ssx::thread_worker& tw) noexcept : net::server(cfg, klog) , _smp_group(smp) + , _fetch_scheduling_group(fetch_sg) , _topics_frontend(tf) , _config_frontend(cf) , _feature_table(ft) diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 934bb7fd786bf..d97bc0eca10db 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -28,6 +28,7 @@ #include "utils/ema.h" #include +#include #include #include @@ -38,6 +39,7 @@ class server final : public net::server { server( ss::sharded*, ss::smp_service_group, + ss::scheduling_group, ss::sharded&, ss::sharded&, ss::sharded&, @@ -71,6 +73,9 @@ class server final : public net::server { ss::future<> apply(ss::lw_shared_ptr) final; ss::smp_service_group smp_group() const { return _smp_group; } + ss::scheduling_group fetch_scheduling_group() const { + return _fetch_scheduling_group; + } cluster::topics_frontend& topics_frontend() { return _topics_frontend.local(); } @@ -148,6 +153,7 @@ class server final : public net::server { private: ss::smp_service_group _smp_group; + ss::scheduling_group _fetch_scheduling_group; ss::sharded& _topics_frontend; ss::sharded& _config_frontend; ss::sharded& _feature_table; diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 488628b92ef15..282457fae386c 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -115,6 +115,7 @@ class redpanda_thread_fixture { proto = std::make_unique( &configs, app.smp_service_groups.kafka_smp_sg(), + app.sched_groups.fetch_sg(), app.metadata_cache, app.controller->get_topics_frontend(), app.controller->get_config_frontend(),