Skip to content

Commit

Permalink
Add the connection context to the memory estimator
Browse files Browse the repository at this point in the history
Passing the connection context to the estimator allows the
estimator to use the various subsystems to estimate the memory
use of a given request.
  • Loading branch information
travisdowns committed Jul 14, 2022
1 parent 8dae609 commit ec67a5b
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ connection_context::reserve_request_units(api_key key, size_t size) {
// case the request is likely for an API we don't support or malformed, so
// it is likely to fail shortly anyway).
auto handler = handler_for_key(key);
auto mem_estimate = handler ? (*handler)->memory_estimate(size)
auto mem_estimate = handler ? (*handler)->memory_estimate(size, *this)
: default_memory_estimate(size);
if (unlikely(mem_estimate >= (size_t)std::numeric_limits<int32_t>::max())) {
// TODO: Create error response using the specific API?
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace kafka {

// sorted
class connection_context;
class coordinator_ntp_mapper;
class fetch_session_cache;
class group_manager;
Expand Down
5 changes: 3 additions & 2 deletions src/v/kafka/server/handlers/details/any_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ struct any_handler_base final : public any_handler_t {
api_key key() const override { return _info._key; }
const char* name() const override { return _info._name; }

size_t memory_estimate(size_t request_size) const override {
return _info._mem_estimate(request_size);
size_t memory_estimate(
size_t request_size, connection_context& conn_ctx) const override {
return _info._mem_estimate(request_size, conn_ctx);
}
/**
* Only handle varies with one or two pass, since one pass handlers
Expand Down
11 changes: 5 additions & 6 deletions src/v/kafka/server/handlers/details/any_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ struct any_handler_t {
* out-of-memory condition, while a too-large estimate will "merely" reduce
* performance.
*
* Handers may also return an initial, small estimate here covering the
* first part of processing, then dynamically increase their memory
* allocation later on during processing when the full memory size is known.
*
* Unfortunately, this estimate happens early in the decoding process, after
* only the request size and header has been read, so handlers don't have
* as much information as they may like to make this decision.
* as much information as they may like to make this decision. The
* connection_context for the associated connection is passed to give access
* to global state which may be useful in making the estimate.
*/
virtual size_t memory_estimate(size_t request_size) const = 0;
virtual size_t memory_estimate(
size_t request_size, connection_context& conn_ctx) const = 0;

/**
* @brief Handles the request.
Expand Down
17 changes: 12 additions & 5 deletions src/v/kafka/server/handlers/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#pragma once
#include "kafka/protocol/types.h"
#include "kafka/server/fwd.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
#include "kafka/types.h"
Expand All @@ -18,7 +19,12 @@

namespace kafka {

using memory_estimate_fn = size_t(size_t);
using memory_estimate_fn = size_t(size_t, connection_context&);

constexpr size_t
default_estimate_adaptor(size_t request_size, connection_context&) {
return default_memory_estimate(request_size);
}

/**
* Handlers are generally specializations of this template, via one of the
Expand All @@ -45,8 +51,9 @@ struct handler_template {
/**
* See any_handler_t::memory_estimate for a description of this function.
*/
static size_t memory_estimate(size_t request_size) {
return MemEstimator(request_size);
static size_t
memory_estimate(size_t request_size, connection_context& conn_ctx) {
return MemEstimator(request_size, conn_ctx);
}
};

Expand All @@ -58,7 +65,7 @@ template<
typename RequestApi,
api_version::type MinSupported,
api_version::type MaxSupported,
memory_estimate_fn MemEstimator = default_memory_estimate>
memory_estimate_fn MemEstimator = default_estimate_adaptor>
using handler = handler_template<
RequestApi,
MinSupported,
Expand All @@ -77,7 +84,7 @@ template<
typename RequestApi,
api_version::type MinSupported,
api_version::type MaxSupported,
memory_estimate_fn MemEstimator = default_memory_estimate>
memory_estimate_fn MemEstimator = default_estimate_adaptor>
using two_phase_handler = handler_template<
RequestApi,
MinSupported,
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/handlers/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "config/configuration.h"
#include "config/node_config.h"
#include "kafka/server/errors.h"
#include "kafka/server/fwd.h"
#include "kafka/server/handlers/details/leader_epoch.h"
#include "kafka/server/handlers/details/security.h"
#include "kafka/server/handlers/topics/topic_utils.h"
Expand Down

0 comments on commit ec67a5b

Please sign in to comment.