Put consumers in their own scheduling group #10398

Merged
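This change moves Kafka fetch processing into its own Seastar scheduling group, so the reactor can account for the CPU consumed by consumers separately from other broker work. Concretely, it adds a `fetch_sg` scheduling group, threads it through the `kafka::server` constructor, wraps the fetch handler's continuation chain in `ss::with_scheduling_group`, and gates the behavior behind a new `use_fetch_scheduler_group` cluster property (default `true`) so the old behavior can be restored without a restart.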
6 changes: 6 additions & 0 deletions src/v/config/configuration.cc
@@ -517,6 +517,12 @@ configuration::configuration()
       "Maximum number of bytes returned in fetch request",
       {.needs_restart = needs_restart::no, .visibility = visibility::user},
       55_MiB)
+  , use_fetch_scheduler_group(
+      *this,
+      "use_fetch_scheduler_group",
+      "Use a separate scheduler group for fetch processing",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      true)
   , metadata_status_wait_timeout_ms(
       *this,
       "metadata_status_wait_timeout_ms",
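Because the property is declared with `needs_restart::no` and `visibility::tunable`, it can be flipped at runtime: `server::fetch_scheduling_group()` (added below in server.cc) reads it on each fetch, so a change takes effect for subsequent requests without a broker restart.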
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -120,6 +120,7 @@ struct configuration final : public config_store {
     enum_property<model::timestamp_type> log_message_timestamp_type;
     enum_property<model::compression> log_compression_type;
     property<size_t> fetch_max_bytes;
+    property<bool> use_fetch_scheduler_group;
     property<std::chrono::milliseconds> metadata_status_wait_timeout_ms;
     bounded_property<std::optional<int64_t>> kafka_connection_rate_limit;
     property<std::vector<ss::sstring>> kafka_connection_rate_limit_overrides;
39 changes: 23 additions & 16 deletions src/v/kafka/server/handlers/fetch.cc
@@ -35,8 +35,10 @@

 #include <seastar/core/do_with.hh>
 #include <seastar/core/future.hh>
+#include <seastar/core/scheduling.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/core/thread.hh>
+#include <seastar/core/with_scheduling_group.hh>
 #include <seastar/util/log.hh>

 #include <boost/range/irange.hpp>
@@ -555,22 +557,27 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) {
     return ss::do_with(
       std::make_unique<op_context>(std::move(rctx), ssg),
       [](std::unique_ptr<op_context>& octx_ptr) {
-          auto& octx = *octx_ptr;
-          log_request(octx.rctx.header(), octx.request);
-          // top-level error is used for session-level errors
-          if (octx.session_ctx.has_error()) {
-              octx.response.data.error_code = octx.session_ctx.error();
-              return std::move(octx).send_response();
-          }
-          octx.response.data.error_code = error_code::none;
-          // first fetch, do not wait
-          return fetch_topic_partitions(octx)
-            .then([&octx] {
-                return ss::do_until(
-                  [&octx] { return octx.should_stop_fetch(); },
-                  [&octx] { return fetch_topic_partitions(octx); });
-            })
-            .then([&octx] { return std::move(octx).send_response(); });
+          auto sg
+            = octx_ptr->rctx.connection()->server().fetch_scheduling_group();
+          return ss::with_scheduling_group(sg, [&octx_ptr] {
+              auto& octx = *octx_ptr;
+
+              log_request(octx.rctx.header(), octx.request);
+              // top-level error is used for session-level errors
+              if (octx.session_ctx.has_error()) {
+                  octx.response.data.error_code = octx.session_ctx.error();
+                  return std::move(octx).send_response();
+              }
+              octx.response.data.error_code = error_code::none;
+              // first fetch, do not wait
+              return fetch_topic_partitions(octx)
+                .then([&octx] {
+                    return ss::do_until(
+                      [&octx] { return octx.should_stop_fetch(); },
+                      [&octx] { return fetch_topic_partitions(octx); });
+                })
+                .then([&octx] { return std::move(octx).send_response(); });
+          });
       });
 }
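For readers unfamiliar with the mechanism: `ss::with_scheduling_group(sg, f)` runs `f` and the continuations it schedules under `sg`, letting Seastar's CPU scheduler weigh that work against other groups by their relative shares. The standalone sketch below (not part of this PR; the group name "fetch", the 100-share value, and the printed message are illustrative) shows the same wrapping pattern in miniature:

```cpp
#include <seastar/core/app-template.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_scheduling_group.hh>

#include <chrono>
#include <iostream>

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        // 100 shares: the group's relative CPU weight under contention.
        return seastar::create_scheduling_group("fetch", 100)
          .then([](seastar::scheduling_group sg) {
              return seastar::with_scheduling_group(sg, [] {
                  // This lambda and every continuation it schedules are
                  // accounted to the "fetch" group, just as the handler's
                  // do_until loop is in the real code above.
                  using namespace std::chrono_literals;
                  return seastar::sleep(10ms).then([] {
                      std::cout
                        << "running in group: "
                        << seastar::current_scheduling_group().name() << "\n";
                  });
              });
              // (Cleanup via destroy_scheduling_group() omitted for brevity.)
          });
    });
}
```

One subtlety the real handler must respect: `octx_ptr` is owned by `ss::do_with`, so the inner lambda captures it by reference and the response is sent from inside the wrapped chain, keeping the operation context alive for as long as the group-bound work runs.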
8 changes: 8 additions & 0 deletions src/v/kafka/server/server.cc
@@ -77,6 +77,7 @@ namespace kafka {
 server::server(
   ss::sharded<net::server_configuration>* cfg,
   ss::smp_service_group smp,
+  ss::scheduling_group fetch_sg,
   ss::sharded<cluster::metadata_cache>& meta,
   ss::sharded<cluster::topics_frontend>& tf,
   ss::sharded<cluster::config_frontend>& cf,
@@ -99,6 +100,7 @@ server::server(
   ssx::thread_worker& tw) noexcept
   : net::server(cfg, klog)
   , _smp_group(smp)
+  , _fetch_scheduling_group(fetch_sg)
   , _topics_frontend(tf)
   , _config_frontend(cf)
   , _feature_table(ft)
@@ -134,6 +136,12 @@ server::server(
     _probe.setup_public_metrics();
 }

+ss::scheduling_group server::fetch_scheduling_group() const {
+    return config::shard_local_cfg().use_fetch_scheduler_group()
+             ? _fetch_scheduling_group
+             : ss::default_scheduling_group();
+}
+
 coordinator_ntp_mapper& server::coordinator_mapper() {
     return _group_router.local().coordinator_mapper().local();
 }
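Note the design choice here: rather than caching the decision, `fetch_scheduling_group()` re-reads `use_fetch_scheduler_group` on every call and falls back to `ss::default_scheduling_group()` when the flag is off, which is what makes the property safe to tune at runtime.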
13 changes: 13 additions & 0 deletions src/v/kafka/server/server.h
@@ -28,6 +28,7 @@
 #include "utils/ema.h"

 #include <seastar/core/future.hh>
+#include <seastar/core/scheduling.hh>
 #include <seastar/core/sharded.hh>
 #include <seastar/core/smp.hh>

@@ -38,6 +39,7 @@ class server final : public net::server {
     server(
       ss::sharded<net::server_configuration>*,
       ss::smp_service_group,
+      ss::scheduling_group,
       ss::sharded<cluster::metadata_cache>&,
       ss::sharded<cluster::topics_frontend>&,
       ss::sharded<cluster::config_frontend>&,
@@ -71,6 +73,16 @@ class server final : public net::server {
     ss::future<> apply(ss::lw_shared_ptr<net::connection>) final;

     ss::smp_service_group smp_group() const { return _smp_group; }
+
+    /**
+     * @brief Return the scheduling group to use for fetch requests.
+     *
+     * By default, fetches use a dedicated scheduling group, but this may
+     * be changed by configuration to restore the old behavior of lumping
+     * them with most other tasks in the default scheduling group.
+     */
+    ss::scheduling_group fetch_scheduling_group() const;
+
     cluster::topics_frontend& topics_frontend() {
         return _topics_frontend.local();
     }
@@ -148,6 +160,7 @@ class server final : public net::server {

 private:
     ss::smp_service_group _smp_group;
+    ss::scheduling_group _fetch_scheduling_group;
     ss::sharded<cluster::topics_frontend>& _topics_frontend;
     ss::sharded<cluster::config_frontend>& _config_frontend;
     ss::sharded<features::feature_table>& _feature_table;
53 changes: 27 additions & 26 deletions src/v/redpanda/application.cc
@@ -428,15 +428,15 @@ void application::initialize(
       [this] { smp_service_groups.destroy_groups().get(); });

     if (groups) {
-        _scheduling_groups = *groups;
+        sched_groups = *groups;
         return;
     }

-    _scheduling_groups.create_groups().get();
-    _scheduling_groups_probe.wire_up(_scheduling_groups);
+    sched_groups.create_groups().get();
+    _scheduling_groups_probe.wire_up(sched_groups);
     _deferred.emplace_back([this] {
         _scheduling_groups_probe.clear();
-        _scheduling_groups.destroy_groups().get();
+        sched_groups.destroy_groups().get();
     });

     if (proxy_cfg) {
@@ -817,7 +817,7 @@ void application::configure_admin_server() {
     syschecks::systemd_message("constructing http server").get();
     construct_service(
       _admin,
-      admin_server_cfg_from_global_cfg(_scheduling_groups),
+      admin_server_cfg_from_global_cfg(sched_groups),
       std::ref(partition_manager),
       std::ref(cp_partition_manager),
       controller.get(),
@@ -1014,7 +1014,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
     raft_group_manager
       .start(
         node_id,
-        _scheduling_groups.raft_sg(),
+        sched_groups.raft_sg(),
         [] {
             return raft::group_manager::configuration{
               .heartbeat_interval
@@ -1087,7 +1087,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       _archival_upload_housekeeping,
       std::ref(cloud_storage_api),
       ss::sharded_parameter(
-        [sg = _scheduling_groups.archival_upload()] { return sg; }))
+        [sg = sched_groups.archival_upload()] { return sg; }))
       .get();
     _archival_upload_housekeeping
       .invoke_on_all(&archival::upload_housekeeping_service::start)
@@ -1107,7 +1107,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       std::ref(cloud_storage_api),
       std::ref(shadow_index_cache),
       ss::sharded_parameter(
-        [sg = _scheduling_groups.archival_upload(),
+        [sg = sched_groups.archival_upload(),
          p = archival_priority(),
          enabled = archival_storage_enabled()]()
           -> ss::lw_shared_ptr<archival::configuration> {
@@ -1150,7 +1150,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       node_id,
       std::ref(local_monitor),
       std::ref(_connection_cache),
-      _scheduling_groups.self_test_sg())
+      sched_groups.self_test_sg())
       .get();

     construct_single_service_sharded(
@@ -1244,7 +1244,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
     construct_service(
       _archival_upload_controller,
       std::ref(partition_manager),
-      make_upload_controller_config(_scheduling_groups.archival_upload()))
+      make_upload_controller_config(sched_groups.archival_upload()))
       .get();

     construct_service(
@@ -1293,7 +1293,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
     syschecks::systemd_message("Creating kafka group router").get();
     construct_service(
       group_router,
-      _scheduling_groups.kafka_sg(),
+      sched_groups.kafka_sg(),
       smp_service_groups.kafka_smp_sg(),
       std::ref(_group_manager),
       std::ref(shard_table),
@@ -1503,6 +1503,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       .start(
         &kafka_cfg,
         smp_service_groups.kafka_smp_sg(),
+        sched_groups.fetch_sg(),
         std::ref(metadata_cache),
         std::ref(controller->get_topics_frontend()),
         std::ref(controller->get_config_frontend()),
@@ -1532,7 +1533,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       _compaction_controller,
       std::ref(storage),
       compaction_controller_config(
-        _scheduling_groups.compaction_sg(),
+        sched_groups.compaction_sg(),
         priority_manager::local().compaction_priority()))
       .get();
 }
@@ -1574,9 +1575,9 @@ void application::wire_up_bootstrap_services() {
       storage,
       []() { return kvstore_config_from_global_config(); },
       [this]() {
-          auto log_cfg = manager_config_from_global_config(_scheduling_groups);
+          auto log_cfg = manager_config_from_global_config(sched_groups);
           log_cfg.reclaim_opts.background_reclaimer_sg
-            = _scheduling_groups.cache_background_reclaim_sg();
+            = sched_groups.cache_background_reclaim_sg();
           return log_cfg;
       },
       std::ref(feature_table))
@@ -1787,7 +1788,7 @@ void application::start_bootstrap_services() {
     std::vector<std::unique_ptr<rpc::service>> bootstrap_service;
     bootstrap_service.push_back(
       std::make_unique<cluster::bootstrap_service>(
-        _scheduling_groups.cluster_sg(),
+        sched_groups.cluster_sg(),
         smp_service_groups.cluster_smp_sg(),
         std::ref(storage)));
     s.add_services(std::move(bootstrap_service));
@@ -1925,7 +1926,7 @@ void application::start_runtime_services(
         runtime_services.push_back(std::make_unique<raft::service<
           cluster::partition_manager,
           cluster::shard_table>>(
-          _scheduling_groups.raft_sg(),
+          sched_groups.raft_sg(),
           smp_service_groups.raft_smp_sg(),
           partition_manager,
           shard_table.local(),
@@ -1949,13 +1950,13 @@ void application::start_runtime_services(
       .invoke_on_all([this, start_raft_rpc_early](rpc::rpc_server& s) {
           std::vector<std::unique_ptr<rpc::service>> runtime_services;
           runtime_services.push_back(std::make_unique<cluster::id_allocator>(
-            _scheduling_groups.raft_sg(),
+            sched_groups.raft_sg(),
             smp_service_groups.raft_smp_sg(),
             std::ref(id_allocator_frontend)));
           // _rm_group_proxy is wrap around a sharded service with only
           // `.local()' access so it's ok to share without foreign_ptr
           runtime_services.push_back(std::make_unique<cluster::tx_gateway>(
-            _scheduling_groups.raft_sg(),
+            sched_groups.raft_sg(),
             smp_service_groups.raft_smp_sg(),
             std::ref(tx_gateway_frontend),
             _rm_group_proxy.get(),
@@ -1965,15 +1966,15 @@ void application::start_runtime_services(
           runtime_services.push_back(std::make_unique<raft::service<
             cluster::partition_manager,
             cluster::shard_table>>(
-            _scheduling_groups.raft_sg(),
+            sched_groups.raft_sg(),
             smp_service_groups.raft_smp_sg(),
             partition_manager,
             shard_table.local(),
             config::shard_local_cfg().raft_heartbeat_interval_ms()));
       }

       runtime_services.push_back(std::make_unique<cluster::service>(
-        _scheduling_groups.cluster_sg(),
+        sched_groups.cluster_sg(),
         smp_service_groups.cluster_smp_sg(),
         std::ref(controller->get_topics_frontend()),
         std::ref(controller->get_members_manager()),
@@ -1991,37 +1992,37 @@ void application::start_runtime_services(

       runtime_services.push_back(
         std::make_unique<cluster::metadata_dissemination_handler>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(controller->get_partition_leaders())));

       runtime_services.push_back(
         std::make_unique<cluster::node_status_rpc_handler>(
-          _scheduling_groups.node_status(),
+          sched_groups.node_status(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(node_status_backend)));

       runtime_services.push_back(
         std::make_unique<cluster::self_test_rpc_handler>(
-          _scheduling_groups.node_status(),
+          sched_groups.node_status(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(self_test_backend)));

       runtime_services.push_back(
         std::make_unique<cluster::partition_balancer_rpc_handler>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(controller->get_partition_balancer())));

       runtime_services.push_back(
         std::make_unique<cluster::ephemeral_credential_service>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(controller->get_ephemeral_credential_frontend())));

       runtime_services.push_back(
         std::make_unique<cluster::topic_recovery_status_rpc_handler>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(topic_recovery_service)));
       s.add_services(std::move(runtime_services));
2 changes: 1 addition & 1 deletion src/v/redpanda/application.h
@@ -92,6 +92,7 @@ class application {
     ss::future<> set_proxy_client_config(ss::sstring name, std::any val);

     smp_groups smp_service_groups;
+    scheduling_groups sched_groups;

     // Sorted list of services (public members)
     ss::sharded<cloud_storage::cache> shadow_index_cache;
@@ -230,7 +231,6 @@ class application {
     std::optional<pandaproxy::schema_registry::configuration>
       _schema_reg_config;
     std::optional<kafka::client::configuration> _schema_reg_client_config;
-    scheduling_groups _scheduling_groups;
     scheduling_groups_probe _scheduling_groups_probe;
     ss::logger _log;
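The rename from the private `_scheduling_groups` member to the public `sched_groups` is what lets code outside `application`, such as the test fixture below, reach `fetch_sg()` when constructing a `kafka::server` directly.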
1 change: 1 addition & 0 deletions src/v/redpanda/tests/fixture.h
@@ -115,6 +115,7 @@ class redpanda_thread_fixture {
         proto = std::make_unique<kafka::server>(
           &configs,
           app.smp_service_groups.kafka_smp_sg(),
+          app.sched_groups.fetch_sg(),
           app.metadata_cache,
           app.controller->get_topics_frontend(),
           app.controller->get_config_frontend(),