Skip to content

Commit

Permalink
Update request handling to use polymorphic handler
Browse files Browse the repository at this point in the history
Preceding changes in this series introduced a runtime polymorphic
handler class, and this change switches most of the request handling
to use it.

In particular, we replace the large switch on API key which dispatches
to a template method to a lookup of the handler method and virtual
dispatch.

Some handlers that need special processing like the authentication/SASL
related ones still use the old approach for now.
  • Loading branch information
travisdowns committed Jul 5, 2022
1 parent 72ee787 commit 20cb02f
Showing 1 changed file with 50 additions and 119 deletions.
169 changes: 50 additions & 119 deletions src/v/kafka/server/requests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "kafka/server/handlers/handlers.h"
#include "kafka/server/handlers/produce.h"
#include "kafka/protocol/schemata/api_versions_request.h"
#include "kafka/protocol/schemata/fetch_request.h"
#include "kafka/protocol/schemata/produce_request.h"
#include "kafka/server/handlers/api_versions.h"
#include "kafka/server/handlers/details/any_handler.h"
#include "kafka/server/handlers/sasl_authenticate.h"
#include "kafka/server/handlers/sasl_handshake.h"
#include "kafka/server/request_context.h"
#include "kafka/types.h"
#include "utils/to_string.h"
Expand Down Expand Up @@ -42,54 +47,6 @@ requires(KafkaApiHandler<Request> || KafkaApiTwoPhaseHandler<Request>)
}
};

/**
* Dispatch API versions request without version checks.
*
* The version bounds checks are not applied to this request because the client
* does not yet know what versions this server supports. The api versions
* request is used by a client to query this information.
*/
template<>
struct process_dispatch<api_versions_handler> {
static process_result_stages
process(request_context&& ctx, ss::smp_service_group g) {
return process_result_stages::single_stage(
api_versions_handler::handle(std::move(ctx), g));
}
};
/**
* Requests processed in two stages
*/
template<>
struct process_dispatch<produce_handler> {
static process_result_stages
process(request_context&& ctx, ss::smp_service_group g) {
return produce_handler::handle(std::move(ctx), g);
}
};

template<>
struct process_dispatch<offset_commit_handler> {
static process_result_stages
process(request_context&& ctx, ss::smp_service_group g) {
return offset_commit_handler::handle(std::move(ctx), g);
}
};
template<>
struct process_dispatch<join_group_handler> {
static process_result_stages
process(request_context&& ctx, ss::smp_service_group g) {
return join_group_handler::handle(std::move(ctx), g);
}
};
template<>
struct process_dispatch<sync_group_handler> {
static process_result_stages
process(request_context&& ctx, ss::smp_service_group g) {
return sync_group_handler::handle(std::move(ctx), g);
}
};

template<typename Request>
requires(KafkaApiHandler<Request> || KafkaApiTwoPhaseHandler<Request>)
process_result_stages
Expand All @@ -107,6 +64,35 @@ requires(KafkaApiHandler<Request> || KafkaApiTwoPhaseHandler<Request>)
return process_dispatch<Request>::process(std::move(ctx), g);
}

process_result_stages process_generic(
any_handler handler, request_context&& ctx, ss::smp_service_group g) {
vlog(
klog.trace,
"[{}:{}] processing name:{}, key:{}, version:{} for {}",
ctx.connection()->client_host(),
ctx.connection()->client_port(),
handler->name(),
ctx.header().key,
ctx.header().version,
ctx.header().client_id.value_or(std::string_view("unset-client-id")));

// We do a version check for most API requests, but for api_version
// requests we skip them. We do not apply them for api_versions,
// because the client does not yet know what
// versions this server supports. The api versions request is used by a
// client to query this information.
if (ctx.header().key != api_versions_api::key &&
(ctx.header().version < handler->min_supported() ||
ctx.header().version > handler->max_supported())) {
throw std::runtime_error(fmt::format(
"Unsupported version {} for {} API",
ctx.header().version,
handler->name()));
}

return handler->handle(std::move(ctx), g);
}

