From eea0de57e7714748e353694addb3c8a22ce8cb24 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 29 Jun 2022 15:19:11 -0700 Subject: [PATCH 01/20] Clarify behavior of process_next_response The behavior of process_next_response is worth clarifying as the returned future does not necessarily wait for all enqueued responses to be finished before resolving. We also rename the method to better reflect its purpose. --- src/v/kafka/server/connection_context.cc | 17 +++++++++++++++-- src/v/kafka/server/connection_context.h | 14 +++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 4847f601013b..bffc5dd8a9d3 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -367,7 +367,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { response_ptr r) mutable { r->set_correlation(correlation); _responses.insert({seq, std::move(r)}); - return process_next_response(); + return maybe_process_responses(); }); }) .handle_exception([self](std::exception_ptr e) { @@ -410,7 +410,20 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { }); } -ss::future<> connection_context::process_next_response() { +/** + * This method processes as many responses as possible, in request order. Since + * we process the second stage asynchronously within a given connection, responses + * may become ready out of order, but Kafka clients expect responses exactly in + * request order. + * + * The _responses queue handles that: responses are enqueued there in completion + * order, but only sent to the client in response order. So this method, called + * after every response is ready, may end up sending zero, one or more responses, + * depending on the completion order. 
+ * + * @return ss::future<> + */ +ss::future<> connection_context::maybe_process_responses() { return ss::repeat([this]() mutable { auto it = _responses.find(_next_response); if (it == _responses.end()) { diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 4276f8f9da65..5445480c7253 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -166,7 +166,19 @@ class connection_context final ss::future<> handle_mtls_auth(); ss::future<> dispatch_method_once(request_header, size_t sz); - ss::future<> process_next_response(); + + /** + * Process zero or more ready responses in request order. + * + * The future<> returned by this method resolves when all ready *and* + * in-order responses have been processed, which is not the same as all + * ready responses. In particular, responses which are ready may not be + * processed if there are earlier (lower sequence number) responses which + * are not yet ready: they will be processed by a future invocation. + * + * @return ss::future<> a future which resolves as described above. 
+ */ + ss::future<> maybe_process_responses(); ss::future<> do_process(request_context); ss::future<> handle_auth_v0(size_t); From 5f5c7c6beb636af516dd1df6d1031c8e51b12ebe Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 29 Jun 2022 15:55:44 -0700 Subject: [PATCH 02/20] Add comment to session resources --- src/v/kafka/server/connection_context.h | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 5445480c7253..12ef73e40d18 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -148,7 +148,14 @@ class connection_context final private: net::server_probe& _probe; }; - // used to pass around some internal state + + // Used to hold resources associated with a given request until + // the response has been sent, as well as to track some statistics + // about the request. + // + // The resources in particular should not be destroyed until + // the request is complete (e.g., all the information written to + // the socket so that no userspace buffers remain). struct session_resources { ss::lowres_clock::duration backpressure_delay; ss::semaphore_units<> memlocks; From d2bac6cef4a959beda691dbc3bf3c79733ccfa50 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 29 Jun 2022 16:57:29 -0700 Subject: [PATCH 03/20] Release resources after the response is written Currently we release resources after the response is enqueued in connection_context and response processing is called, but it may not have been sent at this point as we require in-order responses but second-stage processing may happen out of order. In this change, we instead tunnel the resource object through to the place where the response is written, and release it there. Fixes #5278. 
--- src/v/kafka/server/connection_context.cc | 47 +++++++++++++++--------- src/v/kafka/server/connection_context.h | 10 ++++- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index bffc5dd8a9d3..d046f5cbfdb0 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -333,7 +333,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { seq, correlation, self, - s = std::move(sres)](ss::future<> d) mutable { + sres = std::move(sres)](ss::future<> d) mutable { /* * if the dispatch/first stage failed, then we need to * need to consume the second stage since it might be @@ -362,13 +362,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { ssx::background = ssx::spawn_with_gate_then( _rs.conn_gate(), - [this, f = std::move(f), seq, correlation]() mutable { - return f.then([this, seq, correlation]( - response_ptr r) mutable { - r->set_correlation(correlation); - _responses.insert({seq, std::move(r)}); - return maybe_process_responses(); - }); + [this, + f = std::move(f), + sres = std::move(sres), + seq, + correlation]() mutable { + return f.then( + [this, + sres = std::move(sres), + seq, + correlation](response_ptr r) mutable { + r->set_correlation(correlation); + response_and_resources randr{ + std::move(r), std::move(sres)}; + _responses.insert({seq, std::move(randr)}); + return maybe_process_responses(); + }); }) .handle_exception([self](std::exception_ptr e) { // ssx::spawn_with_gate already caught @@ -397,8 +406,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { self->_rs.probe().service_error(); self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(s), self] {}); + }); return d; }) .handle_exception([self](std::exception_ptr e) { @@ -433,20 +441,25 @@ ss::future<> connection_context::maybe_process_responses() { // found one; increment 
counter _next_response = _next_response + sequence_id(1); - auto r = std::move(it->second); + auto resp_and_res = std::move(it->second); + _responses.erase(it); - if (r->is_noop()) { + if (resp_and_res.response->is_noop()) { return ss::make_ready_future( ss::stop_iteration::no); } - auto msg = response_as_scattered(std::move(r)); + auto msg = response_as_scattered(std::move(resp_and_res.response)); try { - return _rs.conn->write(std::move(msg)).then([] { - return ss::make_ready_future( - ss::stop_iteration::no); - }); + return _rs.conn->write(std::move(msg)) + .then([] { + return ss::make_ready_future( + ss::stop_iteration::no); + }) + // release the resources only once it has been written to the + // connection. + .finally([resources = std::move(resp_and_res.resources)] {}); } catch (...) { vlog( klog.debug, diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 12ef73e40d18..e0b5766827cf 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -191,8 +191,16 @@ class connection_context final ss::future<> handle_auth_v0(size_t); private: + /** + * Bundles together a response and its associated resources. 
+ */ + struct response_and_resources { + response_ptr response; + session_resources resources; + }; + using sequence_id = named_type; - using map_t = absl::flat_hash_map; + using map_t = absl::flat_hash_map; class ctx_log { public: From 5def3b733178657ee5ee9bb99e4020e16848516f Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 29 Jun 2022 20:40:06 -0700 Subject: [PATCH 04/20] Improve documentation of throttle related methods --- src/v/kafka/server/connection_context.h | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index e0b5766827cf..9de753f27b24 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -164,10 +164,18 @@ class connection_context final std::unique_ptr tracker; }; - /// called by throttle_request + // Reserve units from the memory semaphore in proportion + // to the number of bytes the request processing is expected to + // take. ss::future> reserve_request_units(size_t size); - /// apply correct backpressure sequence + // Apply backpressure sequence, where the request processing may be + // delayed for various reasons, including throttling but also because + // too few server resources are available to accommodate the request + // currently. + // When the returned future resolves, the throttling period is over and + // the associated resources have been obtained and are tracked by the + // contained session_resources object. ss::future throttle_request(const request_header&, size_t sz); From 79fe3a55f701515e16741da370d704b97b41ad3a Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Mon, 4 Jul 2022 17:59:33 -0700 Subject: [PATCH 05/20] Move max_api_key function to handlers header. This lets us share it with the request processing code which would also like to do type list based manipulation of the request types. 
--- src/v/kafka/server/flex_versions.cc | 7 ------- src/v/kafka/server/handlers/handlers.h | 7 +++++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/v/kafka/server/flex_versions.cc b/src/v/kafka/server/flex_versions.cc index 1a3565b41a71..b0e6bd2cbea2 100644 --- a/src/v/kafka/server/flex_versions.cc +++ b/src/v/kafka/server/flex_versions.cc @@ -17,13 +17,6 @@ namespace kafka { /// requests will map to a value of api_key(-2) static constexpr api_version invalid_api = api_version(-2); -template -static constexpr size_t max_api_key(type_list) { - /// Black magic here is an overload of std::max() that takes an - /// std::initializer_list - return std::max({RequestTypes::api::key()...}); -} - template static constexpr auto get_flexible_request_min_versions_list(type_list r) { diff --git a/src/v/kafka/server/handlers/handlers.h b/src/v/kafka/server/handlers/handlers.h index 80cb2d22ebfb..d98c4d0885f0 100644 --- a/src/v/kafka/server/handlers/handlers.h +++ b/src/v/kafka/server/handlers/handlers.h @@ -87,4 +87,11 @@ using request_types = make_request_types< end_txn_handler, create_partitions_handler, offset_for_leader_epoch_handler>; + +template +static constexpr size_t max_api_key(type_list) { + /// Black magic here is an overload of std::max() that takes an + /// std::initializer_list + return std::max({RequestTypes::api::key()...}); +} } // namespace kafka From 2e4dd222dc452a02c2ebdde0374162d0c2a673ff Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Mon, 4 Jul 2022 22:51:25 -0700 Subject: [PATCH 06/20] Introduce KafkaApiHandlerAny concept We already had concepts for one-phase and two-phase handlers, and this concept is simply the union of those two handler concepts, i.e., "any" type of handler. 
--- src/v/kafka/server/handlers/handler.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index a4b87a754586..8d289536344e 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -45,4 +45,7 @@ concept KafkaApiTwoPhaseHandler = KafkaApi && requires( { T::handle(std::move(ctx), g) } -> std::same_as; }; +template +concept KafkaApiHandlerAny = KafkaApiHandler || KafkaApiTwoPhaseHandler; + } // namespace kafka From 0c8a6e94f516cd5cdd88b3e1a5a72243172fcae1 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 14 Jul 2022 11:29:31 -0700 Subject: [PATCH 07/20] Rename handler to single_stage_handler We wish to reclaim the name handler for the generic handler interface introduced in the next change. --- src/v/kafka/server/handlers/add_offsets_to_txn.h | 3 ++- src/v/kafka/server/handlers/add_partitions_to_txn.h | 3 ++- src/v/kafka/server/handlers/alter_configs.h | 2 +- src/v/kafka/server/handlers/api_versions.h | 3 ++- src/v/kafka/server/handlers/create_acls.h | 2 +- src/v/kafka/server/handlers/create_partitions.h | 3 ++- src/v/kafka/server/handlers/create_topics.h | 2 +- src/v/kafka/server/handlers/delete_acls.h | 2 +- src/v/kafka/server/handlers/delete_groups.h | 2 +- src/v/kafka/server/handlers/delete_topics.h | 2 +- src/v/kafka/server/handlers/describe_acls.h | 2 +- src/v/kafka/server/handlers/describe_configs.h | 3 ++- src/v/kafka/server/handlers/describe_groups.h | 2 +- src/v/kafka/server/handlers/describe_log_dirs.h | 3 ++- src/v/kafka/server/handlers/end_txn.h | 2 +- src/v/kafka/server/handlers/fetch.h | 2 +- src/v/kafka/server/handlers/find_coordinator.h | 3 ++- src/v/kafka/server/handlers/handler.h | 2 +- src/v/kafka/server/handlers/heartbeat.h | 2 +- src/v/kafka/server/handlers/incremental_alter_configs.h | 2 +- src/v/kafka/server/handlers/init_producer_id.h | 3 ++- src/v/kafka/server/handlers/leave_group.h | 2 +- 
src/v/kafka/server/handlers/list_groups.h | 2 +- src/v/kafka/server/handlers/list_offsets.h | 2 +- src/v/kafka/server/handlers/metadata.h | 2 +- src/v/kafka/server/handlers/offset_fetch.h | 2 +- src/v/kafka/server/handlers/offset_for_leader_epoch.h | 2 +- src/v/kafka/server/handlers/sasl_authenticate.h | 3 ++- src/v/kafka/server/handlers/sasl_handshake.h | 2 +- src/v/kafka/server/handlers/txn_offset_commit.h | 3 ++- 30 files changed, 40 insertions(+), 30 deletions(-) diff --git a/src/v/kafka/server/handlers/add_offsets_to_txn.h b/src/v/kafka/server/handlers/add_offsets_to_txn.h index e4b1669b970a..fbc9fc1324e2 100644 --- a/src/v/kafka/server/handlers/add_offsets_to_txn.h +++ b/src/v/kafka/server/handlers/add_offsets_to_txn.h @@ -14,6 +14,7 @@ namespace kafka { -using add_offsets_to_txn_handler = handler; +using add_offsets_to_txn_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/add_partitions_to_txn.h b/src/v/kafka/server/handlers/add_partitions_to_txn.h index 5b0f2523b4f3..aee85586deee 100644 --- a/src/v/kafka/server/handlers/add_partitions_to_txn.h +++ b/src/v/kafka/server/handlers/add_partitions_to_txn.h @@ -14,6 +14,7 @@ namespace kafka { -using add_partitions_to_txn_handler = handler; +using add_partitions_to_txn_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/alter_configs.h b/src/v/kafka/server/handlers/alter_configs.h index 7edcf2f987ad..d61eb8c3f472 100644 --- a/src/v/kafka/server/handlers/alter_configs.h +++ b/src/v/kafka/server/handlers/alter_configs.h @@ -14,6 +14,6 @@ namespace kafka { -using alter_configs_handler = handler; +using alter_configs_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/api_versions.h b/src/v/kafka/server/handlers/api_versions.h index c131036d3f2a..7ab8921561ea 100644 --- a/src/v/kafka/server/handlers/api_versions.h +++ b/src/v/kafka/server/handlers/api_versions.h @@ -14,7 +14,8 @@ namespace kafka { -struct api_versions_handler : public handler { 
+struct api_versions_handler + : public single_stage_handler { static constexpr api_version min_flexible = api_version(3); static ss::future diff --git a/src/v/kafka/server/handlers/create_acls.h b/src/v/kafka/server/handlers/create_acls.h index e9719121d6df..d9a6161b71a1 100644 --- a/src/v/kafka/server/handlers/create_acls.h +++ b/src/v/kafka/server/handlers/create_acls.h @@ -14,6 +14,6 @@ namespace kafka { -using create_acls_handler = handler; +using create_acls_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/create_partitions.h b/src/v/kafka/server/handlers/create_partitions.h index 4102398e8d8b..16b1dcc9de27 100644 --- a/src/v/kafka/server/handlers/create_partitions.h +++ b/src/v/kafka/server/handlers/create_partitions.h @@ -14,6 +14,7 @@ namespace kafka { -using create_partitions_handler = handler; +using create_partitions_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/create_topics.h b/src/v/kafka/server/handlers/create_topics.h index e3c395858456..2ab493358dd0 100644 --- a/src/v/kafka/server/handlers/create_topics.h +++ b/src/v/kafka/server/handlers/create_topics.h @@ -14,6 +14,6 @@ namespace kafka { -using create_topics_handler = handler; +using create_topics_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/delete_acls.h b/src/v/kafka/server/handlers/delete_acls.h index d19ab798cf46..8e45cc5679fa 100644 --- a/src/v/kafka/server/handlers/delete_acls.h +++ b/src/v/kafka/server/handlers/delete_acls.h @@ -14,6 +14,6 @@ namespace kafka { -using delete_acls_handler = handler; +using delete_acls_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/delete_groups.h b/src/v/kafka/server/handlers/delete_groups.h index 85d01eb8c29b..d9858140b83a 100644 --- a/src/v/kafka/server/handlers/delete_groups.h +++ b/src/v/kafka/server/handlers/delete_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using delete_groups_handler = handler; +using delete_groups_handler = 
single_stage_handler; } diff --git a/src/v/kafka/server/handlers/delete_topics.h b/src/v/kafka/server/handlers/delete_topics.h index 7dd8d66d2f15..e9b6606cbe00 100644 --- a/src/v/kafka/server/handlers/delete_topics.h +++ b/src/v/kafka/server/handlers/delete_topics.h @@ -14,6 +14,6 @@ namespace kafka { -using delete_topics_handler = handler; +using delete_topics_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_acls.h b/src/v/kafka/server/handlers/describe_acls.h index 3377ac8a2858..996c6fa230aa 100644 --- a/src/v/kafka/server/handlers/describe_acls.h +++ b/src/v/kafka/server/handlers/describe_acls.h @@ -14,6 +14,6 @@ namespace kafka { -using describe_acls_handler = handler; +using describe_acls_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_configs.h b/src/v/kafka/server/handlers/describe_configs.h index 27f7235dc762..97199e628b55 100644 --- a/src/v/kafka/server/handlers/describe_configs.h +++ b/src/v/kafka/server/handlers/describe_configs.h @@ -14,6 +14,7 @@ namespace kafka { -using describe_configs_handler = handler; +using describe_configs_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_groups.h b/src/v/kafka/server/handlers/describe_groups.h index 6f804548b893..b62004ae9fa6 100644 --- a/src/v/kafka/server/handlers/describe_groups.h +++ b/src/v/kafka/server/handlers/describe_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using describe_groups_handler = handler; +using describe_groups_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/describe_log_dirs.h b/src/v/kafka/server/handlers/describe_log_dirs.h index 13d11c440ad7..1731e88621a9 100644 --- a/src/v/kafka/server/handlers/describe_log_dirs.h +++ b/src/v/kafka/server/handlers/describe_log_dirs.h @@ -14,6 +14,7 @@ namespace kafka { -using describe_log_dirs_handler = handler; +using describe_log_dirs_handler + = single_stage_handler; } diff --git 
a/src/v/kafka/server/handlers/end_txn.h b/src/v/kafka/server/handlers/end_txn.h index 72362cb00fed..cd80b0d41c25 100644 --- a/src/v/kafka/server/handlers/end_txn.h +++ b/src/v/kafka/server/handlers/end_txn.h @@ -14,6 +14,6 @@ namespace kafka { -using end_txn_handler = handler; +using end_txn_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index d43b4e9b0b33..8f44d89f1451 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -17,7 +17,7 @@ namespace kafka { -using fetch_handler = handler; +using fetch_handler = single_stage_handler; /* * Fetch operation context diff --git a/src/v/kafka/server/handlers/find_coordinator.h b/src/v/kafka/server/handlers/find_coordinator.h index 8e3d83bfe4d6..1f5ff07fb97f 100644 --- a/src/v/kafka/server/handlers/find_coordinator.h +++ b/src/v/kafka/server/handlers/find_coordinator.h @@ -14,6 +14,7 @@ namespace kafka { -using find_coordinator_handler = handler; +using find_coordinator_handler + = single_stage_handler; } // namespace kafka diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index 8d289536344e..8895026c4e32 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -22,7 +22,7 @@ template< typename RequestApi, api_version::type MinSupported, api_version::type MaxSupported> -struct handler { +struct single_stage_handler { using api = RequestApi; static constexpr api_version min_supported = api_version(MinSupported); static constexpr api_version max_supported = api_version(MaxSupported); diff --git a/src/v/kafka/server/handlers/heartbeat.h b/src/v/kafka/server/handlers/heartbeat.h index 27a4c22b1cbd..437279760a54 100644 --- a/src/v/kafka/server/handlers/heartbeat.h +++ b/src/v/kafka/server/handlers/heartbeat.h @@ -14,6 +14,6 @@ namespace kafka { -using heartbeat_handler = handler; +using heartbeat_handler = single_stage_handler; } diff 
--git a/src/v/kafka/server/handlers/incremental_alter_configs.h b/src/v/kafka/server/handlers/incremental_alter_configs.h index 8e902f5da6b3..9dbfde6be92e 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.h +++ b/src/v/kafka/server/handlers/incremental_alter_configs.h @@ -15,6 +15,6 @@ namespace kafka { using incremental_alter_configs_handler - = handler; + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/init_producer_id.h b/src/v/kafka/server/handlers/init_producer_id.h index 5068f4325684..be4c7d0a080a 100644 --- a/src/v/kafka/server/handlers/init_producer_id.h +++ b/src/v/kafka/server/handlers/init_producer_id.h @@ -14,6 +14,7 @@ namespace kafka { -using init_producer_id_handler = handler; +using init_producer_id_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/leave_group.h b/src/v/kafka/server/handlers/leave_group.h index a959c6dc4ddd..61adf1450dec 100644 --- a/src/v/kafka/server/handlers/leave_group.h +++ b/src/v/kafka/server/handlers/leave_group.h @@ -14,6 +14,6 @@ namespace kafka { -using leave_group_handler = handler; +using leave_group_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/list_groups.h b/src/v/kafka/server/handlers/list_groups.h index efe1657ae082..b345f794a0e9 100644 --- a/src/v/kafka/server/handlers/list_groups.h +++ b/src/v/kafka/server/handlers/list_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using list_groups_handler = handler; +using list_groups_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/list_offsets.h b/src/v/kafka/server/handlers/list_offsets.h index 896d0344b42a..bb88af1b1a7e 100644 --- a/src/v/kafka/server/handlers/list_offsets.h +++ b/src/v/kafka/server/handlers/list_offsets.h @@ -14,6 +14,6 @@ namespace kafka { -using list_offsets_handler = handler; +using list_offsets_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/metadata.h b/src/v/kafka/server/handlers/metadata.h index 
89445b193fd0..8d2336218e5b 100644 --- a/src/v/kafka/server/handlers/metadata.h +++ b/src/v/kafka/server/handlers/metadata.h @@ -14,6 +14,6 @@ namespace kafka { -using metadata_handler = handler; +using metadata_handler = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/offset_fetch.h b/src/v/kafka/server/handlers/offset_fetch.h index 5cb87438b02b..64ff13891db5 100644 --- a/src/v/kafka/server/handlers/offset_fetch.h +++ b/src/v/kafka/server/handlers/offset_fetch.h @@ -17,6 +17,6 @@ namespace kafka { // in version 0 kafka stores offsets in zookeeper. if we ever need to // support version 0 then we need to do some code review to see if this has // any implications on semantics. -using offset_fetch_handler = handler; +using offset_fetch_handler = single_stage_handler; } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_for_leader_epoch.h b/src/v/kafka/server/handlers/offset_for_leader_epoch.h index e191aabf2a4c..0c0e047b10a6 100644 --- a/src/v/kafka/server/handlers/offset_for_leader_epoch.h +++ b/src/v/kafka/server/handlers/offset_for_leader_epoch.h @@ -15,5 +15,5 @@ namespace kafka { using offset_for_leader_epoch_handler - = handler; + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/sasl_authenticate.h b/src/v/kafka/server/handlers/sasl_authenticate.h index d86e3152223c..5165e094db17 100644 --- a/src/v/kafka/server/handlers/sasl_authenticate.h +++ b/src/v/kafka/server/handlers/sasl_authenticate.h @@ -14,6 +14,7 @@ namespace kafka { -using sasl_authenticate_handler = handler; +using sasl_authenticate_handler + = single_stage_handler; } diff --git a/src/v/kafka/server/handlers/sasl_handshake.h b/src/v/kafka/server/handlers/sasl_handshake.h index d5a9343f939b..7d5e5a3867f2 100644 --- a/src/v/kafka/server/handlers/sasl_handshake.h +++ b/src/v/kafka/server/handlers/sasl_handshake.h @@ -14,6 +14,6 @@ namespace kafka { -using sasl_handshake_handler = handler; +using sasl_handshake_handler = single_stage_handler; } diff 
--git a/src/v/kafka/server/handlers/txn_offset_commit.h b/src/v/kafka/server/handlers/txn_offset_commit.h index dcb1fc578618..c7cebe25954f 100644 --- a/src/v/kafka/server/handlers/txn_offset_commit.h +++ b/src/v/kafka/server/handlers/txn_offset_commit.h @@ -14,6 +14,7 @@ namespace kafka { -using txn_offset_commit_handler = handler; +using txn_offset_commit_handler + = single_stage_handler; } From d533df6f0526b3583bc0a8ca774b176f629584ea Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Mon, 4 Jul 2022 22:59:02 -0700 Subject: [PATCH 08/20] Introduce type-erased handler interface This is a polymorphic handler class (abstract class with virtual methods), as well as concrete instantiations of the interface for each of the existing handlers, which can be looked up by API key. This lets us treat handlers generically without resorting to template functions which must be specialized for each handler type. This reduces code bloat significantly as we do not duplicate code paths for our ~45 handler types. For example, requests.cc.o drops from ~11 MB to ~5 MB after it is switched to the any_handler approach. 
--- src/v/kafka/CMakeLists.txt | 1 + .../server/handlers/handler_interface.cc | 126 ++++++++++++++++++ .../kafka/server/handlers/handler_interface.h | 91 +++++++++++++ 3 files changed, 218 insertions(+) create mode 100644 src/v/kafka/server/handlers/handler_interface.cc create mode 100644 src/v/kafka/server/handlers/handler_interface.h diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index 6e8da95e85af..067fc72b9cd0 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -34,6 +34,7 @@ set(handlers_srcs server/handlers/delete_acls.cc server/handlers/create_partitions.cc server/handlers/offset_for_leader_epoch.cc + server/handlers/handler_interface.cc server/handlers/topics/types.cc server/handlers/topics/topic_utils.cc ) diff --git a/src/v/kafka/server/handlers/handler_interface.cc b/src/v/kafka/server/handlers/handler_interface.cc new file mode 100644 index 000000000000..140271e41cf5 --- /dev/null +++ b/src/v/kafka/server/handlers/handler_interface.cc @@ -0,0 +1,126 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "kafka/server/handlers/handler_interface.h" + +#include "kafka/server/handlers/handlers.h" +#include "kafka/server/handlers/produce.h" +#include "kafka/types.h" + +#include + +namespace kafka { + +/** + * @brief Packages together basic information common to every handler. 
+ */ +struct handler_info { + handler_info( + api_key key, + const char* name, + api_version min_api, + api_version max_api) noexcept + : _key(key) + , _name(name) + , _min_api(min_api) + , _max_api(max_api) {} + + api_key _key; + const char* _name; + api_version _min_api, _max_api; +}; + +/** + * @brief Creates a type-erased handler implementation given info and a handle + * method. + * + * There are only two variants of this handler, for one and two pass + * implementations. + * This keeps the generated code duplication to a minimum, compared to + * templating this on the handler type. + * + * @tparam is_two_pass true if the handler is two-pass + */ +template +struct handler_base final : public handler_interface { + using single_pass_handler + = ss::future(request_context, ss::smp_service_group); + using two_pass_handler + = process_result_stages(request_context, ss::smp_service_group); + using fn_type + = std::conditional_t; + + handler_base(const handler_info& info, fn_type* handle_fn) noexcept + : _info(info) + , _handle_fn(handle_fn) {} + + api_version min_supported() const override { return _info._min_api; } + api_version max_supported() const override { return _info._max_api; } + + api_key key() const override { return _info._key; } + const char* name() const override { return _info._name; } + + /** + * Only handle varies with one or two pass, since one pass handlers + * must pass through single_stage() to convert them to two-pass. + */ + process_result_stages + handle(request_context&& rc, ss::smp_service_group g) const override { + if constexpr (is_two_pass) { + return _handle_fn(std::move(rc), g); + } else { + return process_result_stages::single_stage( + _handle_fn(std::move(rc), g)); + } + } + +private: + handler_info _info; + fn_type* _handle_fn; +}; + +/** + * @brief Instance holder for the handler_base. + * + * Given a handler type H, exposes a static instance of the associated handler + * base object. + * + * @tparam H the handler type. 
+ */ +template +struct handler_holder { + static const inline handler_base> instance{ + handler_info{ + H::api::key, H::api::name, H::min_supported, H::max_supported}, + H::handle}; +}; + +template +constexpr auto make_lut(type_list) { + constexpr int max_index = std::max({Ts::api::key...}); + static_assert(max_index < sizeof...(Ts) * 10, "LUT is too sparse"); + + std::array lut{}; + ((lut[Ts::api::key] = &handler_holder::instance), ...); + + return lut; +} + +std::optional handler_for_key(kafka::api_key key) { + static constexpr auto lut = make_lut(request_types{}); + if (key >= (short)0 && key < (short)lut.size()) { + if (auto handler = lut[key]) { + return handler; + } + } + return std::nullopt; +} + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/handler_interface.h b/src/v/kafka/server/handlers/handler_interface.h new file mode 100644 index 000000000000..a88b49490d75 --- /dev/null +++ b/src/v/kafka/server/handlers/handler_interface.h @@ -0,0 +1,91 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once +#include "kafka/server/fwd.h" +#include "kafka/server/response.h" +#include "kafka/types.h" + +namespace kafka { +/** + * @brief Runtime polymorphic handler type. + * + * Allows access to all kafka request handling implementations though a + * type erased interface. This avoids the need to bring every handler + * type into scope and make everything that touches the handler a template + * function on the handler type. + * + */ +struct handler_interface { + /** + * @brief The minimum supported API version, inclusive. + */ + virtual api_version min_supported() const = 0; + + /** + * @brief The maximum supported API version, inclusive. 
+ */ + virtual api_version max_supported() const = 0; + + /** + * @brief The name of the API method. + */ + virtual const char* name() const = 0; + + /** + * @brief The API key associated with the method. + */ + virtual api_key key() const = 0; + + /** + * @brief Handles the request. + * + * Invokes the request handler with the given request context + * (which will be moved from) and smp_service_groups. + * + * The result stages objects contains futures for both the initial + * dispatch phase, and the find response. For API methods which + * are implemented a single phase, the same type is returned, but + * the response future will complete as soon as the dispatch one does. + * + * @return process_result_stages representing the future completion of + * the handler. + */ + virtual process_result_stages + handle(request_context&&, ss::smp_service_group) const = 0; + + virtual ~handler_interface() = default; +}; + +/** + * @brief Pointer to a handler. + * + * Most code will use handler objects, which are simply pointers + * to handlers, generally const objects with static storage duration + * obtained from handler_for_key. + */ +using handler = const handler_interface*; + +/** + * @brief Return a handler for the given key, if any. + * + * Returns a pointer to a constant singleton handler for the given + * key, or an empty optional if no such handler exists. The contained + * any_hanlder is guaranteed to be non-null if the optional as a value. + * + * This method looks up the handler in a table populated by all handlers + * in kafka::request_types. 
+ * + * @param key the API key for the handler + * @return std::optional the handler, if any + */ +std::optional handler_for_key(api_key key); + +} // namespace kafka From f80e99db75186dcbbb5f1f17ae7ae964a2efcb70 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Mon, 4 Jul 2022 23:15:37 -0700 Subject: [PATCH 09/20] Add handler_interface unit tests The handler_interface already gets good functional coverage as it is added to the core request path in requests.cc, but we also include this unit test with basic coverage. --- src/v/kafka/server/tests/CMakeLists.txt | 4 +- .../server/tests/handler_interface_test.cc | 49 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 src/v/kafka/server/tests/handler_interface_test.cc diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 3630ba7e10ff..08298dec6045 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -8,11 +8,13 @@ rp_test( timeouts_conversion_test.cc types_conversion_tests.cc topic_utils_test.cc + handler_interface_test.cc DEFINITIONS BOOST_TEST_DYN_LINK - LIBRARIES Boost::unit_test_framework v::kafka + LIBRARIES Boost::unit_test_framework v::kafka v::coproc LABELS kafka ) + set(srcs consumer_groups_test.cc member_test.cc diff --git a/src/v/kafka/server/tests/handler_interface_test.cc b/src/v/kafka/server/tests/handler_interface_test.cc new file mode 100644 index 000000000000..ab0a011f2805 --- /dev/null +++ b/src/v/kafka/server/tests/handler_interface_test.cc @@ -0,0 +1,49 @@ +/* + * Copyright 2022 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "kafka/server/handlers/handler_interface.h" +#include "kafka/server/handlers/handlers.h" + +#include + +template +void check_any_vs_static() { + BOOST_TEST_INFO("Testing " << H::api::name); + auto hopt = kafka::handler_for_key(H::api::key); + BOOST_REQUIRE(hopt.has_value()); + auto h = *hopt; + BOOST_CHECK_EQUAL(h->min_supported(), H::min_supported); + BOOST_CHECK_EQUAL(h->max_supported(), H::max_supported); + BOOST_CHECK_EQUAL(h->key(), H::api::key); + BOOST_CHECK_EQUAL(h->name(), H::api::name); +} + +template +void check_all_types(kafka::type_list) { + (check_any_vs_static(), ...); +} + +BOOST_AUTO_TEST_CASE(handler_all_types) { + check_all_types(kafka::request_types{}); +} + +BOOST_AUTO_TEST_CASE(handler_handler_for_key) { + // key too low + BOOST_CHECK(!kafka::handler_for_key(kafka::api_key(-1)).has_value()); + // key too high + const auto max_key = kafka::max_api_key(kafka::request_types{}); + BOOST_CHECK( + !kafka::handler_for_key(kafka::api_key(max_key + 1)).has_value()); + // last key should be present + BOOST_CHECK(kafka::handler_for_key(kafka::api_key(max_key)).has_value()); + // 34 is AlterReplicaLogDirs which we don't currently support, use it as a + // test case for handlers which fall in the valid range but we don't support + BOOST_CHECK(!kafka::handler_for_key(kafka::api_key(34)).has_value()); +} From 12d43793d343d264c282523a3e93dfba2189ce38 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Tue, 5 Jul 2022 11:31:08 -0700 Subject: [PATCH 10/20] Update request handling to use polymorphic handler Preceding changes in this series introduced a runtime polymorphic handler class, and this change switches most of the request handling to use it. 
In particular, we replace the large switch on API key which dispatches to a template method with a lookup of the handler method and virtual dispatch. Some handlers that need special processing like the authentication/SASL related ones still use the old approach for now. --- src/v/kafka/server/requests.cc | 168 ++++++++++----------------------- 1 file changed, 50 insertions(+), 118 deletions(-) diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index 8aec87556a3e..5d5fcbd0b7da 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -7,8 +7,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 -#include "kafka/server/handlers/handlers.h" -#include "kafka/server/handlers/produce.h" +#include "kafka/protocol/schemata/api_versions_request.h" +#include "kafka/protocol/schemata/fetch_request.h" +#include "kafka/protocol/schemata/produce_request.h" +#include "kafka/server/handlers/api_versions.h" +#include "kafka/server/handlers/handler_interface.h" +#include "kafka/server/handlers/sasl_authenticate.h" +#include "kafka/server/handlers/sasl_handshake.h" #include "kafka/server/request_context.h" #include "kafka/types.h" #include "utils/to_string.h" @@ -33,53 +38,6 @@ struct process_dispatch { // clang-format on } }; -/** - * api_versions request processed in one stage however this template - * specialization exists so that the return value of the request can be examined - * by the connection layer. 
- */ -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return process_result_stages::single_stage( - api_versions_handler::handle(std::move(ctx), g)); - } -}; - -/** - * Requests processed in two stages - */ -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return produce_handler::handle(std::move(ctx), g); - } -}; - -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return offset_commit_handler::handle(std::move(ctx), g); - } -}; -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return join_group_handler::handle(std::move(ctx), g); - } -}; -template<> -struct process_dispatch { - static process_result_stages - process(request_context&& ctx, ss::smp_service_group g) { - return sync_group_handler::handle(std::move(ctx), g); - } -}; - class kafka_api_version_not_supported_exception : public std::runtime_error { public: explicit kafka_api_version_not_supported_exception(const std::string& m) @@ -121,6 +79,35 @@ requires(KafkaApiHandler || KafkaApiTwoPhaseHandler) return process_dispatch::process(std::move(ctx), g); } +process_result_stages process_generic( + handler handler, request_context&& ctx, ss::smp_service_group g) { + vlog( + klog.trace, + "[{}:{}] processing name:{}, key:{}, version:{} for {}", + ctx.connection()->client_host(), + ctx.connection()->client_port(), + handler->name(), + ctx.header().key, + ctx.header().version, + ctx.header().client_id.value_or(std::string_view("unset-client-id"))); + + // We do a version check for most API requests, but for api_version + // requests we skip them. We do not apply them for api_versions, + // because the client does not yet know what + // versions this server supports. 
The api versions request is used by a + // client to query this information. + if (ctx.header().key != api_versions_api::key && + (ctx.header().version < handler->min_supported() || + ctx.header().version > handler->max_supported())) { + throw std::runtime_error(fmt::format( + "Unsupported version {} for {} API", + ctx.header().version, + handler->name())); + } + + return handler->handle(std::move(ctx), g); +} + class kafka_authentication_exception : public std::runtime_error { public: explicit kafka_authentication_exception(const std::string& m) @@ -161,7 +148,7 @@ handle_auth_handshake(request_context&& ctx, ss::smp_service_group g) { static ss::future handle_auth_initial(request_context&& ctx, ss::smp_service_group g) { switch (ctx.header().key) { - case api_versions_handler::api::key: { + case api_versions_api::key: { auto r = api_versions_handler::handle_raw(ctx); if (r.data.error_code == error_code::none) { ctx.sasl().set_state(security::sasl_server::sasl_state::handshake); @@ -247,8 +234,8 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) { // only track latency for push and fetch requests bool track_latency(api_key key) { switch (key) { - case fetch_handler::api::key: - case produce_handler::api::key: + case fetch_api::key: + case produce_api::key: return true; default: return false; @@ -274,47 +261,14 @@ process_request(request_context&& ctx, ss::smp_service_group g) { })); } - switch (ctx.header().key) { - case api_versions_handler::api::key: - return do_process(std::move(ctx), g); - case metadata_handler::api::key: - return do_process(std::move(ctx), g); - case list_groups_handler::api::key: - return do_process(std::move(ctx), g); - case find_coordinator_handler::api::key: - return do_process(std::move(ctx), g); - case offset_fetch_handler::api::key: - return do_process(std::move(ctx), g); - case produce_handler::api::key: - return do_process(std::move(ctx), g); - case list_offsets_handler::api::key: - return do_process(std::move(ctx), g); - 
case offset_commit_handler::api::key: - return do_process(std::move(ctx), g); - case fetch_handler::api::key: - return do_process(std::move(ctx), g); - case join_group_handler::api::key: - return do_process(std::move(ctx), g); - case heartbeat_handler::api::key: - return do_process(std::move(ctx), g); - case leave_group_handler::api::key: - return do_process(std::move(ctx), g); - case sync_group_handler::api::key: - return do_process(std::move(ctx), g); - case create_topics_handler::api::key: - return do_process(std::move(ctx), g); - case describe_configs_handler::api::key: - return do_process(std::move(ctx), g); - case alter_configs_handler::api::key: - return do_process(std::move(ctx), g); - case delete_topics_handler::api::key: - return do_process(std::move(ctx), g); - case describe_groups_handler::api::key: - return do_process(std::move(ctx), g); - case sasl_handshake_handler::api::key: + auto& key = ctx.header().key; + + if (key == sasl_handshake_handler::api::key) { return process_result_stages::single_stage(ctx.respond( sasl_handshake_response(error_code::illegal_sasl_state, {}))); - case sasl_authenticate_handler::api::key: { + } + + if (key == sasl_authenticate_handler::api::key) { sasl_authenticate_response_data data{ .error_code = error_code::illegal_sasl_state, .error_message = "Authentication process already completed", @@ -322,33 +276,11 @@ process_request(request_context&& ctx, ss::smp_service_group g) { return process_result_stages::single_stage( ctx.respond(sasl_authenticate_response(std::move(data)))); } - case init_producer_id_handler::api::key: - return do_process(std::move(ctx), g); - case incremental_alter_configs_handler::api::key: - return do_process(std::move(ctx), g); - case delete_groups_handler::api::key: - return do_process(std::move(ctx), g); - case describe_acls_handler::api::key: - return do_process(std::move(ctx), g); - case describe_log_dirs_handler::api::key: - return do_process(std::move(ctx), g); - case 
create_acls_handler::api::key: - return do_process(std::move(ctx), g); - case delete_acls_handler::api::key: - return do_process(std::move(ctx), g); - case add_partitions_to_txn_handler::api::key: - return do_process(std::move(ctx), g); - case txn_offset_commit_handler::api::key: - return do_process(std::move(ctx), g); - case add_offsets_to_txn_handler::api::key: - return do_process(std::move(ctx), g); - case end_txn_handler::api::key: - return do_process(std::move(ctx), g); - case create_partitions_handler::api::key: - return do_process(std::move(ctx), g); - case offset_for_leader_epoch_handler::api::key: - return do_process(std::move(ctx), g); - }; + + if (auto handler = handler_for_key(key)) { + return process_generic(*handler, std::move(ctx), g); + } + throw std::runtime_error( fmt::format("Unsupported API {}", ctx.header().key)); } From 38be81c6485cea2b841fc9de4200ac09e62a5e97 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Tue, 5 Jul 2022 17:44:12 -0700 Subject: [PATCH 11/20] Add support for memory estimation to handlers Currently we use a single memory estimate for all kafka request types, but different API calls may use wildly different amounts of memory. This change allows each handler to perform an API-specific calculation instead. 
--- src/v/kafka/server/handlers/handler.h | 12 +++++++++- .../server/handlers/handler_interface.cc | 17 +++++++++++--- .../kafka/server/handlers/handler_interface.h | 21 ++++++++++++++++++ src/v/kafka/server/handlers/join_group.h | 3 +++ src/v/kafka/server/handlers/offset_commit.h | 4 ++++ src/v/kafka/server/handlers/produce.h | 3 +++ src/v/kafka/server/handlers/sync_group.h | 3 +++ src/v/kafka/server/response.h | 22 +++++++++++++++++++ 8 files changed, 81 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index 8895026c4e32..b6b3efa8aea8 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -18,16 +18,26 @@ namespace kafka { +using memory_estimate_fn = size_t(size_t); + template< typename RequestApi, api_version::type MinSupported, - api_version::type MaxSupported> + api_version::type MaxSupported, + memory_estimate_fn MemEstimator = default_memory_estimate> struct single_stage_handler { using api = RequestApi; static constexpr api_version min_supported = api_version(MinSupported); static constexpr api_version max_supported = api_version(MaxSupported); static ss::future handle(request_context, ss::smp_service_group); + /** + * See handler_interface::memory_estimate for a description of this + * function. 
+ */ + static size_t memory_estimate(size_t request_size) { + return MemEstimator(request_size); + } }; template diff --git a/src/v/kafka/server/handlers/handler_interface.cc b/src/v/kafka/server/handlers/handler_interface.cc index 140271e41cf5..56721e833196 100644 --- a/src/v/kafka/server/handlers/handler_interface.cc +++ b/src/v/kafka/server/handlers/handler_interface.cc @@ -12,6 +12,7 @@ #include "kafka/server/handlers/handlers.h" #include "kafka/server/handlers/produce.h" +#include "kafka/server/response.h" #include "kafka/types.h" #include @@ -26,15 +27,18 @@ struct handler_info { api_key key, const char* name, api_version min_api, - api_version max_api) noexcept + api_version max_api, + memory_estimate_fn* mem_estimate) noexcept : _key(key) , _name(name) , _min_api(min_api) - , _max_api(max_api) {} + , _max_api(max_api) + , _mem_estimate(mem_estimate) {} api_key _key; const char* _name; api_version _min_api, _max_api; + memory_estimate_fn* _mem_estimate; }; /** @@ -67,6 +71,9 @@ struct handler_base final : public handler_interface { api_key key() const override { return _info._key; } const char* name() const override { return _info._name; } + size_t memory_estimate(size_t request_size) const override { + return _info._mem_estimate(request_size); + } /** * Only handle varies with one or two pass, since one pass handlers * must pass through single_stage() to covert them to two-pass. 
@@ -98,7 +105,11 @@ template struct handler_holder { static const inline handler_base> instance{ handler_info{ - H::api::key, H::api::name, H::min_supported, H::max_supported}, + H::api::key, + H::api::name, + H::min_supported, + H::max_supported, + H::memory_estimate}, H::handle}; }; diff --git a/src/v/kafka/server/handlers/handler_interface.h b/src/v/kafka/server/handlers/handler_interface.h index a88b49490d75..05e21ec497e0 100644 --- a/src/v/kafka/server/handlers/handler_interface.h +++ b/src/v/kafka/server/handlers/handler_interface.h @@ -44,6 +44,27 @@ struct handler_interface { */ virtual api_key key() const = 0; + /** + * @brief Estimates the memory used to process the request. + * + * Returns an esimate of the memory needed to process a request. This is + * used to block the request until sufficient memory is available using the + * "memory units" semaphore. Ideally this should be a conservative request + * (i.e., a possible overestimate in cases where the memory use may vary + * significantly) as the result of a too-small estimate may be an + * out-of-memory condition, while a too-large estimate will "merely" reduce + * performance. + * + * Handers may also return an initial, small estimate here covering the + * first part of processing, then dynamically increase their memory + * allocation later on during processing when the full memory size is known. + * + * Unfortunately, this estimate happens early in the decoding process, after + * only the request size and header has been read, so handlers don't have + * as much information as they may like to make this decision. + */ + virtual size_t memory_estimate(size_t request_size) const = 0; + /** * @brief Handles the request. 
* diff --git a/src/v/kafka/server/handlers/join_group.h b/src/v/kafka/server/handlers/join_group.h index 1d3ec508350a..5f4ebf322027 100644 --- a/src/v/kafka/server/handlers/join_group.h +++ b/src/v/kafka/server/handlers/join_group.h @@ -19,5 +19,8 @@ struct join_group_handler { static constexpr api_version min_supported = api_version(0); static constexpr api_version max_supported = api_version(5); static process_result_stages handle(request_context, ss::smp_service_group); + static size_t memory_estimate(size_t request_size) { + return default_memory_estimate(request_size); + } }; } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_commit.h b/src/v/kafka/server/handlers/offset_commit.h index 2355c580aeac..7026c855ce59 100644 --- a/src/v/kafka/server/handlers/offset_commit.h +++ b/src/v/kafka/server/handlers/offset_commit.h @@ -11,6 +11,7 @@ #pragma once #include "kafka/protocol/offset_commit.h" #include "kafka/server/handlers/handler.h" +#include "kafka/server/response.h" namespace kafka { @@ -22,5 +23,8 @@ struct offset_commit_handler { static constexpr api_version min_supported = api_version(1); static constexpr api_version max_supported = api_version(7); static process_result_stages handle(request_context, ss::smp_service_group); + static size_t memory_estimate(size_t request_size) { + return default_memory_estimate(request_size); + } }; } // namespace kafka diff --git a/src/v/kafka/server/handlers/produce.h b/src/v/kafka/server/handlers/produce.h index ae7673858d77..fbdace0fb23f 100644 --- a/src/v/kafka/server/handlers/produce.h +++ b/src/v/kafka/server/handlers/produce.h @@ -20,6 +20,9 @@ struct produce_handler { static constexpr api_version max_supported = api_version(7); static process_result_stages handle(request_context, ss::smp_service_group); static constexpr auto despam_interval = std::chrono::minutes(5); + static size_t memory_estimate(size_t request_size) { + return default_memory_estimate(request_size); + } }; } // namespace kafka 
diff --git a/src/v/kafka/server/handlers/sync_group.h b/src/v/kafka/server/handlers/sync_group.h index 711cca63bae9..88b85acbb14a 100644 --- a/src/v/kafka/server/handlers/sync_group.h +++ b/src/v/kafka/server/handlers/sync_group.h @@ -19,6 +19,9 @@ struct sync_group_handler { static constexpr api_version min_supported = api_version(0); static constexpr api_version max_supported = api_version(3); static process_result_stages handle(request_context, ss::smp_service_group); + static size_t memory_estimate(size_t request_size) { + return default_memory_estimate(request_size); + } }; } // namespace kafka diff --git a/src/v/kafka/server/response.h b/src/v/kafka/server/response.h index c7be3de993d3..0bbe52bc978e 100644 --- a/src/v/kafka/server/response.h +++ b/src/v/kafka/server/response.h @@ -105,4 +105,26 @@ struct process_result_stages { ss::future response; }; +/** + * @brief The default memory size estimate. + * + * Request must make an up-front estimate of the amount of memory they will use, + * in order to obtain the corresponding number of units from the memory + * semaphore (blocking if they are not available). Each request type can use + * their own estimation approach, but if not specified this default estimator + * will be used. + * + * Now, this estimator is very poor for many request types: it only applies a + * multiplier to the request size, so only makes + * sense for requests (such as produce) where the size of the request is a + * good indicator of the total memory size. For requests with a small request + * but a large response (fetch, metadata, etc), it is not appropriate. 
+ * + * @return size_t the estimated size required to process the request + */ +constexpr size_t default_memory_estimate(size_t request_size) { + // Allow for extra copies and bookkeeping + return request_size * 2 + 8000; // NOLINT +} + } // namespace kafka From 5a9ca081b3e42ad01065abd9d08d4e9eaf3a3aa4 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Tue, 5 Jul 2022 21:07:27 -0700 Subject: [PATCH 12/20] Use handler specific memory estimate In connection_context, we now use the handler-specific initial memory use estimate, rather than a single estimate for every handler type. --- src/v/kafka/server/connection_context.cc | 25 ++++++++++++++++-------- src/v/kafka/server/connection_context.h | 3 ++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index d046f5cbfdb0..e8546672a17f 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -13,10 +13,12 @@ #include "bytes/iobuf.h" #include "config/configuration.h" #include "kafka/protocol/sasl_authenticate.h" +#include "kafka/server/handlers/handler_interface.h" #include "kafka/server/protocol.h" #include "kafka/server/protocol_utils.h" #include "kafka/server/quota_manager.h" #include "kafka/server/request_context.h" +#include "kafka/server/response.h" #include "security/exceptions.h" #include "units.h" #include "vlog.h" @@ -236,8 +238,9 @@ connection_context::throttle_request( } auto track = track_latency(hdr.key); return fut - .then( - [this, request_size] { return reserve_request_units(request_size); }) + .then([this, key = hdr.key, request_size] { + return reserve_request_units(key, request_size); + }) .then([this, delay, track, tracker = std::move(tracker)]( ss::semaphore_units<> units) mutable { return server().get_request_unit().then( @@ -262,15 +265,21 @@ connection_context::throttle_request( } ss::future> -connection_context::reserve_request_units(size_t size) { - // 
Allow for extra copies and bookkeeping - auto mem_estimate = size * 2 + 8000; // NOLINT - if (mem_estimate >= (size_t)std::numeric_limits::max()) { +connection_context::reserve_request_units(api_key key, size_t size) { + // Defer to the handler for the request type for the memory estimate, but + // if the request isn't found, use the default estimate (although in that + // case the request is likely for an API we don't support or malformed, so + // it is likely to fail shortly anyway). + auto handler = handler_for_key(key); + auto mem_estimate = handler ? (*handler)->memory_estimate(size) + : default_memory_estimate(size); + if (unlikely(mem_estimate >= (size_t)std::numeric_limits::max())) { // TODO: Create error response using the specific API? throw std::runtime_error(fmt::format( - "request too large > 1GB (size: {}; estimate: {})", + "request too large > 1GB (size: {}, estimate: {}, API: {})", size, - mem_estimate)); + mem_estimate, + handler ? (*handler)->name() : "")); } auto fut = ss::get_units(_rs.memory(), mem_estimate); if (_rs.memory().waiters()) { diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 9de753f27b24..2b18788fcbf8 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -167,7 +167,8 @@ class connection_context final // Reserve units from memory from the memory semaphore in proportion // to the number of bytes the request procesisng is expected to // take. 
- ss::future> reserve_request_units(size_t size); + ss::future> + reserve_request_units(api_key key, size_t size); // Apply backpressure sequence, where the request processing may be // delayed for various reasons, including throttling but also because From 958b9330bcbb69f16e9cb2502417c403845d45fc Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Sun, 10 Jul 2022 16:22:20 -0700 Subject: [PATCH 13/20] Move session_resources object to top level The session_resources type was a private member of connection_context, but as we want to use it more broadly, move it out as a standalone public class. Additionally, pass it by shared_pointer in preparation for later changes which will feed it into requests. --- src/v/kafka/server/connection_context.cc | 17 ++++-- src/v/kafka/server/connection_context.h | 71 ++++++++++++------------ src/v/kafka/server/request_context.h | 3 +- src/v/kafka/server/requests.cc | 19 +++++-- 4 files changed, 63 insertions(+), 47 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index e8546672a17f..e94b4af15688 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -184,8 +185,9 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { }, std::move(request_buf), 0s); + auto sres = session_resources{}; auto resp = co_await kafka::process_request( - std::move(ctx), _proto.smp_group()) + std::move(ctx), _proto.smp_group(), sres) .response; auto data = std::move(*resp).release(); response.decode(std::move(data), version); @@ -215,8 +217,7 @@ bool connection_context::is_finished_parsing() const { return _rs.conn->input().eof() || _rs.abort_requested(); } -ss::future -connection_context::throttle_request( +ss::future connection_context::throttle_request( const request_header& hdr, size_t request_size) { // update the throughput tracker for this client using the // size of the 
current request and return any computed delay @@ -291,11 +292,15 @@ connection_context::reserve_request_units(api_key key, size_t size) { ss::future<> connection_context::dispatch_method_once(request_header hdr, size_t size) { return throttle_request(hdr, size).then([this, hdr = std::move(hdr), size]( - session_resources sres) mutable { + session_resources + sres_in) mutable { if (_rs.abort_requested()) { // protect against shutdown behavior return ss::make_ready_future<>(); } + + auto sres = ss::make_lw_shared(std::move(sres_in)); + auto remaining = size - request_header_size - hdr.client_id_buffer.size() - hdr.tags_size_bytes; return read_iobuf_exactly(_rs.conn->input(), remaining) @@ -307,7 +312,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { } auto self = shared_from_this(); auto rctx = request_context( - self, std::move(hdr), std::move(buf), sres.backpressure_delay); + self, std::move(hdr), std::move(buf), sres->backpressure_delay); /* * we process requests in order since all subsequent requests * are dependent on authentication having completed. @@ -332,7 +337,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { const sequence_id seq = _seq_idx; _seq_idx = _seq_idx + sequence_id(1); auto res = kafka::process_request( - std::move(rctx), _proto.smp_group()); + std::move(rctx), _proto.smp_group(), *sres); /** * first stage processed in a foreground. 
*/ diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 2b18788fcbf8..10eb4d697756 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -11,6 +11,7 @@ #pragma once #include "kafka/server/protocol.h" #include "kafka/server/response.h" +#include "kafka/types.h" #include "net/server.h" #include "seastarx.h" #include "security/acl.h" @@ -37,6 +38,41 @@ using authz_quiet = ss::bool_class; struct request_header; class request_context; +// used to track number of pending requests +class request_tracker { +public: + explicit request_tracker(net::server_probe& probe) noexcept + : _probe(probe) { + _probe.request_received(); + } + request_tracker(const request_tracker&) = delete; + request_tracker(request_tracker&&) = delete; + request_tracker& operator=(const request_tracker&) = delete; + request_tracker& operator=(request_tracker&&) = delete; + + ~request_tracker() noexcept { _probe.request_completed(); } + +private: + net::server_probe& _probe; +}; + +// Used to hold resources associated with a given request until +// the response has been send, as well as to track some statistics +// about the request. +// +// The resources in particular should be not be destroyed until +// the request is complete (e.g., all the information written to +// the socket so that no userspace buffers remain). 
+struct session_resources { + using pointer = ss::lw_shared_ptr; + + ss::lowres_clock::duration backpressure_delay; + ss::semaphore_units<> memlocks; + ss::semaphore_units<> queue_units; + std::unique_ptr method_latency; + std::unique_ptr tracker; +}; + class connection_context final : public ss::enable_lw_shared_from_this { public: @@ -131,39 +167,6 @@ class connection_context final } private: - // used to track number of pending requests - class request_tracker { - public: - explicit request_tracker(net::server_probe& probe) noexcept - : _probe(probe) { - _probe.request_received(); - } - request_tracker(const request_tracker&) = delete; - request_tracker(request_tracker&&) = delete; - request_tracker& operator=(const request_tracker&) = delete; - request_tracker& operator=(request_tracker&&) = delete; - - ~request_tracker() noexcept { _probe.request_completed(); } - - private: - net::server_probe& _probe; - }; - - // Used to hold resources associated with a given request until - // the response has been send, as well as to track some statistics - // about the request. - // - // The resources in particular should be not be destroyed until - // the request is complete (e.g., all the information written to - // the socket so that no userspace buffers remain). - struct session_resources { - ss::lowres_clock::duration backpressure_delay; - ss::semaphore_units<> memlocks; - ss::semaphore_units<> queue_units; - std::unique_ptr method_latency; - std::unique_ptr tracker; - }; - // Reserve units from memory from the memory semaphore in proportion // to the number of bytes the request procesisng is expected to // take. 
@@ -205,7 +208,7 @@ class connection_context final */ struct response_and_resources { response_ptr response; - session_resources resources; + session_resources::pointer resources; }; using sequence_id = named_type; diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 1f7b1f35e766..ddee4747a43f 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -219,7 +219,8 @@ class request_context { }; // Executes the API call identified by the specified request_context. -process_result_stages process_request(request_context&&, ss::smp_service_group); +process_result_stages process_request( + request_context&&, ss::smp_service_group, const session_resources&); bool track_latency(api_key); diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index 5d5fcbd0b7da..950ff2ca2d62 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -10,6 +10,7 @@ #include "kafka/protocol/schemata/api_versions_request.h" #include "kafka/protocol/schemata/fetch_request.h" #include "kafka/protocol/schemata/produce_request.h" +#include "kafka/server/connection_context.h" #include "kafka/server/handlers/api_versions.h" #include "kafka/server/handlers/handler_interface.h" #include "kafka/server/handlers/sasl_authenticate.h" @@ -80,16 +81,20 @@ requires(KafkaApiHandler || KafkaApiTwoPhaseHandler) } process_result_stages process_generic( - handler handler, request_context&& ctx, ss::smp_service_group g) { + handler handler, + request_context&& ctx, + ss::smp_service_group g, + const session_resources& sres) { vlog( klog.trace, - "[{}:{}] processing name:{}, key:{}, version:{} for {}", + "[{}:{}] processing name:{}, key:{}, version:{} for {}, mem_units: {}", ctx.connection()->client_host(), ctx.connection()->client_port(), handler->name(), ctx.header().key, ctx.header().version, - ctx.header().client_id.value_or(std::string_view("unset-client-id"))); + 
ctx.header().client_id.value_or(std::string_view("unset-client-id")), + sres.memlocks.count()); // We do a version check for most API requests, but for api_version // requests we skip them. We do not apply them for api_versions, @@ -242,8 +247,10 @@ bool track_latency(api_key key) { } } -process_result_stages -process_request(request_context&& ctx, ss::smp_service_group g) { +process_result_stages process_request( + request_context&& ctx, + ss::smp_service_group g, + const session_resources& sres) { /* * requests are handled as normal when auth is disabled. otherwise no * request is handled until the auth process has completed. @@ -278,7 +285,7 @@ process_request(request_context&& ctx, ss::smp_service_group g) { } if (auto handler = handler_for_key(key)) { - return process_generic(*handler, std::move(ctx), g); + return process_generic(*handler, std::move(ctx), g, sres); } throw std::runtime_error( From 213b6716d6aa2bb8e1f29623074868017db19deb Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 13 Jul 2022 16:16:10 -0700 Subject: [PATCH 14/20] Sort kafka/server forward includes --- src/v/kafka/server/fwd.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/fwd.h b/src/v/kafka/server/fwd.h index a893be15ed16..4aa0a30909ec 100644 --- a/src/v/kafka/server/fwd.h +++ b/src/v/kafka/server/fwd.h @@ -13,13 +13,14 @@ namespace kafka { +// sorted class coordinator_ntp_mapper; class fetch_session_cache; class group_manager; class group_router; +class quota_manager; +class request_context; class rm_group_frontend; class rm_group_proxy_impl; -class request_context; -class quota_manager; } // namespace kafka From d645c7a78d548706e4f2b9d79ea48115e765566c Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 13 Jul 2022 20:15:15 -0700 Subject: [PATCH 15/20] Introduce handler template for two-stage handlers Single-stage handlers have a handler template which means that handler objects can be declared in a single line specifying their api 
object, min and max API versions. This change extends this nice concept to two-stage handlers as well. --- src/v/kafka/server/handlers/handler.h | 54 ++++++++++++++++++-- src/v/kafka/server/handlers/join_group.cc | 1 + src/v/kafka/server/handlers/join_group.h | 11 +--- src/v/kafka/server/handlers/offset_commit.cc | 1 + src/v/kafka/server/handlers/offset_commit.h | 11 +--- src/v/kafka/server/handlers/produce.cc | 3 ++ src/v/kafka/server/handlers/produce.h | 11 +--- src/v/kafka/server/handlers/sync_group.cc | 1 + src/v/kafka/server/handlers/sync_group.h | 10 +--- 9 files changed, 62 insertions(+), 41 deletions(-) diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index b6b3efa8aea8..749943cd6077 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -20,17 +20,28 @@ namespace kafka { using memory_estimate_fn = size_t(size_t); +/** + * Handlers are generally specializations of this template, via one of the + * two aliases (single_stage_handler or two_phase_handler) declared below, + * though it is not strictly necessary (only conforming to one of the two + * KafkaApi* concepts is needed). + * + * The benefit of this template is that it takes care of most of the + * handler boilerplate. + */ template< typename RequestApi, api_version::type MinSupported, api_version::type MaxSupported, - memory_estimate_fn MemEstimator = default_memory_estimate> -struct single_stage_handler { + typename HandleRetType, + memory_estimate_fn MemEstimator> +struct handler_template { using api = RequestApi; static constexpr api_version min_supported = api_version(MinSupported); static constexpr api_version max_supported = api_version(MaxSupported); - static ss::future - handle(request_context, ss::smp_service_group); + + static HandleRetType handle(request_context, ss::smp_service_group); + /** * See handler_interface::memory_estimate for a description of this * function. 
@@ -40,6 +51,41 @@ struct single_stage_handler { } }; +/** + * A single-stage handler implements the entire request handling in the initial + * stage which occurs before any subsequent request is processed. + */ +template< + typename RequestApi, + api_version::type MinSupported, + api_version::type MaxSupported, + memory_estimate_fn MemEstimator = default_memory_estimate> +using single_stage_handler = handler_template< + RequestApi, + MinSupported, + MaxSupported, + ss::future, + MemEstimator>; + +/** + * A two-stage handler has an initial stage which happens before any other + * request can start processing (as in a single-stage handler) but then also has + * a second stage which is processed in the background allowing other requests + * on the same connection to start their handler. Responses are still sent in + * order, but processing is out-of-order. + */ +template< + typename RequestApi, + api_version::type MinSupported, + api_version::type MaxSupported, + memory_estimate_fn MemEstimator = default_memory_estimate> +using two_phase_handler = handler_template< + RequestApi, + MinSupported, + MaxSupported, + process_result_stages, + MemEstimator>; + template concept KafkaApiHandler = KafkaApi && requires( T h, request_context&& ctx, ss::smp_service_group g) { diff --git a/src/v/kafka/server/handlers/join_group.cc b/src/v/kafka/server/handlers/join_group.cc index f7bc53b80a4e..57fe29ca9362 100644 --- a/src/v/kafka/server/handlers/join_group.cc +++ b/src/v/kafka/server/handlers/join_group.cc @@ -35,6 +35,7 @@ static void decode_request(request_context& ctx, join_group_request& req) { fmt::format("{}", ctx.connection()->client_host())); } +template<> process_result_stages join_group_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group g) { join_group_request request; diff --git a/src/v/kafka/server/handlers/join_group.h b/src/v/kafka/server/handlers/join_group.h index 5f4ebf322027..1830badc2f27 100644 --- 
a/src/v/kafka/server/handlers/join_group.h +++ b/src/v/kafka/server/handlers/join_group.h @@ -14,13 +14,6 @@ namespace kafka { -struct join_group_handler { - using api = join_group_api; - static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(5); - static process_result_stages handle(request_context, ss::smp_service_group); - static size_t memory_estimate(size_t request_size) { - return default_memory_estimate(request_size); - } -}; +using join_group_handler = two_phase_handler; + } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index ddaa68616aab..7c1c4d10976a 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -53,6 +53,7 @@ struct offset_commit_ctx { , ssg(ssg) {} }; +template<> process_result_stages offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { offset_commit_request request; diff --git a/src/v/kafka/server/handlers/offset_commit.h b/src/v/kafka/server/handlers/offset_commit.h index 7026c855ce59..5a0512d4d043 100644 --- a/src/v/kafka/server/handlers/offset_commit.h +++ b/src/v/kafka/server/handlers/offset_commit.h @@ -18,13 +18,6 @@ namespace kafka { // in version 0 kafka stores offsets in zookeeper. if we ever need to // support version 0 then we need to do some code review to see if this has // any implications on semantics. 
-struct offset_commit_handler { - using api = offset_commit_api; - static constexpr api_version min_supported = api_version(1); - static constexpr api_version max_supported = api_version(7); - static process_result_stages handle(request_context, ss::smp_service_group); - static size_t memory_estimate(size_t request_size) { - return default_memory_estimate(request_size); - } -}; +using offset_commit_handler = two_phase_handler; + } // namespace kafka diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 8e9a2b051ecd..29b2dfcde8a0 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -43,6 +43,8 @@ namespace kafka { +static constexpr auto despam_interval = std::chrono::minutes(5); + produce_response produce_request::make_error_response(error_code error) const { produce_response response; @@ -464,6 +466,7 @@ static std::vector produce_topics(produce_ctx& octx) { return topics; } +template<> process_result_stages produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { produce_request request; diff --git a/src/v/kafka/server/handlers/produce.h b/src/v/kafka/server/handlers/produce.h index fbdace0fb23f..617b7cb1c98f 100644 --- a/src/v/kafka/server/handlers/produce.h +++ b/src/v/kafka/server/handlers/produce.h @@ -14,15 +14,6 @@ namespace kafka { -struct produce_handler { - using api = produce_api; - static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(7); - static process_result_stages handle(request_context, ss::smp_service_group); - static constexpr auto despam_interval = std::chrono::minutes(5); - static size_t memory_estimate(size_t request_size) { - return default_memory_estimate(request_size); - } -}; +using produce_handler = two_phase_handler; } // namespace kafka diff --git a/src/v/kafka/server/handlers/sync_group.cc b/src/v/kafka/server/handlers/sync_group.cc index 
fcbfe4a1c743..0ddee9864020 100644 --- a/src/v/kafka/server/handlers/sync_group.cc +++ b/src/v/kafka/server/handlers/sync_group.cc @@ -21,6 +21,7 @@ namespace kafka { +template<> process_result_stages sync_group_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group g) { sync_group_request request; diff --git a/src/v/kafka/server/handlers/sync_group.h b/src/v/kafka/server/handlers/sync_group.h index 88b85acbb14a..b23ceb79578a 100644 --- a/src/v/kafka/server/handlers/sync_group.h +++ b/src/v/kafka/server/handlers/sync_group.h @@ -14,14 +14,6 @@ namespace kafka { -struct sync_group_handler { - using api = sync_group_api; - static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(3); - static process_result_stages handle(request_context, ss::smp_service_group); - static size_t memory_estimate(size_t request_size) { - return default_memory_estimate(request_size); - } -}; +using sync_group_handler = two_phase_handler; } // namespace kafka From bbea197bdc3c611d342d1c5b46ea43d563fa165c Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 13 Jul 2022 20:54:46 -0700 Subject: [PATCH 16/20] Add the connection context to the memory estimator Passing the connection context to the estimator allows the estimator to use the various subsystems to estimate the memory use of a given request. 
--- src/v/kafka/server/connection_context.cc | 2 +- src/v/kafka/server/fwd.h | 1 + src/v/kafka/server/handlers/handler.h | 17 ++++++++++++----- .../kafka/server/handlers/handler_interface.cc | 5 +++-- src/v/kafka/server/handlers/handler_interface.h | 11 +++++------ src/v/kafka/server/handlers/metadata.cc | 1 + 6 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index e94b4af15688..0b263d09c8d6 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -272,7 +272,7 @@ connection_context::reserve_request_units(api_key key, size_t size) { // case the request is likely for an API we don't support or malformed, so // it is likely to fail shortly anyway). auto handler = handler_for_key(key); - auto mem_estimate = handler ? (*handler)->memory_estimate(size) + auto mem_estimate = handler ? (*handler)->memory_estimate(size, *this) : default_memory_estimate(size); if (unlikely(mem_estimate >= (size_t)std::numeric_limits::max())) { // TODO: Create error response using the specific API? 
diff --git a/src/v/kafka/server/fwd.h b/src/v/kafka/server/fwd.h index 4aa0a30909ec..e034d9f65084 100644 --- a/src/v/kafka/server/fwd.h +++ b/src/v/kafka/server/fwd.h @@ -14,6 +14,7 @@ namespace kafka { // sorted +class connection_context; class coordinator_ntp_mapper; class fetch_session_cache; class group_manager; diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index 749943cd6077..6e294af073c5 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -10,6 +10,7 @@ */ #pragma once #include "kafka/protocol/types.h" +#include "kafka/server/fwd.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" #include "kafka/types.h" @@ -18,7 +19,12 @@ namespace kafka { -using memory_estimate_fn = size_t(size_t); +using memory_estimate_fn = size_t(size_t, connection_context&); + +constexpr size_t +default_estimate_adaptor(size_t request_size, connection_context&) { + return default_memory_estimate(request_size); +} /** * Handlers are generally specializations of this template, via one of the @@ -46,8 +52,9 @@ struct handler_template { * See handler_interface::memory_estimate for a description of this * function. 
*/ - static size_t memory_estimate(size_t request_size) { - return MemEstimator(request_size); + static size_t + memory_estimate(size_t request_size, connection_context& conn_ctx) { + return MemEstimator(request_size, conn_ctx); } }; @@ -59,7 +66,7 @@ template< typename RequestApi, api_version::type MinSupported, api_version::type MaxSupported, - memory_estimate_fn MemEstimator = default_memory_estimate> + memory_estimate_fn MemEstimator = default_estimate_adaptor> using single_stage_handler = handler_template< RequestApi, MinSupported, @@ -78,7 +85,7 @@ template< typename RequestApi, api_version::type MinSupported, api_version::type MaxSupported, - memory_estimate_fn MemEstimator = default_memory_estimate> + memory_estimate_fn MemEstimator = default_estimate_adaptor> using two_phase_handler = handler_template< RequestApi, MinSupported, diff --git a/src/v/kafka/server/handlers/handler_interface.cc b/src/v/kafka/server/handlers/handler_interface.cc index 56721e833196..156f305bb51b 100644 --- a/src/v/kafka/server/handlers/handler_interface.cc +++ b/src/v/kafka/server/handlers/handler_interface.cc @@ -71,8 +71,9 @@ struct handler_base final : public handler_interface { api_key key() const override { return _info._key; } const char* name() const override { return _info._name; } - size_t memory_estimate(size_t request_size) const override { - return _info._mem_estimate(request_size); + size_t memory_estimate( + size_t request_size, connection_context& conn_ctx) const override { + return _info._mem_estimate(request_size, conn_ctx); } /** * Only handle varies with one or two pass, since one pass handlers diff --git a/src/v/kafka/server/handlers/handler_interface.h b/src/v/kafka/server/handlers/handler_interface.h index 05e21ec497e0..6f5b368984c8 100644 --- a/src/v/kafka/server/handlers/handler_interface.h +++ b/src/v/kafka/server/handlers/handler_interface.h @@ -55,15 +55,14 @@ struct handler_interface { * out-of-memory condition, while a too-large estimate will "merely" 
reduce * performance. * - * Handers may also return an initial, small estimate here covering the - * first part of processing, then dynamically increase their memory - * allocation later on during processing when the full memory size is known. - * * Unfortunately, this estimate happens early in the decoding process, after * only the request size and header has been read, so handlers don't have - * as much information as they may like to make this decision. + * as much information as they may like to make this decision. The + * connection_context for the associated connection is passed to give access + * to global state which may be useful in making the estimate. */ - virtual size_t memory_estimate(size_t request_size) const = 0; + virtual size_t memory_estimate( + size_t request_size, connection_context& conn_ctx) const = 0; /** * @brief Handles the request. diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 43813d0e5931..4865bb540141 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -15,6 +15,7 @@ #include "config/configuration.h" #include "config/node_config.h" #include "kafka/server/errors.h" +#include "kafka/server/fwd.h" #include "kafka/server/handlers/details/leader_epoch.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h" From c218c2bb3c122a1dedfdb451416113ad48a61a74 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Sun, 10 Jul 2022 22:27:57 -0700 Subject: [PATCH 17/20] Use a better estimator for metadata requests Currently we estimate that metadata requests take 8000 + rsize * 2 bytes of memory to process, where rsize is the size of the request. Since metadata requests are very small, this ends up being roughly 8000 bytes. However, metadata requests which return information about every partition and replica may easily be several MBs in size. 
To fix this for metadata requests specifically, we use a new more conservative estimate which uses the current topic and partition configuration to give an upper bound on the size. --- src/v/kafka/server/handlers/metadata.cc | 65 +++++++++++++++++++++++++ src/v/kafka/server/handlers/metadata.h | 15 +++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 4865bb540141..92e034fb4af8 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -19,6 +19,7 @@ #include "kafka/server/handlers/details/leader_epoch.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h" +#include "kafka/server/response.h" #include "kafka/types.h" #include "likely.h" #include "model/metadata.h" @@ -423,4 +424,68 @@ ss::future metadata_handler::handle( co_return co_await ctx.respond(std::move(reply)); } +size_t +metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) { + // We cannot make a precise estimate of the size of a metadata response by + // examining only the size of the request (nor even by examining the entire + // request) since the response depends on the number of partitions in the + // cluster. Instead, we return a conservative estimate based on the current + // number of topics & partitions in the cluster. + + // Essentially we need to estimate the size taken by a "maximum size" + // metadata_response_data response. The maximum size is when metadata for + // all topics is returned, which is also a common case in practice. This + // involves calculating the size for each topic's portion of the response, + // since the size varies both based on the number of partitions and the + // replica count. 
+ + // We start with a base estimate of 10K and then proceed to ignore + // everything other than the topic/partition part of the response, since + // that's what takes space in large responses and we assume the remaining + // part of the response (the broker list being the second largest part) will + // fit in this 10000k slush fund. + size_t size_estimate = 10000; + + auto& md = conn_ctx.server().metadata_cache().all_topics_metadata(); + + for (auto& [tp_ns, topic_metadata] : md) { + // metadata_response_topic + size_estimate += sizeof(kafka::metadata_response_topic); + size_estimate += tp_ns.tp().size(); + + using partition = kafka::metadata_response_partition; + + // Base number of bytes needed to represent each partition, ignoring the + // variable part attributable to the replica count, we just take as the + // size of the partition response structure. + constexpr size_t bytes_per_partition = sizeof(partition); + + // Then, we need the number of additional bytes per replica, per + // partition, associated with storing the replica list in + // metadata_response_partition::replica_nodes/isr_nodes, which we take to + // be the size of the elements in those lists (4 bytes each). + constexpr size_t bytes_per_replica = sizeof(partition::replica_nodes[0]) + + sizeof(partition::isr_nodes[0]); + + // The actual partition and replica count for this topic. + int32_t pcount = topic_metadata.get_configuration().partition_count; + int32_t rcount = topic_metadata.get_configuration().replication_factor; + + size_estimate += pcount + * (bytes_per_partition + bytes_per_replica * rcount); + } + + // Finally, we double the estimate, because the highwater mark for memory + // use comes when the in-memory structures (metadata_response_data and + // subobjects) exist on the heap and they are encoded into the response, + // which will also exist on the heap. 
The calculation above handles the + // first size, and the encoded response ends up being very similar in size, + // so we double the estimate to account for both. + size_estimate *= 2; + + // We still add on the default_estimate to handle the size of the request + // itself and miscellaneous other processing (this is a small adjustment, + // generally ~8000 bytes). + return default_memory_estimate(request_size) + size_estimate; +} } // namespace kafka diff --git a/src/v/kafka/server/handlers/metadata.h b/src/v/kafka/server/handlers/metadata.h index 8d2336218e5b..bd0e78bb7003 100644 --- a/src/v/kafka/server/handlers/metadata.h +++ b/src/v/kafka/server/handlers/metadata.h @@ -14,6 +14,17 @@ namespace kafka { -using metadata_handler = single_stage_handler; +/** + * Estimate the memory needed to process a metadata request. + * + * Metadata requests are generally very small (a request for *all* metadata + * about a cluster is less than 30 bytes) but the response may be very large, so + * the default estimator is unsuitable. See the implementation for further + * notes. + */ +memory_estimate_fn metadata_memory_estimator; + +using metadata_handler + = single_stage_handler; -} +} // namespace kafka From 6cb5a71e009803a818400db6ad6336b01b24171b Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 14 Jul 2022 13:38:51 -0700 Subject: [PATCH 18/20] Include broker list in metadata estimation Prior to this change, we used only the topic and partition data to estimate the size of the metadata response. Now, we also include the approximate size of the broker metadata portion of the response, which may be important if the list of brokers is very large or they have very long hostnames. 
--- src/v/kafka/server/handlers/metadata.cc | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 92e034fb4af8..2667b11176f5 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -14,6 +14,7 @@ #include "cluster/types.h" #include "config/configuration.h" #include "config/node_config.h" +#include "kafka/protocol/schemata/metadata_response.h" #include "kafka/server/errors.h" #include "kafka/server/fwd.h" #include "kafka/server/handlers/details/leader_epoch.h" @@ -446,9 +447,21 @@ metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) { // fit in this 10000k slush fund. size_t size_estimate = 10000; - auto& md = conn_ctx.server().metadata_cache().all_topics_metadata(); + auto& md_cache = conn_ctx.server().metadata_cache(); - for (auto& [tp_ns, topic_metadata] : md) { + // The size will vary with the number of brokers, though this effect is + // probably small if there are large numbers of partitions + + // This covers the variable part of the broker response, i.e., the broker + // hostname + rack We just hope these are less than this amount, because we + // don't want to execute the relatively complex logic to guess the listener + // just for the size estimate. + constexpr size_t extra_bytes_per_broker = 200; + size_estimate + += md_cache.all_brokers().size() + * (sizeof(metadata_response_broker) + extra_bytes_per_broker); + + for (auto& [tp_ns, topic_metadata] : md_cache.all_topics_metadata()) { // metadata_response_topic size_estimate += sizeof(kafka::metadata_response_topic); size_estimate += tp_ns.tp().size(); From f649f7f7f46abd37fa413e5d36ecf032976d3ab9 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 14 Jul 2022 14:26:23 -0700 Subject: [PATCH 19/20] Add noexcept to handler_for_key. 
--- src/v/kafka/server/handlers/handler_interface.cc | 2 +- src/v/kafka/server/handlers/handler_interface.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/handlers/handler_interface.cc b/src/v/kafka/server/handlers/handler_interface.cc index 156f305bb51b..0551cef7ffb8 100644 --- a/src/v/kafka/server/handlers/handler_interface.cc +++ b/src/v/kafka/server/handlers/handler_interface.cc @@ -125,7 +125,7 @@ constexpr auto make_lut(type_list) { return lut; } -std::optional handler_for_key(kafka::api_key key) { +std::optional handler_for_key(kafka::api_key key) noexcept { static constexpr auto lut = make_lut(request_types{}); if (key >= (short)0 && key < (short)lut.size()) { if (auto handler = lut[key]) { diff --git a/src/v/kafka/server/handlers/handler_interface.h b/src/v/kafka/server/handlers/handler_interface.h index 6f5b368984c8..dc4857e9f993 100644 --- a/src/v/kafka/server/handlers/handler_interface.h +++ b/src/v/kafka/server/handlers/handler_interface.h @@ -106,6 +106,6 @@ using handler = const handler_interface*; * @param key the API key for the handler * @return std::optional the handler, if any */ -std::optional handler_for_key(api_key key); +std::optional handler_for_key(api_key key) noexcept; } // namespace kafka From fd6353f40efc22e433e79b14aa0fabd2e68cabb9 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 14 Jul 2022 14:30:19 -0700 Subject: [PATCH 20/20] Add NOLINT to use of operator[] We already check the bounds so the cpp core guideline presumably does not apply and we don't want to pay the price for the additional bounds check inside at(). 
--- src/v/kafka/server/handlers/handler_interface.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/v/kafka/server/handlers/handler_interface.cc b/src/v/kafka/server/handlers/handler_interface.cc index 0551cef7ffb8..44593b0f96e4 100644 --- a/src/v/kafka/server/handlers/handler_interface.cc +++ b/src/v/kafka/server/handlers/handler_interface.cc @@ -128,6 +128,9 @@ constexpr auto make_lut(type_list) { std::optional handler_for_key(kafka::api_key key) noexcept { static constexpr auto lut = make_lut(request_types{}); if (key >= (short)0 && key < (short)lut.size()) { + // We have already checked the bounds above so it is safe to use [] + // instead of at() + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) if (auto handler = lut[key]) { return handler; }