Skip to content

Commit

Permalink
Use fetch scheduling group to handle fetches
Browse files Browse the repository at this point in the history
Add the with_scheduling_group call to switch to the kafka scheduling
group in the fetch handling flow. See prior change for context.

Issue redpanda-data/core-internal/#435.
  • Loading branch information
travisdowns committed Apr 26, 2023
1 parent d89da57 commit 10721ef
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
39 changes: 23 additions & 16 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@

#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/util/log.hh>

#include <boost/range/irange.hpp>
Expand Down Expand Up @@ -555,22 +557,27 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) {
return ss::do_with(
std::make_unique<op_context>(std::move(rctx), ssg),
[](std::unique_ptr<op_context>& octx_ptr) {
auto& octx = *octx_ptr;
log_request(octx.rctx.header(), octx.request);
// top-level error is used for session-level errors
if (octx.session_ctx.has_error()) {
octx.response.data.error_code = octx.session_ctx.error();
return std::move(octx).send_response();
}
octx.response.data.error_code = error_code::none;
// first fetch, do not wait
return fetch_topic_partitions(octx)
.then([&octx] {
return ss::do_until(
[&octx] { return octx.should_stop_fetch(); },
[&octx] { return fetch_topic_partitions(octx); });
})
.then([&octx] { return std::move(octx).send_response(); });
auto sg
= octx_ptr->rctx.connection()->server().fetch_scheduling_group();
return ss::with_scheduling_group(sg, [&octx_ptr] {
auto& octx = *octx_ptr;

log_request(octx.rctx.header(), octx.request);
// top-level error is used for session-level errors
if (octx.session_ctx.has_error()) {
octx.response.data.error_code = octx.session_ctx.error();
return std::move(octx).send_response();
}
octx.response.data.error_code = error_code::none;
// first fetch, do not wait
return fetch_topic_partitions(octx)
.then([&octx] {
return ss::do_until(
[&octx] { return octx.should_stop_fetch(); },
[&octx] { return fetch_topic_partitions(octx); });
})
.then([&octx] { return std::move(octx).send_response(); });
});
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ namespace kafka {
server::server(
ss::sharded<net::server_configuration>* cfg,
ss::smp_service_group smp,
ss::scheduling_group fetch_sg,
ss::sharded<cluster::metadata_cache>& meta,
ss::sharded<cluster::topics_frontend>& tf,
ss::sharded<cluster::config_frontend>& cf,
Expand All @@ -99,6 +100,7 @@ server::server(
ssx::thread_worker& tw) noexcept
: net::server(cfg, klog)
, _smp_group(smp)
, _fetch_scheduling_group(fetch_sg)
, _topics_frontend(tf)
, _config_frontend(cf)
, _feature_table(ft)
Expand Down
6 changes: 6 additions & 0 deletions src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "utils/ema.h"

#include <seastar/core/future.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>

Expand All @@ -38,6 +39,7 @@ class server final : public net::server {
server(
ss::sharded<net::server_configuration>*,
ss::smp_service_group,
ss::scheduling_group,
ss::sharded<cluster::metadata_cache>&,
ss::sharded<cluster::topics_frontend>&,
ss::sharded<cluster::config_frontend>&,
Expand Down Expand Up @@ -71,6 +73,9 @@ class server final : public net::server {
ss::future<> apply(ss::lw_shared_ptr<net::connection>) final;

ss::smp_service_group smp_group() const { return _smp_group; }
ss::scheduling_group fetch_scheduling_group() const {
return _fetch_scheduling_group;
}
cluster::topics_frontend& topics_frontend() {
return _topics_frontend.local();
}
Expand Down Expand Up @@ -148,6 +153,7 @@ class server final : public net::server {

private:
ss::smp_service_group _smp_group;
ss::scheduling_group _fetch_scheduling_group;
ss::sharded<cluster::topics_frontend>& _topics_frontend;
ss::sharded<cluster::config_frontend>& _config_frontend;
ss::sharded<features::feature_table>& _feature_table;
Expand Down
1 change: 1 addition & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
.start(
&kafka_cfg,
smp_service_groups.kafka_smp_sg(),
_scheduling_groups.fetch_sg(),
std::ref(metadata_cache),
std::ref(controller->get_topics_frontend()),
std::ref(controller->get_config_frontend()),
Expand Down

0 comments on commit 10721ef

Please sign in to comment.