Skip to content

Commit

Permalink
k/server: wire replica_selector in kafka server
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <[email protected]>
  • Loading branch information
mmaslankaprv committed May 2, 2023
1 parent 750ac31 commit 4a9a8ce
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/v/kafka/server/request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "kafka/protocol/wire.h"
#include "kafka/server/connection_context.h"
#include "kafka/server/fetch_session_cache.h"
#include "kafka/server/handlers/fetch/replica_selector.h"
#include "kafka/server/logger.h"
#include "kafka/server/response.h"
#include "kafka/server/server.h"
Expand Down Expand Up @@ -231,6 +232,10 @@ class request_context {
return _conn->server().controller_api();
}

const replica_selector& replica_selector() const {
return _conn->server().get_replica_selector();
}

private:
template<typename ResponseType>
void update_usage_stats(const ResponseType& r, size_t response_size) {
Expand Down
5 changes: 4 additions & 1 deletion src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "kafka/server/handlers/describe_groups.h"
#include "kafka/server/handlers/details/security.h"
#include "kafka/server/handlers/end_txn.h"
#include "kafka/server/handlers/fetch/replica_selector.h"
#include "kafka/server/handlers/handler_interface.h"
#include "kafka/server/handlers/heartbeat.h"
#include "kafka/server/handlers/init_producer_id.h"
Expand Down Expand Up @@ -129,7 +130,9 @@ server::server(
, _gssapi_principal_mapper(
config::shard_local_cfg().sasl_kerberos_principal_mapping.bind())
, _krb_configurator(config::shard_local_cfg().sasl_kerberos_config.bind())
, _thread_worker(tw) {
, _thread_worker(tw)
, _replica_selector(
std::make_unique<rack_aware_replica_selector>(_metadata_cache.local())) {
if (qdc_config) {
_qdc_mon.emplace(*qdc_config);
}
Expand Down
7 changes: 6 additions & 1 deletion src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "kafka/server/fetch_metadata_cache.hh"
#include "kafka/server/fetch_session_cache.h"
#include "kafka/server/fwd.h"
#include "kafka/server/handlers/fetch/replica_selector.h"
#include "kafka/server/queue_depth_monitor.h"
#include "net/server.h"
#include "security/fwd.h"
Expand Down Expand Up @@ -160,6 +161,10 @@ class server final : public net::server {
static std::vector<bool>
convert_api_names_to_key_bitmap(const std::vector<ss::sstring>& api_names);

const replica_selector& get_replica_selector() const {
return *_replica_selector;
}

private:
ss::smp_service_group _smp_group;
ss::scheduling_group _fetch_scheduling_group;
Expand Down Expand Up @@ -187,9 +192,9 @@ class server final : public net::server {
security::tls::principal_mapper _mtls_principal_mapper;
security::gssapi_principal_mapper _gssapi_principal_mapper;
security::krb5::configurator _krb_configurator;

class latency_probe _probe;
ssx::thread_worker& _thread_worker;
std::unique_ptr<replica_selector> _replica_selector;
};

} // namespace kafka

0 comments on commit 4a9a8ce

Please sign in to comment.