Skip to content

Commit

Permalink
k/protocol/test: Added unit test for optional tag values
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Boquard <[email protected]>
(cherry picked from commit e77a935)
  • Loading branch information
michael-redpanda committed Jun 26, 2023
1 parent 942d23b commit 9ba3cee
Showing 1 changed file with 136 additions and 0 deletions.
136 changes: 136 additions & 0 deletions src/v/kafka/protocol/tests/protocol_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,144 @@ void check_all_requests(kafka::type_list<Ts...>) {
(check_proto_compat<Ts>(), ...);
}

template<typename Request, api_version::type version, bool is_request>
requires(KafkaApiHandler<Request>) struct tag_field_entry {
using api = typename Request::api;
static constexpr api_version test_version = api_version(version);
static constexpr bool request = is_request;
};

// Instructions on adding optional tag field messages
// When a new kafka message is added that contains optional tag fields,
// you will do the following:
// 1. Add the entry to `tag_field_entries`, providing
// a. The handler
// b. minimum api version
// c. true for request, false for response (needs to be bool for constexpr)
// 2. Create a specialized `create_default_and_non_default_data` function
// a. Have it set non-default values and default values for each
// b. Provide the difference in size the encoded buffers will be

using tag_field_entries = kafka::type_list<
tag_field_entry<create_topics_handler, 5, false>,
tag_field_entry<api_versions_handler, 3, false>>;

template<typename T>
long create_default_and_non_default_data(T& non_default_data, T& default_data);

template<>
long create_default_and_non_default_data(
decltype(create_topics_response::data)& non_default_data,
decltype(create_topics_response::data)& default_data) {
non_default_data.throttle_time_ms = std::chrono::milliseconds(1000);

non_default_data.topics.emplace_back(creatable_topic_result{
model::topic{"topic1"},
{},
kafka::error_code{1},
"test_error_message",
3,
16,
std::nullopt,
kafka::error_code{2}});

default_data = non_default_data;

default_data.topics.at(0).topic_config_error_code = kafka::error_code{0};

// int16 (2 bytes) + tag (2 bytes)
return 4;
}

template<>
long create_default_and_non_default_data(
decltype(api_versions_response::data)& non_default_data,
decltype(api_versions_response::data)& default_data) {
non_default_data.finalized_features_epoch = 0;
default_data = non_default_data;
default_data.finalized_features_epoch = -1;

// int64 (8 bytes) + tag (2 bytes)
return 10;
}

template<typename T>
bool validate_buffer_against_data(
const T& check_data, api_version version, const bytes& buffer) {
T r;
if constexpr (HasPrimitiveDecode<decltype(r)>) {
r.decode(bytes_to_iobuf(buffer), version);
} else {
kafka::request_reader rdr(bytes_to_iobuf(buffer));
r.decode(rdr, version);
}

return r == check_data;
}

template<typename T>
void check_kafka_tag_format(api_version version, bool is_request) {
decltype(T::data) non_default_data{};
decltype(T::data) default_data{};

auto size_diff = create_default_and_non_default_data(
non_default_data, default_data);

BOOST_REQUIRE_NE(non_default_data, default_data);

bytes non_default_encoded, default_encoded;

{
iobuf iob;
kafka::response_writer rw(iob);
non_default_data.encode(rw, version);
non_default_encoded = iobuf_to_bytes(iob);
}

{
iobuf iob;
kafka::response_writer rw(iob);
default_data.encode(rw, version);
default_encoded = iobuf_to_bytes(iob);
}

BOOST_CHECK_EQUAL(
non_default_encoded.size() - default_encoded.size(), size_diff);

BOOST_TEST_CHECK(validate_buffer_against_data(
non_default_data, version, non_default_encoded));
BOOST_TEST_CHECK(
validate_buffer_against_data(default_data, version, default_encoded));
}

template<typename T>
concept FieldEntry = kafka::KafkaApi<typename T::api> && requires(T t) {
{ T::test_version } -> std::convertible_to<const api_version&>;
{ T::request } -> std::convertible_to<const bool>;
};

template<FieldEntry T>
void check_tag_request() {
if constexpr (T::request) {
check_kafka_tag_format<typename T::api::request_type>(
T::test_version, T::request);
} else {
check_kafka_tag_format<typename T::api::response_type>(
T::test_version, T::request);
}
}

template<typename... Ts>
void check_all_tag_requests(kafka::type_list<Ts...>) {
(check_tag_request<Ts>(), ...);
}

BOOST_AUTO_TEST_CASE(test_kafka_protocol_compat) {
check_all_requests(kafka::request_types{});
}

BOOST_AUTO_TEST_CASE(test_optional_tag_values) {
check_all_tag_requests(tag_field_entries{});
}

} // namespace kafka

0 comments on commit 9ba3cee

Please sign in to comment.