diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index 7752464c18adc..b20466c03c55f 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -7,8 +7,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 -#include "kafka/server/handlers/handlers.h" -#include "kafka/server/handlers/produce.h" +#include "kafka/protocol/schemata/api_versions_request.h" +#include "kafka/protocol/schemata/fetch_request.h" +#include "kafka/protocol/schemata/produce_request.h" +#include "kafka/server/handlers/api_versions.h" +#include "kafka/server/handlers/details/any_handler.h" +#include "kafka/server/handlers/sasl_authenticate.h" +#include "kafka/server/handlers/sasl_handshake.h" #include "kafka/server/request_context.h" #include "kafka/types.h" #include "utils/to_string.h" @@ -42,54 +47,6 @@ requires(KafkaApiHandler || KafkaApiTwoPhaseHandler) } }; -/** - * Dispatch API versions request without version checks. - * - * The version bounds checks are not applied to this request because the client - * does not yet know what versions this server supports. The api versions - * request is used by a client to query this information. - */ -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return process_result_stages::single_stage( - api_versions_handler::handle(std::move(ctx), g)); - } -}; -/** - * Requests processed in two stages - */ -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return produce_handler::handle(std::move(ctx), g); - } -}; - -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return offset_commit_handler::handle(std::move(ctx), g); - } -}; -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return join_group_handler::handle(std::move(ctx), g); - } -}; -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return sync_group_handler::handle(std::move(ctx), g); - } -}; - template requires(KafkaApiHandler || KafkaApiTwoPhaseHandler) process_result_stages @@ -107,6 +64,35 @@ requires(KafkaApiHandler || KafkaApiTwoPhaseHandler) return process_dispatch::process(std::move(ctx), g); } +process_result_stages process_generic( + any_handler handler, request_context&& ctx, ss::smp_service_group g) { + vlog( + klog.trace, + "[{}:{}] processing name:{}, key:{}, version:{} for {}", + ctx.connection()->client_host(), + ctx.connection()->client_port(), + handler->name(), + ctx.header().key, + ctx.header().version, + ctx.header().client_id.value_or(std::string_view("unset-client-id"))); + + // We do a version check for most API requests, but for api_version + // requests we skip them. We do not apply them for api_versions, + // because the client does not yet know what + // versions this server supports. The api versions request is used by a + // client to query this information. + if (ctx.header().key != api_versions_api::key && + (ctx.header().version < handler->min_supported() || + ctx.header().version > handler->max_supported())) { + throw std::runtime_error(fmt::format( + "Unsupported version {} for {} API", + ctx.header().version, + handler->name())); + } + + return handler->handle(std::move(ctx), g); +} + class kafka_authentication_exception : public std::runtime_error { public: explicit kafka_authentication_exception(const std::string& m) @@ -147,7 +133,7 @@ handle_auth_handshake(request_context&& ctx, ss::smp_service_group g) { static ss::future handle_auth_initial(request_context&& ctx, ss::smp_service_group g) { switch (ctx.header().key) { - case api_versions_handler::api::key: { + case api_versions_api::key: { auto r = api_versions_handler::handle_raw(ctx); if (r.data.error_code == error_code::none) { ctx.sasl().set_state(security::sasl_server::sasl_state::handshake); @@ -233,8 +219,8 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) { // only track latency for push and fetch requests bool track_latency(api_key key) { switch (key) { - case fetch_handler::api::key: - case produce_handler::api::key: + case fetch_api::key: + case produce_api::key: return true; default: return false; @@ -260,47 +246,14 @@ process_request(request_context&& ctx, ss::smp_service_group g) { })); } - switch (ctx.header().key) { - case api_versions_handler::api::key: - return do_process(std::move(ctx), g); - case metadata_handler::api::key: - return do_process(std::move(ctx), g); - case list_groups_handler::api::key: - return do_process(std::move(ctx), g); - case find_coordinator_handler::api::key: - return do_process(std::move(ctx), g); - case offset_fetch_handler::api::key: - return do_process(std::move(ctx), g); - case produce_handler::api::key: - return do_process(std::move(ctx), g); - case list_offsets_handler::api::key: - return do_process(std::move(ctx), g); - case offset_commit_handler::api::key: - return do_process(std::move(ctx), g); - case fetch_handler::api::key: - return do_process(std::move(ctx), g); - case join_group_handler::api::key: - return do_process(std::move(ctx), g); - case heartbeat_handler::api::key: - return do_process(std::move(ctx), g); - case leave_group_handler::api::key: - return do_process(std::move(ctx), g); - case sync_group_handler::api::key: - return do_process(std::move(ctx), g); - case create_topics_handler::api::key: - return do_process(std::move(ctx), g); - case describe_configs_handler::api::key: - return do_process(std::move(ctx), g); - case alter_configs_handler::api::key: - return do_process(std::move(ctx), g); - case delete_topics_handler::api::key: - return do_process(std::move(ctx), g); - case describe_groups_handler::api::key: - return do_process(std::move(ctx), g); - case sasl_handshake_handler::api::key: + auto& key = ctx.header().key; + + if (key == sasl_handshake_handler::api::key) { return process_result_stages::single_stage(ctx.respond( sasl_handshake_response(error_code::illegal_sasl_state, {}))); - case sasl_authenticate_handler::api::key: { + } + + if (key == sasl_authenticate_handler::api::key) { sasl_authenticate_response_data data{ .error_code = error_code::illegal_sasl_state, .error_message = "Authentication process already completed", @@ -308,33 +261,11 @@ process_request(request_context&& ctx, ss::smp_service_group g) { return process_result_stages::single_stage( ctx.respond(sasl_authenticate_response(std::move(data)))); } - case init_producer_id_handler::api::key: - return do_process(std::move(ctx), g); - case incremental_alter_configs_handler::api::key: - return do_process(std::move(ctx), g); - case delete_groups_handler::api::key: - return do_process(std::move(ctx), g); - case describe_acls_handler::api::key: - return do_process(std::move(ctx), g); - case describe_log_dirs_handler::api::key: - return do_process(std::move(ctx), g); - case create_acls_handler::api::key: - return do_process(std::move(ctx), g); - case delete_acls_handler::api::key: - return do_process(std::move(ctx), g); - case add_partitions_to_txn_handler::api::key: - return do_process(std::move(ctx), g); - case txn_offset_commit_handler::api::key: - return do_process(std::move(ctx), g); - case add_offsets_to_txn_handler::api::key: - return do_process(std::move(ctx), g); - case end_txn_handler::api::key: - return do_process(std::move(ctx), g); - case create_partitions_handler::api::key: - return do_process(std::move(ctx), g); - case offset_for_leader_epoch_handler::api::key: - return do_process(std::move(ctx), g); - }; + + if (auto handler = handler_for_key(key)) { + return process_generic(*handler, std::move(ctx), g); + } + throw std::runtime_error( fmt::format("Unsupported API {}", ctx.header().key)); }