diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index c31b2b37b7fea..444ba2d1a8e42 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -272,7 +272,7 @@ connection_context::reserve_request_units(api_key key, size_t size) { // case the request is likely for an API we don't support or malformed, so // it is likely to fail shortly anyway). auto handler = handler_for_key(key); - auto mem_estimate = handler ? (*handler)->memory_estimate(size) + auto mem_estimate = handler ? (*handler)->memory_estimate(size, *this) : default_memory_estimate(size); if (unlikely(mem_estimate >= (size_t)std::numeric_limits::max())) { // TODO: Create error response using the specific API? diff --git a/src/v/kafka/server/fwd.h b/src/v/kafka/server/fwd.h index 4aa0a30909ec1..e034d9f650847 100644 --- a/src/v/kafka/server/fwd.h +++ b/src/v/kafka/server/fwd.h @@ -14,6 +14,7 @@ namespace kafka { // sorted +class connection_context; class coordinator_ntp_mapper; class fetch_session_cache; class group_manager; diff --git a/src/v/kafka/server/handlers/details/any_handler.cc b/src/v/kafka/server/handlers/details/any_handler.cc index d245ceb988f4f..280f6602fa655 100644 --- a/src/v/kafka/server/handlers/details/any_handler.cc +++ b/src/v/kafka/server/handlers/details/any_handler.cc @@ -71,8 +71,9 @@ struct any_handler_base final : public any_handler_t { api_key key() const override { return _info._key; } const char* name() const override { return _info._name; } - size_t memory_estimate(size_t request_size) const override { - return _info._mem_estimate(request_size); + size_t memory_estimate( + size_t request_size, connection_context& conn_ctx) const override { + return _info._mem_estimate(request_size, conn_ctx); } /** * Only handle varies with one or two pass, since one pass handlers diff --git a/src/v/kafka/server/handlers/details/any_handler.h b/src/v/kafka/server/handlers/details/any_handler.h index 1b2e2fac06778..7da4bf7666061 100644 --- a/src/v/kafka/server/handlers/details/any_handler.h +++ b/src/v/kafka/server/handlers/details/any_handler.h @@ -55,15 +55,14 @@ struct any_handler_t { * out-of-memory condition, while a too-large estimate will "merely" reduce * performance. * - * Handers may also return an initial, small estimate here covering the - * first part of processing, then dynamically increase their memory - * allocation later on during processing when the full memory size is known. - * * Unfortunately, this estimate happens early in the decoding process, after * only the request size and header has been read, so handlers don't have - * as much information as they may like to make this decision. + * as much information as they may like to make this decision. The + * connection_context for the associated connection is passed to give access + * to global state which may be useful in making the estimate. */ - virtual size_t memory_estimate(size_t request_size) const = 0; + virtual size_t memory_estimate( + size_t request_size, connection_context& conn_ctx) const = 0; /** * @brief Handles the request. diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index 263bc6fd9077a..129b91ba9aea2 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -10,6 +10,7 @@ */ #pragma once #include "kafka/protocol/types.h" +#include "kafka/server/fwd.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" #include "kafka/types.h" @@ -18,7 +19,12 @@ namespace kafka { -using memory_estimate_fn = size_t(size_t); +using memory_estimate_fn = size_t(size_t, connection_context&); + +constexpr size_t +default_estimate_adaptor(size_t request_size, connection_context&) { + return default_memory_estimate(request_size); +} /** * Handlers are generally specializations of this template, via one of the @@ -45,8 +51,9 @@ struct handler_template { /** * See any_handler_t::memory_estimate for a description of this function. */ - static size_t memory_estimate(size_t request_size) { - return MemEstimator(request_size); + static size_t + memory_estimate(size_t request_size, connection_context& conn_ctx) { + return MemEstimator(request_size, conn_ctx); } }; @@ -58,7 +65,7 @@ template< typename RequestApi, api_version::type MinSupported, api_version::type MaxSupported, - memory_estimate_fn MemEstimator = default_memory_estimate> + memory_estimate_fn MemEstimator = default_estimate_adaptor> using handler = handler_template< RequestApi, MinSupported, @@ -77,7 +84,7 @@ template< typename RequestApi, api_version::type MinSupported, api_version::type MaxSupported, - memory_estimate_fn MemEstimator = default_memory_estimate> + memory_estimate_fn MemEstimator = default_estimate_adaptor> using two_phase_handler = handler_template< RequestApi, MinSupported, diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index a372d5b6a4fec..7a6e674cc4987 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -15,6 +15,7 @@ #include "config/configuration.h" #include "config/node_config.h" #include "kafka/server/errors.h" +#include "kafka/server/fwd.h" #include "kafka/server/handlers/details/leader_epoch.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h"