class kafka_authentication_exception : public std::runtime_error {
public:
explicit kafka_authentication_exception(const std::string& m)
Expand Down Expand Up @@ -147,7 +133,7 @@ handle_auth_handshake(request_context&& ctx, ss::smp_service_group g) {
static ss::future<response_ptr>
handle_auth_initial(request_context&& ctx, ss::smp_service_group g) {
switch (ctx.header().key) {
case api_versions_handler::api::key: {
case api_versions_api::key: {
auto r = api_versions_handler::handle_raw(ctx);
if (r.data.error_code == error_code::none) {
ctx.sasl().set_state(security::sasl_server::sasl_state::handshake);
Expand Down Expand Up @@ -233,8 +219,8 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) {
// only track latency for push and fetch requests
bool track_latency(api_key key) {
switch (key) {
case fetch_handler::api::key:
case produce_handler::api::key:
case fetch_api::key:
case produce_api::key:
return true;
default:
return false;
Expand All @@ -260,81 +246,26 @@ process_request(request_context&& ctx, ss::smp_service_group g) {
}));
}

switch (ctx.header().key) {
case api_versions_handler::api::key:
return do_process<api_versions_handler>(std::move(ctx), g);
case metadata_handler::api::key:
return do_process<metadata_handler>(std::move(ctx), g);
case list_groups_handler::api::key:
return do_process<list_groups_handler>(std::move(ctx), g);
case find_coordinator_handler::api::key:
return do_process<find_coordinator_handler>(std::move(ctx), g);
case offset_fetch_handler::api::key:
return do_process<offset_fetch_handler>(std::move(ctx), g);
case produce_handler::api::key:
return do_process<produce_handler>(std::move(ctx), g);
case list_offsets_handler::api::key:
return do_process<list_offsets_handler>(std::move(ctx), g);
case offset_commit_handler::api::key:
return do_process<offset_commit_handler>(std::move(ctx), g);
case fetch_handler::api::key:
return do_process<fetch_handler>(std::move(ctx), g);
case join_group_handler::api::key:
return do_process<join_group_handler>(std::move(ctx), g);
case heartbeat_handler::api::key:
return do_process<heartbeat_handler>(std::move(ctx), g);
case leave_group_handler::api::key:
return do_process<leave_group_handler>(std::move(ctx), g);
case sync_group_handler::api::key:
return do_process<sync_group_handler>(std::move(ctx), g);
case create_topics_handler::api::key:
return do_process<create_topics_handler>(std::move(ctx), g);
case describe_configs_handler::api::key:
return do_process<describe_configs_handler>(std::move(ctx), g);
case alter_configs_handler::api::key:
return do_process<alter_configs_handler>(std::move(ctx), g);
case delete_topics_handler::api::key:
return do_process<delete_topics_handler>(std::move(ctx), g);
case describe_groups_handler::api::key:
return do_process<describe_groups_handler>(std::move(ctx), g);
case sasl_handshake_handler::api::key:
auto& key = ctx.header().key;

if (key == sasl_handshake_handler::api::key) {
return process_result_stages::single_stage(ctx.respond(
sasl_handshake_response(error_code::illegal_sasl_state, {})));
case sasl_authenticate_handler::api::key: {
}

if (key == sasl_authenticate_handler::api::key) {
sasl_authenticate_response_data data{
.error_code = error_code::illegal_sasl_state,
.error_message = "Authentication process already completed",
};
return process_result_stages::single_stage(
ctx.respond(sasl_authenticate_response(std::move(data))));
}
case init_producer_id_handler::api::key:
return do_process<init_producer_id_handler>(std::move(ctx), g);
case incremental_alter_configs_handler::api::key:
return do_process<incremental_alter_configs_handler>(std::move(ctx), g);
case delete_groups_handler::api::key:
return do_process<delete_groups_handler>(std::move(ctx), g);
case describe_acls_handler::api::key:
return do_process<describe_acls_handler>(std::move(ctx), g);
case describe_log_dirs_handler::api::key:
return do_process<describe_log_dirs_handler>(std::move(ctx), g);
case create_acls_handler::api::key:
return do_process<create_acls_handler>(std::move(ctx), g);
case delete_acls_handler::api::key:
return do_process<delete_acls_handler>(std::move(ctx), g);
case add_partitions_to_txn_handler::api::key:
return do_process<add_partitions_to_txn_handler>(std::move(ctx), g);
case txn_offset_commit_handler::api::key:
return do_process<txn_offset_commit_handler>(std::move(ctx), g);
case add_offsets_to_txn_handler::api::key:
return do_process<add_offsets_to_txn_handler>(std::move(ctx), g);
case end_txn_handler::api::key:
return do_process<end_txn_handler>(std::move(ctx), g);
case create_partitions_handler::api::key:
return do_process<create_partitions_handler>(std::move(ctx), g);
case offset_for_leader_epoch_handler::api::key:
return do_process<offset_for_leader_epoch_handler>(std::move(ctx), g);
};

if (auto handler = handler_for_key(key)) {
return process_generic(*handler, std::move(ctx), g);
}

throw std::runtime_error(
fmt::format("Unsupported API {}", ctx.header().key));
}
Expand Down

0 comments on commit 20cb02f

Please sign in to comment.