Skip to content

Commit

Permalink
k/server: kafka::server made a peering_sharded_service
Browse files Browse the repository at this point in the history
Functions down the fetch code path will need access to the local
kafka::server instance members like memory semaphores.
  • Loading branch information
dlex committed Jun 1, 2023
1 parent eb8a915 commit d891efe
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class connection_context final
connection_context& operator=(const connection_context&) = delete;
connection_context& operator=(connection_context&&) = delete;

/// The instance of \ref kafka::server on the shard serving the connection
server& server() { return _server; }
const ss::sstring& listener() const { return conn->name(); }
std::optional<security::sasl_server>& sasl() { return _sasl; }
Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@

namespace kafka {

class server final : public net::server {
class server final
: public net::server
, public ss::peering_sharded_service<server> {
public:
server(
ss::sharded<net::server_configuration>*,
Expand Down
56 changes: 30 additions & 26 deletions src/v/redpanda/tests/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "pandaproxy/schema_registry/configuration.h"
#include "redpanda/application.h"
#include "resource_mgmt/cpu_scheduling.h"
#include "ssx/thread_worker.h"
#include "storage/directories.h"
#include "storage/tests/utils/disk_log_builder.h"
#include "test_utils/async.h"
Expand Down Expand Up @@ -125,30 +126,32 @@ class redpanda_thread_fixture {
configs.start(ss::sstring("fixture_config")).get();

// used by request context builder
proto = std::make_unique<kafka::server>(
&configs,
app.smp_service_groups.kafka_smp_sg(),
app.sched_groups.fetch_sg(),
app.metadata_cache,
app.controller->get_topics_frontend(),
app.controller->get_config_frontend(),
app.controller->get_feature_table(),
app.quota_mgr,
app.snc_quota_mgr,
app.group_router,
app.usage_manager,
app.shard_table,
app.partition_manager,
app.id_allocator_frontend,
app.controller->get_credential_store(),
app.controller->get_authorizer(),
app.controller->get_security_frontend(),
app.controller->get_api(),
app.tx_gateway_frontend,
app.tx_registry_frontend,
std::nullopt,
*app.thread_worker,
app.schema_registry());
proto
.start(
&configs,
app.smp_service_groups.kafka_smp_sg(),
app.sched_groups.fetch_sg(),
std::ref(app.metadata_cache),
std::ref(app.controller->get_topics_frontend()),
std::ref(app.controller->get_config_frontend()),
std::ref(app.controller->get_feature_table()),
std::ref(app.quota_mgr),
std::ref(app.snc_quota_mgr),
std::ref(app.group_router),
std::ref(app.usage_manager),
std::ref(app.shard_table),
std::ref(app.partition_manager),
std::ref(app.id_allocator_frontend),
std::ref(app.controller->get_credential_store()),
std::ref(app.controller->get_authorizer()),
std::ref(app.controller->get_security_frontend()),
std::ref(app.controller->get_api()),
std::ref(app.tx_gateway_frontend),
std::ref(app.tx_registry_frontend),
std::nullopt,
std::ref(*app.thread_worker),
std::ref(app.schema_registry()))
.get();

configs.stop().get();
}
Expand Down Expand Up @@ -226,6 +229,7 @@ class redpanda_thread_fixture {

~redpanda_thread_fixture() {
shutdown();
proto.stop().get();
if (remove_on_shutdown) {
std::filesystem::remove_all(data_dir);
}
Expand Down Expand Up @@ -645,7 +649,7 @@ class redpanda_thread_fixture {
conn_ptr make_connection_context() {
security::sasl_server sasl(security::sasl_server::sasl_state::complete);
return ss::make_lw_shared<kafka::connection_context>(
*proto,
proto.local(),
nullptr,
std::move(sasl),
false,
Expand Down Expand Up @@ -689,7 +693,7 @@ class redpanda_thread_fixture {
uint16_t schema_reg_port;
std::filesystem::path data_dir;
ss::sharded<net::server_configuration> configs;
std::unique_ptr<kafka::server> proto;
ss::sharded<kafka::server> proto;
bool remove_on_shutdown;
std::unique_ptr<::stop_signal> app_signal;
};

0 comments on commit d891efe

Please sign in to comment.