Merge pull request #10398 from travisdowns/td-435-consumers-in-own-sg
Put consumers in their own scheduling group
piyushredpanda committed Apr 27, 2023
2 parents 64eaf55 + bc72f5e commit f23f4ba
Showing 9 changed files with 95 additions and 44 deletions.
6 changes: 6 additions & 0 deletions src/v/config/configuration.cc
@@ -517,6 +517,12 @@ configuration::configuration()
       "Maximum number of bytes returned in fetch request",
       {.needs_restart = needs_restart::no, .visibility = visibility::user},
       55_MiB)
+  , use_fetch_scheduler_group(
+      *this,
+      "use_fetch_scheduler_group",
+      "Use a separate scheduler group for fetch processing",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      true)
   , metadata_status_wait_timeout_ms(
       *this,
       "metadata_status_wait_timeout_ms",
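
Since the property is registered with needs_restart::no, consumers read the live shard-local value on each use instead of caching it at startup. A minimal sketch of the consuming side (the free function is hypothetical; the real read is in server::fetch_scheduling_group() further down):

#include "config/configuration.h"

#include <seastar/core/scheduling.hh>

namespace ss = seastar;

// Hypothetical free function mirroring the accessor added in server.cc
// below; not part of this PR.
ss::scheduling_group pick_fetch_sg(ss::scheduling_group dedicated) {
    // Re-read the shard-local value on every call so a runtime change to
    // use_fetch_scheduler_group takes effect on the next fetch request.
    return config::shard_local_cfg().use_fetch_scheduler_group()
      ? dedicated
      : ss::default_scheduling_group();
}
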
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -121,6 +121,7 @@ struct configuration final : public config_store {
     enum_property<model::timestamp_type> log_message_timestamp_type;
     enum_property<model::compression> log_compression_type;
     property<size_t> fetch_max_bytes;
+    property<bool> use_fetch_scheduler_group;
     property<std::chrono::milliseconds> metadata_status_wait_timeout_ms;
     bounded_property<std::optional<int64_t>> kafka_connection_rate_limit;
     property<std::vector<ss::sstring>> kafka_connection_rate_limit_overrides;
39 changes: 23 additions & 16 deletions src/v/kafka/server/handlers/fetch.cc
@@ -35,8 +35,10 @@

 #include <seastar/core/do_with.hh>
 #include <seastar/core/future.hh>
+#include <seastar/core/scheduling.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/core/thread.hh>
+#include <seastar/core/with_scheduling_group.hh>
 #include <seastar/util/log.hh>

 #include <boost/range/irange.hpp>
@@ -555,22 +557,27 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) {
     return ss::do_with(
       std::make_unique<op_context>(std::move(rctx), ssg),
       [](std::unique_ptr<op_context>& octx_ptr) {
-          auto& octx = *octx_ptr;
-          log_request(octx.rctx.header(), octx.request);
-          // top-level error is used for session-level errors
-          if (octx.session_ctx.has_error()) {
-              octx.response.data.error_code = octx.session_ctx.error();
-              return std::move(octx).send_response();
-          }
-          octx.response.data.error_code = error_code::none;
-          // first fetch, do not wait
-          return fetch_topic_partitions(octx)
-            .then([&octx] {
-                return ss::do_until(
-                  [&octx] { return octx.should_stop_fetch(); },
-                  [&octx] { return fetch_topic_partitions(octx); });
-            })
-            .then([&octx] { return std::move(octx).send_response(); });
+          auto sg
+            = octx_ptr->rctx.connection()->server().fetch_scheduling_group();
+          return ss::with_scheduling_group(sg, [&octx_ptr] {
+              auto& octx = *octx_ptr;
+
+              log_request(octx.rctx.header(), octx.request);
+              // top-level error is used for session-level errors
+              if (octx.session_ctx.has_error()) {
+                  octx.response.data.error_code = octx.session_ctx.error();
+                  return std::move(octx).send_response();
+              }
+              octx.response.data.error_code = error_code::none;
+              // first fetch, do not wait
+              return fetch_topic_partitions(octx)
+                .then([&octx] {
+                    return ss::do_until(
+                      [&octx] { return octx.should_stop_fetch(); },
+                      [&octx] { return fetch_topic_partitions(octx); });
+                })
+                .then([&octx] { return std::move(octx).send_response(); });
+          });
       });
 }
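
The handler body is unchanged; it is simply wrapped in ss::with_scheduling_group, so every continuation in the fetch chain is charged to the fetch group instead of the default one. Below is a self-contained sketch of that Seastar primitive, assuming only the public Seastar API; the group name and share count are illustrative, not taken from this PR.

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/with_scheduling_group.hh>

namespace ss = seastar;

int main(int argc, char** argv) {
    ss::app_template app;
    return app.run(argc, argv, [] {
        // 100 shares: this group's CPU weight relative to other groups.
        return ss::create_scheduling_group("fetch_demo", 100)
          .then([](ss::scheduling_group sg) {
              return ss::with_scheduling_group(sg, [] {
                         // Continuations scheduled from here are accounted
                         // to "fetch_demo", so a flood of this work cannot
                         // starve tasks in other groups of CPU.
                         return ss::make_ready_future<>();
                     })
                .then([sg] { return ss::destroy_scheduling_group(sg); });
          });
    });
}
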
8 changes: 8 additions & 0 deletions src/v/kafka/server/server.cc
@@ -80,6 +80,7 @@ namespace kafka {
 server::server(
   ss::sharded<net::server_configuration>* cfg,
   ss::smp_service_group smp,
+  ss::scheduling_group fetch_sg,
   ss::sharded<cluster::metadata_cache>& meta,
   ss::sharded<cluster::topics_frontend>& tf,
   ss::sharded<cluster::config_frontend>& cf,
@@ -102,6 +103,7 @@
   ssx::thread_worker& tw) noexcept
   : net::server(cfg, klog)
   , _smp_group(smp)
+  , _fetch_scheduling_group(fetch_sg)
   , _topics_frontend(tf)
   , _config_frontend(cf)
   , _feature_table(ft)
@@ -137,6 +139,12 @@ server::server(
     _probe.setup_public_metrics();
 }

+ss::scheduling_group server::fetch_scheduling_group() const {
+    return config::shard_local_cfg().use_fetch_scheduler_group()
+      ? _fetch_scheduling_group
+      : ss::default_scheduling_group();
+}
+
 coordinator_ntp_mapper& server::coordinator_mapper() {
     return _group_router.local().coordinator_mapper().local();
 }
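
Because fetch_scheduling_group() re-evaluates the shard-local property on every call, flipping use_fetch_scheduler_group at runtime redirects subsequent fetch requests without a restart. A throwaway sketch for verifying which group a task is actually accounted to, using only public Seastar calls (the logger name is made up):

#include <seastar/core/scheduling.hh>
#include <seastar/util/log.hh>

static seastar::logger sg_log("sg-debug"); // hypothetical logger

void log_current_scheduling_group() {
    // current_scheduling_group() returns the group charged for the
    // currently running task; name() is the string given at creation.
    auto sg = seastar::current_scheduling_group();
    sg_log.info("running in scheduling group '{}'", sg.name());
}
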
13 changes: 13 additions & 0 deletions src/v/kafka/server/server.h
@@ -28,6 +28,7 @@
 #include "utils/ema.h"

 #include <seastar/core/future.hh>
+#include <seastar/core/scheduling.hh>
 #include <seastar/core/sharded.hh>
 #include <seastar/core/smp.hh>

@@ -38,6 +39,7 @@ class server final : public net::server {
     server(
       ss::sharded<net::server_configuration>*,
       ss::smp_service_group,
+      ss::scheduling_group,
       ss::sharded<cluster::metadata_cache>&,
       ss::sharded<cluster::topics_frontend>&,
       ss::sharded<cluster::config_frontend>&,
@@ -71,6 +73,16 @@
     ss::future<> apply(ss::lw_shared_ptr<net::connection>) final;

     ss::smp_service_group smp_group() const { return _smp_group; }
+
+    /**
+     * @brief Return the scheduling group to use for fetch requests.
+     *
+     * By default, fetches use a dedicated scheduling group, but this may
+     * be changed by configuration to restore the old behavior of lumping
+     * them with most other tasks in the default scheduling group.
+     */
+    ss::scheduling_group fetch_scheduling_group() const;
+
     cluster::topics_frontend& topics_frontend() {
         return _topics_frontend.local();
     }
@@ -157,6 +169,7 @@

 private:
     ss::smp_service_group _smp_group;
+    ss::scheduling_group _fetch_scheduling_group;
     ss::sharded<cluster::topics_frontend>& _topics_frontend;
     ss::sharded<cluster::config_frontend>& _config_frontend;
     ss::sharded<features::feature_table>& _feature_table;
53 changes: 27 additions & 26 deletions src/v/redpanda/application.cc
@@ -429,15 +429,15 @@ void application::initialize(
       [this] { smp_service_groups.destroy_groups().get(); });

     if (groups) {
-        _scheduling_groups = *groups;
+        sched_groups = *groups;
         return;
     }

-    _scheduling_groups.create_groups().get();
-    _scheduling_groups_probe.wire_up(_scheduling_groups);
+    sched_groups.create_groups().get();
+    _scheduling_groups_probe.wire_up(sched_groups);
     _deferred.emplace_back([this] {
         _scheduling_groups_probe.clear();
-        _scheduling_groups.destroy_groups().get();
+        sched_groups.destroy_groups().get();
     });

     if (proxy_cfg) {
@@ -818,7 +818,7 @@ void application::configure_admin_server() {
     syschecks::systemd_message("constructing http server").get();
     construct_service(
       _admin,
-      admin_server_cfg_from_global_cfg(_scheduling_groups),
+      admin_server_cfg_from_global_cfg(sched_groups),
       std::ref(partition_manager),
       std::ref(cp_partition_manager),
       controller.get(),
@@ -1015,7 +1015,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
     raft_group_manager
       .start(
         node_id,
-        _scheduling_groups.raft_sg(),
+        sched_groups.raft_sg(),
         [] {
             return raft::group_manager::configuration{
               .heartbeat_interval
@@ -1088,7 +1088,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       _archival_upload_housekeeping,
       std::ref(cloud_storage_api),
       ss::sharded_parameter(
-        [sg = _scheduling_groups.archival_upload()] { return sg; }))
+        [sg = sched_groups.archival_upload()] { return sg; }))
       .get();
     _archival_upload_housekeeping
       .invoke_on_all(&archival::upload_housekeeping_service::start)
@@ -1112,7 +1112,7 @@
       std::ref(cloud_storage_api),
       std::ref(shadow_index_cache),
       ss::sharded_parameter(
-        [sg = _scheduling_groups.archival_upload(),
+        [sg = sched_groups.archival_upload(),
          p = archival_priority(),
          enabled = archival_storage_enabled()]()
           -> ss::lw_shared_ptr<archival::configuration> {
@@ -1155,7 +1155,7 @@
         node_id,
         std::ref(local_monitor),
         std::ref(_connection_cache),
-        _scheduling_groups.self_test_sg())
+        sched_groups.self_test_sg())
       .get();

     construct_single_service_sharded(
@@ -1249,7 +1249,7 @@
     construct_service(
       _archival_upload_controller,
       std::ref(partition_manager),
-      make_upload_controller_config(_scheduling_groups.archival_upload()))
+      make_upload_controller_config(sched_groups.archival_upload()))
       .get();

     construct_service(
@@ -1298,7 +1298,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
     syschecks::systemd_message("Creating kafka group router").get();
     construct_service(
       group_router,
-      _scheduling_groups.kafka_sg(),
+      sched_groups.kafka_sg(),
       smp_service_groups.kafka_smp_sg(),
       std::ref(_group_manager),
       std::ref(shard_table),
@@ -1514,6 +1514,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       .start(
         &kafka_cfg,
         smp_service_groups.kafka_smp_sg(),
+        sched_groups.fetch_sg(),
         std::ref(metadata_cache),
         std::ref(controller->get_topics_frontend()),
        std::ref(controller->get_config_frontend()),
@@ -1543,7 +1544,7 @@
           _compaction_controller,
           std::ref(storage),
           compaction_controller_config(
-            _scheduling_groups.compaction_sg(),
+            sched_groups.compaction_sg(),
             priority_manager::local().compaction_priority()))
           .get();
     }
@@ -1585,9 +1586,9 @@ void application::wire_up_bootstrap_services() {
       storage,
       []() { return kvstore_config_from_global_config(); },
       [this]() {
-          auto log_cfg = manager_config_from_global_config(_scheduling_groups);
+          auto log_cfg = manager_config_from_global_config(sched_groups);
           log_cfg.reclaim_opts.background_reclaimer_sg
-            = _scheduling_groups.cache_background_reclaim_sg();
+            = sched_groups.cache_background_reclaim_sg();
           return log_cfg;
       },
       std::ref(feature_table))
@@ -1798,7 +1799,7 @@ void application::start_bootstrap_services() {
     std::vector<std::unique_ptr<rpc::service>> bootstrap_service;
     bootstrap_service.push_back(
       std::make_unique<cluster::bootstrap_service>(
-        _scheduling_groups.cluster_sg(),
+        sched_groups.cluster_sg(),
         smp_service_groups.cluster_smp_sg(),
         std::ref(storage)));
     s.add_services(std::move(bootstrap_service));
@@ -1936,7 +1937,7 @@ void application::start_runtime_services(
     runtime_services.push_back(std::make_unique<raft::service<
       cluster::partition_manager,
      cluster::shard_table>>(
-      _scheduling_groups.raft_sg(),
+      sched_groups.raft_sg(),
       smp_service_groups.raft_smp_sg(),
       partition_manager,
       shard_table.local(),
@@ -1960,13 +1961,13 @@
       .invoke_on_all([this, start_raft_rpc_early](rpc::rpc_server& s) {
           std::vector<std::unique_ptr<rpc::service>> runtime_services;
          runtime_services.push_back(std::make_unique<cluster::id_allocator>(
-            _scheduling_groups.raft_sg(),
+            sched_groups.raft_sg(),
             smp_service_groups.raft_smp_sg(),
             std::ref(id_allocator_frontend)));
           // _rm_group_proxy is wrap around a sharded service with only
           // `.local()' access so it's ok to share without foreign_ptr
           runtime_services.push_back(std::make_unique<cluster::tx_gateway>(
-            _scheduling_groups.raft_sg(),
+            sched_groups.raft_sg(),
             smp_service_groups.raft_smp_sg(),
             std::ref(tx_gateway_frontend),
             _rm_group_proxy.get(),
@@ -1976,15 +1977,15 @@
           runtime_services.push_back(std::make_unique<raft::service<
             cluster::partition_manager,
             cluster::shard_table>>(
-            _scheduling_groups.raft_sg(),
+            sched_groups.raft_sg(),
             smp_service_groups.raft_smp_sg(),
             partition_manager,
             shard_table.local(),
             config::shard_local_cfg().raft_heartbeat_interval_ms()));
       }

       runtime_services.push_back(std::make_unique<cluster::service>(
-        _scheduling_groups.cluster_sg(),
+        sched_groups.cluster_sg(),
         smp_service_groups.cluster_smp_sg(),
         std::ref(controller->get_topics_frontend()),
         std::ref(controller->get_members_manager()),
@@ -2002,37 +2003,37 @@

       runtime_services.push_back(
         std::make_unique<cluster::metadata_dissemination_handler>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(controller->get_partition_leaders())));

       runtime_services.push_back(
         std::make_unique<cluster::node_status_rpc_handler>(
-          _scheduling_groups.node_status(),
+          sched_groups.node_status(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(node_status_backend)));

       runtime_services.push_back(
         std::make_unique<cluster::self_test_rpc_handler>(
-          _scheduling_groups.node_status(),
+          sched_groups.node_status(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(self_test_backend)));

       runtime_services.push_back(
         std::make_unique<cluster::partition_balancer_rpc_handler>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(controller->get_partition_balancer())));

       runtime_services.push_back(
         std::make_unique<cluster::ephemeral_credential_service>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(controller->get_ephemeral_credential_frontend())));

       runtime_services.push_back(
         std::make_unique<cluster::topic_recovery_status_rpc_handler>(
-          _scheduling_groups.cluster_sg(),
+          sched_groups.cluster_sg(),
           smp_service_groups.cluster_smp_sg(),
           std::ref(topic_recovery_service)));
       s.add_services(std::move(runtime_services));
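
Renaming the private _scheduling_groups member to the public sched_groups is what lets the test fixture below reach fetch_sg(). The scheduling_groups class itself is not shown in the rendered diff (fetch_sg() is presumably added in the file that failed to render); the following is a rough sketch of its shape under that assumption, with illustrative share values:

// Rough sketch (assumption): a holder like Redpanda's scheduling_groups,
// owning named Seastar groups and exposing accessors such as the new
// fetch_sg(). The real class differs in members and share values.
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/scheduling.hh>

namespace ss = seastar;

class scheduling_groups_sketch {
public:
    ss::future<> create_groups() {
        // Shares set relative CPU weight between groups.
        _kafka = co_await ss::create_scheduling_group("kafka", 1000);
        _fetch = co_await ss::create_scheduling_group("fetch", 1000);
        // ... raft, cluster, compaction, etc. are created the same way.
    }
    ss::future<> destroy_groups() {
        co_await ss::destroy_scheduling_group(_kafka);
        co_await ss::destroy_scheduling_group(_fetch);
    }
    ss::scheduling_group kafka_sg() const { return _kafka; }
    ss::scheduling_group fetch_sg() const { return _fetch; }

private:
    ss::scheduling_group _kafka;
    ss::scheduling_group _fetch;
};
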
2 changes: 1 addition & 1 deletion src/v/redpanda/application.h
@@ -92,6 +92,7 @@ class application {
     ss::future<> set_proxy_client_config(ss::sstring name, std::any val);

     smp_groups smp_service_groups;
+    scheduling_groups sched_groups;

     // Sorted list of services (public members)
     ss::sharded<cloud_storage::cache> shadow_index_cache;
@@ -231,7 +232,6 @@
     std::optional<pandaproxy::schema_registry::configuration>
       _schema_reg_config;
     std::optional<kafka::client::configuration> _schema_reg_client_config;
-    scheduling_groups _scheduling_groups;
     scheduling_groups_probe _scheduling_groups_probe;
     ss::logger _log;
1 change: 1 addition & 0 deletions src/v/redpanda/tests/fixture.h
@@ -119,6 +119,7 @@ class redpanda_thread_fixture {
         proto = std::make_unique<kafka::server>(
           &configs,
           app.smp_service_groups.kafka_smp_sg(),
+          app.sched_groups.fetch_sg(),
           app.metadata_cache,
           app.controller->get_topics_frontend(),
           app.controller->get_config_frontend(),
(The diff for the ninth changed file did not load on this page.)
