Skip to content

Commit

Permalink
Introduce type-erased any_handler
Browse files Browse the repository at this point in the history
This is a polymorphic handler each of which is backed by an existing
concrete handler, but which lets us treat handlers generically without
restorting to template functions.

This reduces code bloat significantly as we do not duplicate code paths
for our ~45 handler types.

For example, requests.cc.o drops from ~11 MB to ~5 MB after it is
switched to the any_handler approach.
  • Loading branch information
travisdowns committed Jul 5, 2022
1 parent eb23bb9 commit db9de3c
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ set(handlers_srcs
server/handlers/delete_acls.cc
server/handlers/create_partitions.cc
server/handlers/offset_for_leader_epoch.cc
server/handlers/details/any_handler.cc
server/handlers/topics/types.cc
server/handlers/topics/topic_utils.cc
)
Expand Down
126 changes: 126 additions & 0 deletions src/v/kafka/server/handlers/details/any_handler.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "kafka/server/handlers/details/any_handler.h"

#include "kafka/server/handlers/handlers.h"
#include "kafka/server/handlers/produce.h"
#include "kafka/types.h"

#include <optional>

namespace kafka {

/**
* @brief Packages together basic information common to every handler.
*/
struct handler_info {
handler_info(
api_key key,
const char* name,
api_version min_api,
api_version max_api) noexcept
: _key(key)
, _name(name)
, _min_api(min_api)
, _max_api(max_api) {}

api_key _key;
const char* _name;
api_version _min_api, _max_api;
};

/**
* @brief Creates a type-erased handler implementation given info and a handle
* method.
*
* There are only two variants of this handler, for one and two pass
* implementations.
* This keeps the generated code duplication to a minimum, compared to
* templating this on the handler type.
*
* @tparam is_two_pass true if the handler is two-pass
*/
template<bool is_two_pass>
struct any_handler_base final : public any_handler_t {
using single_pass_handler
= ss::future<response_ptr>(request_context, ss::smp_service_group);
using two_pass_handler
= process_result_stages(request_context, ss::smp_service_group);
using fn_type
= std::conditional_t<is_two_pass, two_pass_handler, single_pass_handler>;

any_handler_base(const handler_info& info, fn_type* handle_fn) noexcept
: _info(info)
, _handle_fn(handle_fn) {}

api_version min_supported() const override { return _info._min_api; }
api_version max_supported() const override { return _info._max_api; }

api_key key() const override { return _info._key; }
const char* name() const override { return _info._name; }

/**
* Only handle varies with one or two pass, since one pass handlers
* must pass through single_stage() to covert them to two-pass.
*/
process_result_stages
handle(request_context&& rc, ss::smp_service_group g) const override {
if constexpr (is_two_pass) {
return _handle_fn(std::move(rc), g);
} else {
return process_result_stages::single_stage(
_handle_fn(std::move(rc), g));
}
}

private:
handler_info _info;
fn_type* _handle_fn;
};

/**
* @brief Instance holder for the any_handler_base.
*
* Given a handler type H, exposes a static instance of the assoicated handler
* base object.
*
* @tparam H the handler type.
*/
template<KafkaApiHandlerAny H>
struct any_handler_adaptor {
static const inline any_handler_base<KafkaApiTwoPhaseHandler<H>> instance{
handler_info{
H::api::key, H::api::name, H::min_supported, H::max_supported},
H::handle};
};

template<typename... Ts>
constexpr auto make_lut(type_list<Ts...>) {
constexpr int max_index = std::max({Ts::api::key...});
static_assert(max_index < sizeof...(Ts) * 10, "LUT is too sparse");

std::array<any_handler, max_index + 1> lut{};
((lut[Ts::api::key] = &any_handler_adaptor<Ts>::instance), ...);

return lut;
}

std::optional<any_handler> handler_for_key(kafka::api_key key) {
static constexpr auto lut = make_lut(request_types{});
if (key >= (short)0 && key < (short)lut.size()) {
if (auto handler = lut.at(key)) {
return handler;
}
}
return std::nullopt;
}

} // namespace kafka
91 changes: 91 additions & 0 deletions src/v/kafka/server/handlers/details/any_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once
#include "kafka/server/fwd.h"
#include "kafka/server/response.h"
#include "kafka/types.h"

namespace kafka {
/**
* @brief Runtime polymorphic handler type.
*
* Allows access to all kafka request handling implementations though a
* type erased interface. This avoids the need to bring every handler
* type into scope and make everything that touches the handler a template
* function on the handler type.
*
*/
struct any_handler_t {
/**
* @brief The minimum supported API version, inclusive.
*/
virtual api_version min_supported() const = 0;

/**
* @brief The maximum supported API version, inclusive.
*/
virtual api_version max_supported() const = 0;

/**
* @brief The name of the API method.
*/
virtual const char* name() const = 0;

/**
* @brief The API key associated with the method.
*/
virtual api_key key() const = 0;

/**
* @brief Handles the request.
*
* Invokes the request handler with the given request context
* (which will be moved from) and smp_service_groups.
*
* The result stages objects contains futures for both the initial
* dispatch phase, and the find response. For API methods which
* are implemented a single phase, the same type is returned, but
* the response future will complete as soon as the dispatch one does.
*
* @return process_result_stages representing the future completion of
* the handler.
*/
virtual process_result_stages
handle(request_context&&, ss::smp_service_group) const = 0;

virtual ~any_handler_t() = default;
};

/**
* @brief Pointer to a handler.
*
* Most code will use any_handler objects, which are simply pointers
* to handlers, generally const objects with static storage duration
* obtained from handler_for_key.
*/
using any_handler = const any_handler_t*;

/**
* @brief Return a handler for the given key, if any.
*
* Returns a pointer to a constant singleton handler for the given
* key, or an empty optional if no such handler exists. The contained
* any_hanlder is guaranteed to be non-null if the optional as a value.
*
* This method looks up the handler in a table populated by all handlers
* in kafka::request_types.
*
* @param key the API key for the handler
* @return std::optional<any_handler> the handler, if any
*/
std::optional<any_handler> handler_for_key(api_key key);

} // namespace kafka

0 comments on commit db9de3c

Please sign in to comment.