diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index 6e8da95e85af..067fc72b9cd0 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -34,6 +34,7 @@ set(handlers_srcs server/handlers/delete_acls.cc server/handlers/create_partitions.cc server/handlers/offset_for_leader_epoch.cc + server/handlers/handler_interface.cc server/handlers/topics/types.cc server/handlers/topics/topic_utils.cc ) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 4847f601013b..0b263d09c8d6 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -13,16 +13,19 @@ #include "bytes/iobuf.h" #include "config/configuration.h" #include "kafka/protocol/sasl_authenticate.h" +#include "kafka/server/handlers/handler_interface.h" #include "kafka/server/protocol.h" #include "kafka/server/protocol_utils.h" #include "kafka/server/quota_manager.h" #include "kafka/server/request_context.h" +#include "kafka/server/response.h" #include "security/exceptions.h" #include "units.h" #include "vlog.h" #include #include +#include #include #include @@ -182,8 +185,9 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { }, std::move(request_buf), 0s); + auto sres = session_resources{}; auto resp = co_await kafka::process_request( - std::move(ctx), _proto.smp_group()) + std::move(ctx), _proto.smp_group(), sres) .response; auto data = std::move(*resp).release(); response.decode(std::move(data), version); @@ -213,8 +217,7 @@ bool connection_context::is_finished_parsing() const { return _rs.conn->input().eof() || _rs.abort_requested(); } -ss::future -connection_context::throttle_request( +ss::future connection_context::throttle_request( const request_header& hdr, size_t request_size) { // update the throughput tracker for this client using the // size of the current request and return any computed delay @@ -236,8 +239,9 @@ connection_context::throttle_request( } auto track = track_latency(hdr.key); return fut - .then( - [this, request_size] { return reserve_request_units(request_size); }) + .then([this, key = hdr.key, request_size] { + return reserve_request_units(key, request_size); + }) .then([this, delay, track, tracker = std::move(tracker)]( ss::semaphore_units<> units) mutable { return server().get_request_unit().then( @@ -262,15 +266,21 @@ connection_context::throttle_request( } ss::future> -connection_context::reserve_request_units(size_t size) { - // Allow for extra copies and bookkeeping - auto mem_estimate = size * 2 + 8000; // NOLINT - if (mem_estimate >= (size_t)std::numeric_limits::max()) { +connection_context::reserve_request_units(api_key key, size_t size) { + // Defer to the handler for the request type for the memory estimate, but + // if the request isn't found, use the default estimate (although in that + // case the request is likely for an API we don't support or malformed, so + // it is likely to fail shortly anyway). + auto handler = handler_for_key(key); + auto mem_estimate = handler ? (*handler)->memory_estimate(size, *this) + : default_memory_estimate(size); + if (unlikely(mem_estimate >= (size_t)std::numeric_limits::max())) { // TODO: Create error response using the specific API? throw std::runtime_error(fmt::format( - "request too large > 1GB (size: {}; estimate: {})", + "request too large > 1GB (size: {}, estimate: {}, API: {})", size, - mem_estimate)); + mem_estimate, + handler ? (*handler)->name() : "")); } auto fut = ss::get_units(_rs.memory(), mem_estimate); if (_rs.memory().waiters()) { @@ -282,11 +292,15 @@ connection_context::reserve_request_units(size_t size) { ss::future<> connection_context::dispatch_method_once(request_header hdr, size_t size) { return throttle_request(hdr, size).then([this, hdr = std::move(hdr), size]( - session_resources sres) mutable { + session_resources + sres_in) mutable { if (_rs.abort_requested()) { // protect against shutdown behavior return ss::make_ready_future<>(); } + + auto sres = ss::make_lw_shared(std::move(sres_in)); + auto remaining = size - request_header_size - hdr.client_id_buffer.size() - hdr.tags_size_bytes; return read_iobuf_exactly(_rs.conn->input(), remaining) @@ -298,7 +312,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { } auto self = shared_from_this(); auto rctx = request_context( - self, std::move(hdr), std::move(buf), sres.backpressure_delay); + self, std::move(hdr), std::move(buf), sres->backpressure_delay); /* * we process requests in order since all subsequent requests * are dependent on authentication having completed. @@ -323,7 +337,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { const sequence_id seq = _seq_idx; _seq_idx = _seq_idx + sequence_id(1); auto res = kafka::process_request( - std::move(rctx), _proto.smp_group()); + std::move(rctx), _proto.smp_group(), *sres); /** * first stage processed in a foreground. */ @@ -333,7 +347,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { seq, correlation, self, - s = std::move(sres)](ss::future<> d) mutable { + sres = std::move(sres)](ss::future<> d) mutable { /* * if the dispatch/first stage failed, then we need to * need to consume the second stage since it might be @@ -362,13 +376,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { ssx::background = ssx::spawn_with_gate_then( _rs.conn_gate(), - [this, f = std::move(f), seq, correlation]() mutable { - return f.then([this, seq, correlation]( - response_ptr r) mutable { - r->set_correlation(correlation); - _responses.insert({seq, std::move(r)}); - return process_next_response(); - }); + [this, + f = std::move(f), + sres = std::move(sres), + seq, + correlation]() mutable { + return f.then( + [this, + sres = std::move(sres), + seq, + correlation](response_ptr r) mutable { + r->set_correlation(correlation); + response_and_resources randr{ + std::move(r), std::move(sres)}; + _responses.insert({seq, std::move(randr)}); + return maybe_process_responses(); + }); }) .handle_exception([self](std::exception_ptr e) { // ssx::spawn_with_gate already caught @@ -397,8 +420,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { self->_rs.probe().service_error(); self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(s), self] {}); + }); return d; }) .handle_exception([self](std::exception_ptr e) { @@ -410,7 +432,20 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { }); } -ss::future<> connection_context::process_next_response() { +/** + * This method processes as many responses as possible, in request order. Since + * we proces the second stage asynchronously within a given connection, reponses + * may become ready out of order, but Kafka clients expect responses exactly in + * request order. + * + * The _responses queue handles that: responses are enqueued there in completion + * order, but only sent to the client in response order. So this method, called + * after every response is ready, may end up sending zero, one or more requests, + * depending on the completion order. + * + * @return ss::future<> + */ +ss::future<> connection_context::maybe_process_responses() { return ss::repeat([this]() mutable { auto it = _responses.find(_next_response); if (it == _responses.end()) { @@ -420,20 +455,25 @@ ss::future<> connection_context::process_next_response() { // found one; increment counter _next_response = _next_response + sequence_id(1); - auto r = std::move(it->second); + auto resp_and_res = std::move(it->second); + _responses.erase(it); - if (r->is_noop()) { + if (resp_and_res.response->is_noop()) { return ss::make_ready_future( ss::stop_iteration::no); } - auto msg = response_as_scattered(std::move(r)); + auto msg = response_as_scattered(std::move(resp_and_res.response)); try { - return _rs.conn->write(std::move(msg)).then([] { - return ss::make_ready_future( - ss::stop_iteration::no); - }); + return _rs.conn->write(std::move(msg)) + .then([] { + return ss::make_ready_future( + ss::stop_iteration::no); + }) + // release the resources only once it has been written to the + // connection. + .finally([resources = std::move(resp_and_res.resources)] {}); } catch (...) { vlog( klog.debug, diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 4276f8f9da65..10eb4d697756 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -11,6 +11,7 @@ #pragma once #include "kafka/server/protocol.h" #include "kafka/server/response.h" +#include "kafka/types.h" #include "net/server.h" #include "seastarx.h" #include "security/acl.h" @@ -37,6 +38,41 @@ using authz_quiet = ss::bool_class; struct request_header; class request_context; +// used to track number of pending requests +class request_tracker { +public: + explicit request_tracker(net::server_probe& probe) noexcept + : _probe(probe) { + _probe.request_received(); + } + request_tracker(const request_tracker&) = delete; + request_tracker(request_tracker&&) = delete; + request_tracker& operator=(const request_tracker&) = delete; + request_tracker& operator=(request_tracker&&) = delete; + + ~request_tracker() noexcept { _probe.request_completed(); } + +private: + net::server_probe& _probe; +}; + +// Used to hold resources associated with a given request until +// the response has been send, as well as to track some statistics +// about the request. +// +// The resources in particular should be not be destroyed until +// the request is complete (e.g., all the information written to +// the socket so that no userspace buffers remain). +struct session_resources { + using pointer = ss::lw_shared_ptr; + + ss::lowres_clock::duration backpressure_delay; + ss::semaphore_units<> memlocks; + ss::semaphore_units<> queue_units; + std::unique_ptr method_latency; + std::unique_ptr tracker; +}; + class connection_context final : public ss::enable_lw_shared_from_this { public: @@ -131,49 +167,52 @@ class connection_context final } private: - // used to track number of pending requests - class request_tracker { - public: - explicit request_tracker(net::server_probe& probe) noexcept - : _probe(probe) { - _probe.request_received(); - } - request_tracker(const request_tracker&) = delete; - request_tracker(request_tracker&&) = delete; - request_tracker& operator=(const request_tracker&) = delete; - request_tracker& operator=(request_tracker&&) = delete; - - ~request_tracker() noexcept { _probe.request_completed(); } - - private: - net::server_probe& _probe; - }; - // used to pass around some internal state - struct session_resources { - ss::lowres_clock::duration backpressure_delay; - ss::semaphore_units<> memlocks; - ss::semaphore_units<> queue_units; - std::unique_ptr method_latency; - std::unique_ptr tracker; - }; - - /// called by throttle_request - ss::future> reserve_request_units(size_t size); - - /// apply correct backpressure sequence + // Reserve units from memory from the memory semaphore in proportion + // to the number of bytes the request procesisng is expected to + // take. + ss::future> + reserve_request_units(api_key key, size_t size); + + // Apply backpressure sequence, where the request processing may be + // delayed for various reasons, including throttling but also because + // too few server resources are available to accomodate the request + // currently. + // When the returned future resolves, the throttling period is over and + // the associated resouces have been obtained and are tracked by the + // contained session_resources object. ss::future throttle_request(const request_header&, size_t sz); ss::future<> handle_mtls_auth(); ss::future<> dispatch_method_once(request_header, size_t sz); - ss::future<> process_next_response(); + + /** + * Process zero or more ready responses in request order. + * + * The future<> returned by this method resolves when all ready *and* + * in-order responses have been processed, which is not the same as all + * ready responses. In particular, responses which are ready may not be + * processed if there are earlier (lower sequence number) responses which + * are not yet ready: they will be processed by a future invocation. + * + * @return ss::future<> a future which as described above. + */ + ss::future<> maybe_process_responses(); ss::future<> do_process(request_context); ss::future<> handle_auth_v0(size_t); private: + /** + * Bundles together a response and its associated resources. + */ + struct response_and_resources { + response_ptr response; + session_resources::pointer resources; + }; + using sequence_id = named_type; - using map_t = absl::flat_hash_map; + using map_t = absl::flat_hash_map; class ctx_log { public: diff --git a/src/v/kafka/server/flex_versions.cc b/src/v/kafka/server/flex_versions.cc index 1a3565b41a71..b0e6bd2cbea2 100644 --- a/src/v/kafka/server/flex_versions.cc +++ b/src/v/kafka/server/flex_versions.cc @@ -17,13 +17,6 @@ namespace kafka { /// requests will map to a value of api_key(-2) static constexpr api_version invalid_api = api_version(-2); -template -static constexpr size_t max_api_key(type_list) { - /// Black magic here is an overload of std::max() that takes an - /// std::initializer_list - return std::max({RequestTypes::api::key()...}); -} - template static constexpr auto get_flexible_request_min_versions_list(type_list r) { diff --git a/src/v/kafka/server/fwd.h b/src/v/kafka/server/fwd.h index a893be15ed16..e034d9f65084 100644 --- a/src/v/kafka/server/fwd.h +++ b/src/v/kafka/server/fwd.h @@ -13,13 +13,15 @@ namespace kafka { +// sorted +class connection_context; class coordinator_ntp_mapper; class fetch_session_cache; class group_manager; class group_router; +class quota_manager; +class request_context; class rm_group_frontend; class rm_group_proxy_impl; -class request_context; -class quota_manager; } // namespace kafka diff --git a/src/v/kafka/server/handlers/add_offsets_to_txn.h b/src/v/kafka/server/handlers/add_offsets_to_txn.h index e4b1669b970a..fbc9fc1324e2 100644 --- a/src/v/kafka/server/handlers/add_offsets_to_txn.h +++ b/src/v/kafka/server/handlers/add_offsets_to_txn.h @@ -14,6 +14,7 @@ namespace kafka { -using add_offsets_to_txn_handler = handler; +using add_offsets_to_txn_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/add_partitions_to_txn.h b/src/v/kafka/server/handlers/add_partitions_to_txn.h index 5b0f2523b4f3..aee85586deee 100644 --- a/src/v/kafka/server/handlers/add_partitions_to_txn.h +++ b/src/v/kafka/server/handlers/add_partitions_to_txn.h @@ -14,6 +14,7 @@ namespace kafka { -using add_partitions_to_txn_handler = handler; +using add_partitions_to_txn_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/alter_configs.h b/src/v/kafka/server/handlers/alter_configs.h index 7edcf2f987ad..d61eb8c3f472 100644 --- a/src/v/kafka/server/handlers/alter_configs.h +++ b/src/v/kafka/server/handlers/alter_configs.h @@ -14,6 +14,6 @@ namespace kafka { -using alter_configs_handler = handler; +using alter_configs_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/api_versions.h b/src/v/kafka/server/handlers/api_versions.h index c131036d3f2a..7ab8921561ea 100644 --- a/src/v/kafka/server/handlers/api_versions.h +++ b/src/v/kafka/server/handlers/api_versions.h @@ -14,7 +14,8 @@ namespace kafka { -struct api_versions_handler : public handler { +struct api_versions_handler + : public single_stage_handler { static constexpr api_version min_flexible = api_version(3); static ss::future diff --git a/src/v/kafka/server/handlers/create_acls.h b/src/v/kafka/server/handlers/create_acls.h index e9719121d6df..d9a6161b71a1 100644 --- a/src/v/kafka/server/handlers/create_acls.h +++ b/src/v/kafka/server/handlers/create_acls.h @@ -14,6 +14,6 @@ namespace kafka { -using create_acls_handler = handler; +using create_acls_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/create_partitions.h b/src/v/kafka/server/handlers/create_partitions.h index 4102398e8d8b..16b1dcc9de27 100644 --- a/src/v/kafka/server/handlers/create_partitions.h +++ b/src/v/kafka/server/handlers/create_partitions.h @@ -14,6 +14,7 @@ namespace kafka { -using create_partitions_handler = handler; +using create_partitions_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/create_topics.h b/src/v/kafka/server/handlers/create_topics.h index e3c395858456..2ab493358dd0 100644 --- a/src/v/kafka/server/handlers/create_topics.h +++ b/src/v/kafka/server/handlers/create_topics.h @@ -14,6 +14,6 @@ namespace kafka { -using create_topics_handler = handler; +using create_topics_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/delete_acls.h b/src/v/kafka/server/handlers/delete_acls.h index d19ab798cf46..8e45cc5679fa 100644 --- a/src/v/kafka/server/handlers/delete_acls.h +++ b/src/v/kafka/server/handlers/delete_acls.h @@ -14,6 +14,6 @@ namespace kafka { -using delete_acls_handler = handler; +using delete_acls_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/delete_groups.h b/src/v/kafka/server/handlers/delete_groups.h index 85d01eb8c29b..d9858140b83a 100644 --- a/src/v/kafka/server/handlers/delete_groups.h +++ b/src/v/kafka/server/handlers/delete_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using delete_groups_handler = handler; +using delete_groups_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/delete_topics.h b/src/v/kafka/server/handlers/delete_topics.h index 7dd8d66d2f15..e9b6606cbe00 100644 --- a/src/v/kafka/server/handlers/delete_topics.h +++ b/src/v/kafka/server/handlers/delete_topics.h @@ -14,6 +14,6 @@ namespace kafka { -using delete_topics_handler = handler; +using delete_topics_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_acls.h b/src/v/kafka/server/handlers/describe_acls.h index 3377ac8a2858..996c6fa230aa 100644 --- a/src/v/kafka/server/handlers/describe_acls.h +++ b/src/v/kafka/server/handlers/describe_acls.h @@ -14,6 +14,6 @@ namespace kafka { -using describe_acls_handler = handler; +using describe_acls_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_configs.h b/src/v/kafka/server/handlers/describe_configs.h index 27f7235dc762..97199e628b55 100644 --- a/src/v/kafka/server/handlers/describe_configs.h +++ b/src/v/kafka/server/handlers/describe_configs.h @@ -14,6 +14,7 @@ namespace kafka { -using describe_configs_handler = handler; +using describe_configs_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_groups.h b/src/v/kafka/server/handlers/describe_groups.h index 6f804548b893..b62004ae9fa6 100644 --- a/src/v/kafka/server/handlers/describe_groups.h +++ b/src/v/kafka/server/handlers/describe_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using describe_groups_handler = handler; +using describe_groups_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_log_dirs.h b/src/v/kafka/server/handlers/describe_log_dirs.h index 13d11c440ad7..1731e88621a9 100644 --- a/src/v/kafka/server/handlers/describe_log_dirs.h +++ b/src/v/kafka/server/handlers/describe_log_dirs.h @@ -14,6 +14,7 @@ namespace kafka { -using describe_log_dirs_handler = handler; +using describe_log_dirs_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/end_txn.h b/src/v/kafka/server/handlers/end_txn.h index 72362cb00fed..cd80b0d41c25 100644 --- a/src/v/kafka/server/handlers/end_txn.h +++ b/src/v/kafka/server/handlers/end_txn.h @@ -14,6 +14,6 @@ namespace kafka { -using end_txn_handler = handler; +using end_txn_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index d43b4e9b0b33..8f44d89f1451 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -17,7 +17,7 @@ namespace kafka { -using fetch_handler = handler; +using fetch_handler = single_stage_handler; /* * Fetch operation context diff --git a/src/v/kafka/server/handlers/find_coordinator.h b/src/v/kafka/server/handlers/find_coordinator.h index 8e3d83bfe4d6..1f5ff07fb97f 100644 --- a/src/v/kafka/server/handlers/find_coordinator.h +++ b/src/v/kafka/server/handlers/find_coordinator.h @@ -14,6 +14,7 @@ namespace kafka { -using find_coordinator_handler = handler; +using find_coordinator_handler + = single_stage_handler; } // namespace kafka diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index a4b87a754586..6e294af073c5 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -10,6 +10,7 @@ */ #pragma once #include "kafka/protocol/types.h" +#include "kafka/server/fwd.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" #include "kafka/types.h" @@ -18,18 +19,80 @@ namespace kafka { +using memory_estimate_fn = size_t(size_t, connection_context&); + +constexpr size_t +default_estimate_adaptor(size_t request_size, connection_context&) { + return default_memory_estimate(request_size); +} + +/** + * Handlers are generally specializations of this template, via one of the + * two aliases (handler or two_phase_hander) declared below, though it is + * not strictly necessary (only conforming to one of the two KafkaApi* + * concepts is needed). + * + * The benefit of this template is that it takes care of the most of the + * handler boilerplate. + */ template< typename RequestApi, api_version::type MinSupported, - api_version::type MaxSupported> -struct handler { + api_version::type MaxSupported, + typename HandleRetType, + memory_estimate_fn MemEstimator> +struct handler_template { using api = RequestApi; static constexpr api_version min_supported = api_version(MinSupported); static constexpr api_version max_supported = api_version(MaxSupported); - static ss::future - handle(request_context, ss::smp_service_group); + + static HandleRetType handle(request_context, ss::smp_service_group); + + /** + * See handler_interface::memory_estimate for a description of this + * function. + */ + static size_t + memory_estimate(size_t request_size, connection_context& conn_ctx) { + return MemEstimator(request_size, conn_ctx); + } }; +/** + * A single-stage handler implements the entire request handling in the initial + * stage which occurs before any subsequent request is processed. + */ +template< + typename RequestApi, + api_version::type MinSupported, + api_version::type MaxSupported, + memory_estimate_fn MemEstimator = default_estimate_adaptor> +using single_stage_handler = handler_template< + RequestApi, + MinSupported, + MaxSupported, + ss::future, + MemEstimator>; + +/** + * A two-stage handler has an initial stage which happens before any other + * request can start processing (as in a single-stage handler) but then also has + * a second stage which is processed in the background allowing other requests + * on the same connection to start their handler. Responses are still sent in + * order, but processing is out-of-order. + */ +template< + typename RequestApi, + api_version::type MinSupported, + api_version::type MaxSupported, + memory_estimate_fn MemEstimator = default_estimate_adaptor> +using two_phase_handler = handler_template< + RequestApi, + MinSupported, + MaxSupported, + process_result_stages, + MemEstimator>; + template concept KafkaApiHandler = KafkaApi && requires( T h, request_context&& ctx, ss::smp_service_group g) { @@ -45,4 +108,7 @@ concept KafkaApiTwoPhaseHandler = KafkaApi && requires( { T::handle(std::move(ctx), g) } -> std::same_as; }; +template +concept KafkaApiHandlerAny = KafkaApiHandler || KafkaApiTwoPhaseHandler; + } // namespace kafka diff --git a/src/v/kafka/server/handlers/handler_interface.cc b/src/v/kafka/server/handlers/handler_interface.cc new file mode 100644 index 000000000000..44593b0f96e4 --- /dev/null +++ b/src/v/kafka/server/handlers/handler_interface.cc @@ -0,0 +1,141 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "kafka/server/handlers/handler_interface.h" + +#include "kafka/server/handlers/handlers.h" +#include "kafka/server/handlers/produce.h" +#include "kafka/server/response.h" +#include "kafka/types.h" + +#include + +namespace kafka { + +/** + * @brief Packages together basic information common to every handler. + */ +struct handler_info { + handler_info( + api_key key, + const char* name, + api_version min_api, + api_version max_api, + memory_estimate_fn* mem_estimate) noexcept + : _key(key) + , _name(name) + , _min_api(min_api) + , _max_api(max_api) + , _mem_estimate(mem_estimate) {} + + api_key _key; + const char* _name; + api_version _min_api, _max_api; + memory_estimate_fn* _mem_estimate; +}; + +/** + * @brief Creates a type-erased handler implementation given info and a handle + * method. + * + * There are only two variants of this handler, for one and two pass + * implementations. + * This keeps the generated code duplication to a minimum, compared to + * templating this on the handler type. + * + * @tparam is_two_pass true if the handler is two-pass + */ +template +struct handler_base final : public handler_interface { + using single_pass_handler + = ss::future(request_context, ss::smp_service_group); + using two_pass_handler + = process_result_stages(request_context, ss::smp_service_group); + using fn_type + = std::conditional_t; + + handler_base(const handler_info& info, fn_type* handle_fn) noexcept + : _info(info) + , _handle_fn(handle_fn) {} + + api_version min_supported() const override { return _info._min_api; } + api_version max_supported() const override { return _info._max_api; } + + api_key key() const override { return _info._key; } + const char* name() const override { return _info._name; } + + size_t memory_estimate( + size_t request_size, connection_context& conn_ctx) const override { + return _info._mem_estimate(request_size, conn_ctx); + } + /** + * Only handle varies with one or two pass, since one pass handlers + * must pass through single_stage() to covert them to two-pass. + */ + process_result_stages + handle(request_context&& rc, ss::smp_service_group g) const override { + if constexpr (is_two_pass) { + return _handle_fn(std::move(rc), g); + } else { + return process_result_stages::single_stage( + _handle_fn(std::move(rc), g)); + } + } + +private: + handler_info _info; + fn_type* _handle_fn; +}; + +/** + * @brief Instance holder for the handler_base. + * + * Given a handler type H, exposes a static instance of the assoicated handler + * base object. + * + * @tparam H the handler type. + */ +template +struct handler_holder { + static const inline handler_base> instance{ + handler_info{ + H::api::key, + H::api::name, + H::min_supported, + H::max_supported, + H::memory_estimate}, + H::handle}; +}; + +template +constexpr auto make_lut(type_list) { + constexpr int max_index = std::max({Ts::api::key...}); + static_assert(max_index < sizeof...(Ts) * 10, "LUT is too sparse"); + + std::array lut{}; + ((lut[Ts::api::key] = &handler_holder::instance), ...); + + return lut; +} + +std::optional handler_for_key(kafka::api_key key) noexcept { + static constexpr auto lut = make_lut(request_types{}); + if (key >= (short)0 && key < (short)lut.size()) { + // We have already checked the bounds above so it is safe to use [] + // instead of at() + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) + if (auto handler = lut[key]) { + return handler; + } + } + return std::nullopt; +} + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/handler_interface.h b/src/v/kafka/server/handlers/handler_interface.h new file mode 100644 index 000000000000..dc4857e9f993 --- /dev/null +++ b/src/v/kafka/server/handlers/handler_interface.h @@ -0,0 +1,111 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once +#include "kafka/server/fwd.h" +#include "kafka/server/response.h" +#include "kafka/types.h" + +namespace kafka { +/** + * @brief Runtime polymorphic handler type. + * + * Allows access to all kafka request handling implementations though a + * type erased interface. This avoids the need to bring every handler + * type into scope and make everything that touches the handler a template + * function on the handler type. + * + */ +struct handler_interface { + /** + * @brief The minimum supported API version, inclusive. + */ + virtual api_version min_supported() const = 0; + + /** + * @brief The maximum supported API version, inclusive. + */ + virtual api_version max_supported() const = 0; + + /** + * @brief The name of the API method. + */ + virtual const char* name() const = 0; + + /** + * @brief The API key associated with the method. + */ + virtual api_key key() const = 0; + + /** + * @brief Estimates the memory used to process the request. + * + * Returns an esimate of the memory needed to process a request. This is + * used to block the request until sufficient memory is available using the + * "memory units" semaphore. Ideally this should be a conservative request + * (i.e., a possible overestimate in cases where the memory use may vary + * significantly) as the result of a too-small estimate may be an + * out-of-memory condition, while a too-large estimate will "merely" reduce + * performance. + * + * Unfortunately, this estimate happens early in the decoding process, after + * only the request size and header has been read, so handlers don't have + * as much information as they may like to make this decision. The + * connection_context for the associated connection is passed to give access + * to global state which may be useful in making the estimate. + */ + virtual size_t memory_estimate( + size_t request_size, connection_context& conn_ctx) const = 0; + + /** + * @brief Handles the request. + * + * Invokes the request handler with the given request context + * (which will be moved from) and smp_service_groups. + * + * The result stages objects contains futures for both the initial + * dispatch phase, and the find response. For API methods which + * are implemented a single phase, the same type is returned, but + * the response future will complete as soon as the dispatch one does. + * + * @return process_result_stages representing the future completion of + * the handler. + */ + virtual process_result_stages + handle(request_context&&, ss::smp_service_group) const = 0; + + virtual ~handler_interface() = default; +}; + +/** + * @brief Pointer to a handler. + * + * Most code will use handler objects, which are simply pointers + * to handlers, generally const objects with static storage duration + * obtained from handler_for_key. + */ +using handler = const handler_interface*; + +/** + * @brief Return a handler for the given key, if any. + * + * Returns a pointer to a constant singleton handler for the given + * key, or an empty optional if no such handler exists. The contained + * any_hanlder is guaranteed to be non-null if the optional as a value. + * + * This method looks up the handler in a table populated by all handlers + * in kafka::request_types. + * + * @param key the API key for the handler + * @return std::optional the handler, if any + */ +std::optional handler_for_key(api_key key) noexcept; + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/handlers.h b/src/v/kafka/server/handlers/handlers.h index 80cb2d22ebfb..d98c4d0885f0 100644 --- a/src/v/kafka/server/handlers/handlers.h +++ b/src/v/kafka/server/handlers/handlers.h @@ -87,4 +87,11 @@ using request_types = make_request_types< end_txn_handler, create_partitions_handler, offset_for_leader_epoch_handler>; + +template +static constexpr size_t max_api_key(type_list) { + /// Black magic here is an overload of std::max() that takes an + /// std::initializer_list + return std::max({RequestTypes::api::key()...}); +} } // namespace kafka diff --git a/src/v/kafka/server/handlers/heartbeat.h b/src/v/kafka/server/handlers/heartbeat.h index 27a4c22b1cbd..437279760a54 100644 --- a/src/v/kafka/server/handlers/heartbeat.h +++ b/src/v/kafka/server/handlers/heartbeat.h @@ -14,6 +14,6 @@ namespace kafka { -using heartbeat_handler = handler; +using heartbeat_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.h b/src/v/kafka/server/handlers/incremental_alter_configs.h index 8e902f5da6b3..9dbfde6be92e 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.h +++ b/src/v/kafka/server/handlers/incremental_alter_configs.h @@ -15,6 +15,6 @@ namespace kafka { using incremental_alter_configs_handler - = handler; + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/init_producer_id.h b/src/v/kafka/server/handlers/init_producer_id.h index 5068f4325684..be4c7d0a080a 100644 --- a/src/v/kafka/server/handlers/init_producer_id.h +++ b/src/v/kafka/server/handlers/init_producer_id.h @@ -14,6 +14,7 @@ namespace kafka { -using init_producer_id_handler = handler; +using init_producer_id_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/join_group.cc b/src/v/kafka/server/handlers/join_group.cc index f7bc53b80a4e..57fe29ca9362 100644 --- a/src/v/kafka/server/handlers/join_group.cc +++ b/src/v/kafka/server/handlers/join_group.cc @@ -35,6 +35,7 @@ static void decode_request(request_context& ctx, join_group_request& req) { fmt::format("{}", ctx.connection()->client_host())); } +template<> process_result_stages join_group_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group g) { join_group_request request; diff --git a/src/v/kafka/server/handlers/join_group.h b/src/v/kafka/server/handlers/join_group.h index 1d3ec508350a..1830badc2f27 100644 --- a/src/v/kafka/server/handlers/join_group.h +++ b/src/v/kafka/server/handlers/join_group.h @@ -14,10 +14,6 @@ namespace kafka { -struct join_group_handler { - using api = join_group_api; - static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(5); - static process_result_stages handle(request_context, ss::smp_service_group); -}; +using join_group_handler = two_phase_handler; + } // namespace kafka diff --git a/src/v/kafka/server/handlers/leave_group.h b/src/v/kafka/server/handlers/leave_group.h index a959c6dc4ddd..61adf1450dec 100644 --- a/src/v/kafka/server/handlers/leave_group.h +++ b/src/v/kafka/server/handlers/leave_group.h @@ -14,6 +14,6 @@ namespace kafka { -using leave_group_handler = handler; +using leave_group_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/list_groups.h b/src/v/kafka/server/handlers/list_groups.h index efe1657ae082..b345f794a0e9 100644 --- a/src/v/kafka/server/handlers/list_groups.h +++ b/src/v/kafka/server/handlers/list_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using list_groups_handler = handler; +using list_groups_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/list_offsets.h b/src/v/kafka/server/handlers/list_offsets.h index 896d0344b42a..bb88af1b1a7e 100644 --- a/src/v/kafka/server/handlers/list_offsets.h +++ b/src/v/kafka/server/handlers/list_offsets.h @@ -14,6 +14,6 @@ namespace kafka { -using list_offsets_handler = handler; +using list_offsets_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 43813d0e5931..2667b11176f5 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -14,10 +14,13 @@ #include "cluster/types.h" #include "config/configuration.h" #include "config/node_config.h" +#include "kafka/protocol/schemata/metadata_response.h" #include "kafka/server/errors.h" +#include "kafka/server/fwd.h" #include "kafka/server/handlers/details/leader_epoch.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h" +#include "kafka/server/response.h" #include "kafka/types.h" #include "likely.h" #include "model/metadata.h" @@ -422,4 +425,80 @@ ss::future metadata_handler::handle( co_return co_await ctx.respond(std::move(reply)); } +size_t +metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) { + // We cannot make a precise estimate of the size of a metadata response by + // examining only the size of the request (nor even by examining the entire + // request) since the response depends on the number of partitions in the + // cluster. Instead, we return a conservative estimate based on the current + // number of topics & partitions in the cluster. + + // Essentially we need to estimate the size taken by a "maximum size" + // metadata_response_data response. The maximum size is when metadata for + // all topics is returned, which is also a common case in practice. This + // involves calculating the size for each topic's portion of the response, + // since the size varies both based on the number of partitions and the + // replica count. + + // We start with a base estimate of 10K and then proceed to ignore + // everything other than the topic/partition part of the response, since + // that's what takes space in large responses and we assume the remaining + // part of the response (the broker list being the second largest part) will + // fit in this 10000k slush fund. + size_t size_estimate = 10000; + + auto& md_cache = conn_ctx.server().metadata_cache(); + + // The size will vary with the number of brokers, though this effect is + // probably small if there are large numbers of partitions + + // This covers the variable part of the broker response, i.e., the broker + // hostname + rack We just hope these are less than this amount, because we + // don't want to execute the relatively complex logic to guess the listener + // just for the size estimate. + constexpr size_t extra_bytes_per_broker = 200; + size_estimate + += md_cache.all_brokers().size() + * (sizeof(metadata_response_broker) + extra_bytes_per_broker); + + for (auto& [tp_ns, topic_metadata] : md_cache.all_topics_metadata()) { + // metadata_response_topic + size_estimate += sizeof(kafka::metadata_response_topic); + size_estimate += tp_ns.tp().size(); + + using partition = kafka::metadata_response_partition; + + // Base number of bytes needed to represent each partition, ignoring the + // variable part attributable to the replica count, we just take as the + // size of the partition response structure. + constexpr size_t bytes_per_partition = sizeof(partition); + + // Then, we need the number of additional bytes per replica, per + // partition, associated with storing the replica list in + // metadata_response_partition::replicas/isr_nodes, which we take to + // be the size of the elements in those lists (4 bytes each). + constexpr size_t bytes_per_replica = sizeof(partition::replica_nodes[0]) + + sizeof(partition::isr_nodes[0]); + + // The actual partition and replica count for this topic. + int32_t pcount = topic_metadata.get_configuration().partition_count; + int32_t rcount = topic_metadata.get_configuration().replication_factor; + + size_estimate += pcount + * (bytes_per_partition + bytes_per_replica * rcount); + } + + // Finally, we double the estimate, because the highwater mark for memory + // use comes when the in-memory structures (metadata_response_data and + // subobjects) exist on the heap and they are encoded into the reponse, + // which will also exist on the heap. The calculation above handles the + // first size, and the encoded response ends up being very similar in size, + // so we double the estimate to account for both. + size_estimate *= 2; + + // We still add on the default_estimate to handle the size of the request + // itself and miscellaneous other procesing (this is a small adjustment, + // generally ~8000 bytes). + return default_memory_estimate(request_size) + size_estimate; +} } // namespace kafka diff --git a/src/v/kafka/server/handlers/metadata.h b/src/v/kafka/server/handlers/metadata.h index 89445b193fd0..bd0e78bb7003 100644 --- a/src/v/kafka/server/handlers/metadata.h +++ b/src/v/kafka/server/handlers/metadata.h @@ -14,6 +14,17 @@ namespace kafka { -using metadata_handler = handler; +/** + * Estimate the size of a metadata request. + * + * Metadata requests are generally very small (a request for *all* metadata + * about a cluster is less than 30 bytes) but the response may be very large, so + * the default estimator is unsuitable. See the implementation for further + * notes. + */ +memory_estimate_fn metadata_memory_estimator; + +using metadata_handler + = single_stage_handler; -} +} // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index ddaa68616aab..7c1c4d10976a 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -53,6 +53,7 @@ struct offset_commit_ctx { , ssg(ssg) {} }; +template<> process_result_stages offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { offset_commit_request request; diff --git a/src/v/kafka/server/handlers/offset_commit.h b/src/v/kafka/server/handlers/offset_commit.h index 2355c580aeac..5a0512d4d043 100644 --- a/src/v/kafka/server/handlers/offset_commit.h +++ b/src/v/kafka/server/handlers/offset_commit.h @@ -11,16 +11,13 @@ #pragma once #include "kafka/protocol/offset_commit.h" #include "kafka/server/handlers/handler.h" +#include "kafka/server/response.h" namespace kafka { // in version 0 kafka stores offsets in zookeeper. if we ever need to // support version 0 then we need to do some code review to see if this has // any implications on semantics. -struct offset_commit_handler { - using api = offset_commit_api; - static constexpr api_version min_supported = api_version(1); - static constexpr api_version max_supported = api_version(7); - static process_result_stages handle(request_context, ss::smp_service_group); -}; +using offset_commit_handler = two_phase_handler; + } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_fetch.h b/src/v/kafka/server/handlers/offset_fetch.h index 5cb87438b02b..64ff13891db5 100644 --- a/src/v/kafka/server/handlers/offset_fetch.h +++ b/src/v/kafka/server/handlers/offset_fetch.h @@ -17,6 +17,6 @@ namespace kafka { // in version 0 kafka stores offsets in zookeeper. if we ever need to // support version 0 then we need to do some code review to see if this has // any implications on semantics. -using offset_fetch_handler = handler; +using offset_fetch_handler = single_stage_handler; } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_for_leader_epoch.h b/src/v/kafka/server/handlers/offset_for_leader_epoch.h index e191aabf2a4c..0c0e047b10a6 100644 --- a/src/v/kafka/server/handlers/offset_for_leader_epoch.h +++ b/src/v/kafka/server/handlers/offset_for_leader_epoch.h @@ -15,5 +15,5 @@ namespace kafka { using offset_for_leader_epoch_handler - = handler; + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 8e9a2b051ecd..29b2dfcde8a0 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -43,6 +43,8 @@ namespace kafka { +static constexpr auto despam_interval = std::chrono::minutes(5); + produce_response produce_request::make_error_response(error_code error) const { produce_response response; @@ -464,6 +466,7 @@ static std::vector produce_topics(produce_ctx& octx) { return topics; } +template<> process_result_stages produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { produce_request request; diff --git a/src/v/kafka/server/handlers/produce.h b/src/v/kafka/server/handlers/produce.h index ae7673858d77..617b7cb1c98f 100644 --- a/src/v/kafka/server/handlers/produce.h +++ b/src/v/kafka/server/handlers/produce.h @@ -14,12 +14,6 @@ namespace kafka { -struct produce_handler { - using api = produce_api; - static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(7); - static process_result_stages handle(request_context, ss::smp_service_group); - static constexpr auto despam_interval = std::chrono::minutes(5); -}; +using produce_handler = two_phase_handler; } // namespace kafka diff --git a/src/v/kafka/server/handlers/sasl_authenticate.h b/src/v/kafka/server/handlers/sasl_authenticate.h index d86e3152223c..5165e094db17 100644 --- a/src/v/kafka/server/handlers/sasl_authenticate.h +++ b/src/v/kafka/server/handlers/sasl_authenticate.h @@ -14,6 +14,7 @@ namespace kafka { -using sasl_authenticate_handler = handler; +using sasl_authenticate_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/sasl_handshake.h b/src/v/kafka/server/handlers/sasl_handshake.h index d5a9343f939b..7d5e5a3867f2 100644 --- a/src/v/kafka/server/handlers/sasl_handshake.h +++ b/src/v/kafka/server/handlers/sasl_handshake.h @@ -14,6 +14,6 @@ namespace kafka { -using sasl_handshake_handler = handler; +using sasl_handshake_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/sync_group.cc b/src/v/kafka/server/handlers/sync_group.cc index fcbfe4a1c743..0ddee9864020 100644 --- a/src/v/kafka/server/handlers/sync_group.cc +++ b/src/v/kafka/server/handlers/sync_group.cc @@ -21,6 +21,7 @@ namespace kafka { +template<> process_result_stages sync_group_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group g) { sync_group_request request; diff --git a/src/v/kafka/server/handlers/sync_group.h b/src/v/kafka/server/handlers/sync_group.h index 711cca63bae9..b23ceb79578a 100644 --- a/src/v/kafka/server/handlers/sync_group.h +++ b/src/v/kafka/server/handlers/sync_group.h @@ -14,11 +14,6 @@ namespace kafka { -struct sync_group_handler { - using api = sync_group_api; - static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(3); - static process_result_stages handle(request_context, ss::smp_service_group); -}; +using sync_group_handler = two_phase_handler; } // namespace kafka diff --git a/src/v/kafka/server/handlers/txn_offset_commit.h b/src/v/kafka/server/handlers/txn_offset_commit.h index dcb1fc578618..c7cebe25954f 100644 --- a/src/v/kafka/server/handlers/txn_offset_commit.h +++ b/src/v/kafka/server/handlers/txn_offset_commit.h @@ -14,6 +14,7 @@ namespace kafka { -using txn_offset_commit_handler = handler; +using txn_offset_commit_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 1f7b1f35e766..ddee4747a43f 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -219,7 +219,8 @@ class request_context { }; // Executes the API call identified by the specified request_context. -process_result_stages process_request(request_context&&, ss::smp_service_group); +process_result_stages process_request( + request_context&&, ss::smp_service_group, const session_resources&); bool track_latency(api_key); diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index 8aec87556a3e..950ff2ca2d62 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -7,8 +7,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 -#include "kafka/server/handlers/handlers.h" -#include "kafka/server/handlers/produce.h" +#include "kafka/protocol/schemata/api_versions_request.h" +#include "kafka/protocol/schemata/fetch_request.h" +#include "kafka/protocol/schemata/produce_request.h" +#include "kafka/server/connection_context.h" +#include "kafka/server/handlers/api_versions.h" +#include "kafka/server/handlers/handler_interface.h" +#include "kafka/server/handlers/sasl_authenticate.h" +#include "kafka/server/handlers/sasl_handshake.h" #include "kafka/server/request_context.h" #include "kafka/types.h" #include "utils/to_string.h" @@ -33,53 +39,6 @@ struct process_dispatch { // clang-format on } }; -/** - * api_versions request processed in one stage however this template - * specialization exists so that the return value of the request can be examined - * by the connection layer. - */ -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return process_result_stages::single_stage( - api_versions_handler::handle(std::move(ctx), g)); - } -}; - -/** - * Requests processed in two stages - */ -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return produce_handler::handle(std::move(ctx), g); - } -}; - -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return offset_commit_handler::handle(std::move(ctx), g); - } -}; -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return join_group_handler::handle(std::move(ctx), g); - } -}; -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return sync_group_handler::handle(std::move(ctx), g); - } -}; - class kafka_api_version_not_supported_exception : public std::runtime_error { public: explicit kafka_api_version_not_supported_exception(const std::string& m) @@ -121,6 +80,39 @@ requires(KafkaApiHandler || KafkaApiTwoPhaseHandler) return process_dispatch::process(std::move(ctx), g); } +process_result_stages process_generic( + handler handler, + request_context&& ctx, + ss::smp_service_group g, + const session_resources& sres) { + vlog( + klog.trace, + "[{}:{}] processing name:{}, key:{}, version:{} for {}, mem_units: {}", + ctx.connection()->client_host(), + ctx.connection()->client_port(), + handler->name(), + ctx.header().key, + ctx.header().version, + ctx.header().client_id.value_or(std::string_view("unset-client-id")), + sres.memlocks.count()); + + // We do a version check for most API requests, but for api_version + // requests we skip them. We do not apply them for api_versions, + // because the client does not yet know what + // versions this server supports. The api versions request is used by a + // client to query this information. + if (ctx.header().key != api_versions_api::key && + (ctx.header().version < handler->min_supported() || + ctx.header().version > handler->max_supported())) { + throw std::runtime_error(fmt::format( + "Unsupported version {} for {} API", + ctx.header().version, + handler->name())); + } + + return handler->handle(std::move(ctx), g); +} + class kafka_authentication_exception : public std::runtime_error { public: explicit kafka_authentication_exception(const std::string& m) @@ -161,7 +153,7 @@ handle_auth_handshake(request_context&& ctx, ss::smp_service_group g) { static ss::future handle_auth_initial(request_context&& ctx, ss::smp_service_group g) { switch (ctx.header().key) { - case api_versions_handler::api::key: { + case api_versions_api::key: { auto r = api_versions_handler::handle_raw(ctx); if (r.data.error_code == error_code::none) { ctx.sasl().set_state(security::sasl_server::sasl_state::handshake); @@ -247,16 +239,18 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) { // only track latency for push and fetch requests bool track_latency(api_key key) { switch (key) { - case fetch_handler::api::key: - case produce_handler::api::key: + case fetch_api::key: + case produce_api::key: return true; default: return false; } } -process_result_stages -process_request(request_context&& ctx, ss::smp_service_group g) { +process_result_stages process_request( + request_context&& ctx, + ss::smp_service_group g, + const session_resources& sres) { /* * requests are handled as normal when auth is disabled. otherwise no * request is handled until the auth process has completed. @@ -274,47 +268,14 @@ process_request(request_context&& ctx, ss::smp_service_group g) { })); } - switch (ctx.header().key) { - case api_versions_handler::api::key: - return do_process(std::move(ctx), g); - case metadata_handler::api::key: - return do_process(std::move(ctx), g); - case list_groups_handler::api::key: - return do_process(std::move(ctx), g); - case find_coordinator_handler::api::key: - return do_process(std::move(ctx), g); - case offset_fetch_handler::api::key: - return do_process(std::move(ctx), g); - case produce_handler::api::key: - return do_process(std::move(ctx), g); - case list_offsets_handler::api::key: - return do_process(std::move(ctx), g); - case offset_commit_handler::api::key: - return do_process(std::move(ctx), g); - case fetch_handler::api::key: - return do_process(std::move(ctx), g); - case join_group_handler::api::key: - return do_process(std::move(ctx), g); - case heartbeat_handler::api::key: - return do_process(std::move(ctx), g); - case leave_group_handler::api::key: - return do_process(std::move(ctx), g); - case sync_group_handler::api::key: - return do_process(std::move(ctx), g); - case create_topics_handler::api::key: - return do_process(std::move(ctx), g); - case describe_configs_handler::api::key: - return do_process(std::move(ctx), g); - case alter_configs_handler::api::key: - return do_process(std::move(ctx), g); - case delete_topics_handler::api::key: - return do_process(std::move(ctx), g); - case describe_groups_handler::api::key: - return do_process(std::move(ctx), g); - case sasl_handshake_handler::api::key: + auto& key = ctx.header().key; + + if (key == sasl_handshake_handler::api::key) { return process_result_stages::single_stage(ctx.respond( sasl_handshake_response(error_code::illegal_sasl_state, {}))); - case sasl_authenticate_handler::api::key: { + } + + if (key == sasl_authenticate_handler::api::key) { sasl_authenticate_response_data data{ .error_code = error_code::illegal_sasl_state, .error_message = "Authentication process already completed", @@ -322,33 +283,11 @@ process_request(request_context&& ctx, ss::smp_service_group g) { return process_result_stages::single_stage( ctx.respond(sasl_authenticate_response(std::move(data)))); } - case init_producer_id_handler::api::key: - return do_process(std::move(ctx), g); - case incremental_alter_configs_handler::api::key: - return do_process(std::move(ctx), g); - case delete_groups_handler::api::key: - return do_process(std::move(ctx), g); - case describe_acls_handler::api::key: - return do_process(std::move(ctx), g); - case describe_log_dirs_handler::api::key: - return do_process(std::move(ctx), g); - case create_acls_handler::api::key: - return do_process(std::move(ctx), g); - case delete_acls_handler::api::key: - return do_process(std::move(ctx), g); - case add_partitions_to_txn_handler::api::key: - return do_process(std::move(ctx), g); - case txn_offset_commit_handler::api::key: - return do_process(std::move(ctx), g); - case add_offsets_to_txn_handler::api::key: - return do_process(std::move(ctx), g); - case end_txn_handler::api::key: - return do_process(std::move(ctx), g); - case create_partitions_handler::api::key: - return do_process(std::move(ctx), g); - case offset_for_leader_epoch_handler::api::key: - return do_process(std::move(ctx), g); - }; + + if (auto handler = handler_for_key(key)) { + return process_generic(*handler, std::move(ctx), g, sres); + } + throw std::runtime_error( fmt::format("Unsupported API {}", ctx.header().key)); } diff --git a/src/v/kafka/server/response.h b/src/v/kafka/server/response.h index c7be3de993d3..0bbe52bc978e 100644 --- a/src/v/kafka/server/response.h +++ b/src/v/kafka/server/response.h @@ -105,4 +105,26 @@ struct process_result_stages { ss::future response; }; +/** + * @brief The default memory size estimate. + * + * Request must make an up-front estimate of the amount of memory they will use, + * in order to obtain the corresponding number of units from the memory + * semaphore (blocking if they are not available). Each request type can use + * their own estimation approach, but if not specified this default estimator + * will be used. + * + * Now, this estimator is very poor for many request types: it only applies a + * multiplier to the request size, so only makes + * sense for requests (such as produce) where the size of the request is a + * good indicator of the total memory size. For requests with a small request + * but a large response (fetch, metadata, etc), it is not appropriate. + * + * @return size_t the estimated size required to process the request + */ +constexpr size_t default_memory_estimate(size_t request_size) { + // Allow for extra copies and bookkeeping + return request_size * 2 + 8000; // NOLINT +} + } // namespace kafka diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 3630ba7e10ff..08298dec6045 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -8,11 +8,13 @@ rp_test( timeouts_conversion_test.cc types_conversion_tests.cc topic_utils_test.cc + handler_interface_test.cc DEFINITIONS BOOST_TEST_DYN_LINK - LIBRARIES Boost::unit_test_framework v::kafka + LIBRARIES Boost::unit_test_framework v::kafka v::coproc LABELS kafka ) + set(srcs consumer_groups_test.cc member_test.cc diff --git a/src/v/kafka/server/tests/handler_interface_test.cc b/src/v/kafka/server/tests/handler_interface_test.cc new file mode 100644 index 000000000000..ab0a011f2805 --- /dev/null +++ b/src/v/kafka/server/tests/handler_interface_test.cc @@ -0,0 +1,49 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "kafka/server/handlers/handler_interface.h" +#include "kafka/server/handlers/handlers.h" + +#include + +template +void check_any_vs_static() { + BOOST_TEST_INFO("Testing " << H::api::name); + auto hopt = kafka::handler_for_key(H::api::key); + BOOST_REQUIRE(hopt.has_value()); + auto h = *hopt; + BOOST_CHECK_EQUAL(h->min_supported(), H::min_supported); + BOOST_CHECK_EQUAL(h->max_supported(), H::max_supported); + BOOST_CHECK_EQUAL(h->key(), H::api::key); + BOOST_CHECK_EQUAL(h->name(), H::api::name); +} + +template +void check_all_types(kafka::type_list) { + (check_any_vs_static(), ...); +} + +BOOST_AUTO_TEST_CASE(handler_all_types) { + check_all_types(kafka::request_types{}); +} + +BOOST_AUTO_TEST_CASE(handler_handler_for_key) { + // key too low + BOOST_CHECK(!kafka::handler_for_key(kafka::api_key(-1)).has_value()); + // key too high + const auto max_key = kafka::max_api_key(kafka::request_types{}); + BOOST_CHECK( + !kafka::handler_for_key(kafka::api_key(max_key + 1)).has_value()); + // last key should be present + BOOST_CHECK(kafka::handler_for_key(kafka::api_key(max_key)).has_value()); + // 34 is AlterReplicaLogDirs which we don't currently support, use it as a + // test case for handlers which fall in the valid range but we don't support + BOOST_CHECK(!kafka::handler_for_key(kafka::api_key(34)).has_value()); +}