Skip to content

Commit

Permalink
Introduce handler template for two-stage handlers
Browse files Browse the repository at this point in the history
Single-stage handlers have a hander template which means that handler
objects can be declared in a single line specifying their api object,
min and max API versions.

This change extends this nice concept to two-stage handlers as well.
  • Loading branch information
travisdowns committed Jul 14, 2022
1 parent d09152e commit 8dae609
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 41 deletions.
54 changes: 50 additions & 4 deletions src/v/kafka/server/handlers/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,28 @@ namespace kafka {

using memory_estimate_fn = size_t(size_t);

/**
* Handlers are generally specializations of this template, via one of the
* two aliases (handler or two_phase_hander) declared below, though it is
* not strictly necessary (only conforming to one of the two KafkaApi*
* concepts is needed).
*
* The benefit of this template is that it takes care of the most of the
* handler boilerplate.
*/
template<
typename RequestApi,
api_version::type MinSupported,
api_version::type MaxSupported,
memory_estimate_fn MemEstimator = default_memory_estimate>
struct handler {
typename HandleRetType,
memory_estimate_fn MemEstimator>
struct handler_template {
using api = RequestApi;
static constexpr api_version min_supported = api_version(MinSupported);
static constexpr api_version max_supported = api_version(MaxSupported);
static ss::future<response_ptr>
handle(request_context, ss::smp_service_group);

static HandleRetType handle(request_context, ss::smp_service_group);

/**
* See any_handler_t::memory_estimate for a description of this function.
*/
Expand All @@ -39,6 +50,41 @@ struct handler {
}
};

/**
* A single-stage handler implements the entire request handling in the initial
* stage which occurs before any subsequent request is processed.
*/
template<
typename RequestApi,
api_version::type MinSupported,
api_version::type MaxSupported,
memory_estimate_fn MemEstimator = default_memory_estimate>
using handler = handler_template<
RequestApi,
MinSupported,
MaxSupported,
ss::future<response_ptr>,
MemEstimator>;

/**
* A two-stage handler has an initial stage which happens before any other
* request can start processing (as in a single-stage handler) but then also has
* a second stage which is processed in the background allowing other requests
* on the same connection to start their handler. Responses are still sent in
* order, but processing is out-of-order.
*/
template<
typename RequestApi,
api_version::type MinSupported,
api_version::type MaxSupported,
memory_estimate_fn MemEstimator = default_memory_estimate>
using two_phase_handler = handler_template<
RequestApi,
MinSupported,
MaxSupported,
process_result_stages,
MemEstimator>;

template<typename T>
concept KafkaApiHandler = KafkaApi<typename T::api> && requires(
T h, request_context&& ctx, ss::smp_service_group g) {
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/handlers/join_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ static void decode_request(request_context& ctx, join_group_request& req) {
fmt::format("{}", ctx.connection()->client_host()));
}

template<>
process_result_stages join_group_handler::handle(
request_context ctx, [[maybe_unused]] ss::smp_service_group g) {
join_group_request request;
Expand Down
11 changes: 2 additions & 9 deletions src/v/kafka/server/handlers/join_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@

namespace kafka {

struct join_group_handler {
using api = join_group_api;
static constexpr api_version min_supported = api_version(0);
static constexpr api_version max_supported = api_version(5);
static process_result_stages handle(request_context, ss::smp_service_group);
static size_t memory_estimate(size_t request_size) {
return default_memory_estimate(request_size);
}
};
using join_group_handler = two_phase_handler<join_group_api, 0, 5>;

} // namespace kafka
1 change: 1 addition & 0 deletions src/v/kafka/server/handlers/offset_commit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct offset_commit_ctx {
, ssg(ssg) {}
};

template<>
process_result_stages
offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) {
offset_commit_request request;
Expand Down
11 changes: 2 additions & 9 deletions src/v/kafka/server/handlers/offset_commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ namespace kafka {
// in version 0 kafka stores offsets in zookeeper. if we ever need to
// support version 0 then we need to do some code review to see if this has
// any implications on semantics.
struct offset_commit_handler {
using api = offset_commit_api;
static constexpr api_version min_supported = api_version(1);
static constexpr api_version max_supported = api_version(7);
static process_result_stages handle(request_context, ss::smp_service_group);
static size_t memory_estimate(size_t request_size) {
return default_memory_estimate(request_size);
}
};
using offset_commit_handler = two_phase_handler<offset_commit_api, 1, 7>;

} // namespace kafka
3 changes: 3 additions & 0 deletions src/v/kafka/server/handlers/produce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

namespace kafka {

static constexpr auto despam_interval = std::chrono::minutes(5);

produce_response produce_request::make_error_response(error_code error) const {
produce_response response;

Expand Down Expand Up @@ -464,6 +466,7 @@ static std::vector<topic_produce_stages> produce_topics(produce_ctx& octx) {
return topics;
}

template<>
process_result_stages
produce_handler::handle(request_context ctx, ss::smp_service_group ssg) {
produce_request request;
Expand Down
11 changes: 1 addition & 10 deletions src/v/kafka/server/handlers/produce.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,6 @@

namespace kafka {

struct produce_handler {
using api = produce_api;
static constexpr api_version min_supported = api_version(0);
static constexpr api_version max_supported = api_version(7);
static process_result_stages handle(request_context, ss::smp_service_group);
static constexpr auto despam_interval = std::chrono::minutes(5);
static size_t memory_estimate(size_t request_size) {
return default_memory_estimate(request_size);
}
};
using produce_handler = two_phase_handler<produce_api, 0, 7>;

} // namespace kafka
1 change: 1 addition & 0 deletions src/v/kafka/server/handlers/sync_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

namespace kafka {

template<>
process_result_stages sync_group_handler::handle(
request_context ctx, [[maybe_unused]] ss::smp_service_group g) {
sync_group_request request;
Expand Down
10 changes: 1 addition & 9 deletions src/v/kafka/server/handlers/sync_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@

namespace kafka {

struct sync_group_handler {
using api = sync_group_api;
static constexpr api_version min_supported = api_version(0);
static constexpr api_version max_supported = api_version(3);
static process_result_stages handle(request_context, ss::smp_service_group);
static size_t memory_estimate(size_t request_size) {
return default_memory_estimate(request_size);
}
};
using sync_group_handler = two_phase_handler<sync_group_api, 0, 3>;

} // namespace kafka

0 comments on commit 8dae609

Please sign in to comment.