Skip to content

Commit

Permalink
k/group_metadata: added tests validating serialization of metadata key
Browse files Browse the repository at this point in the history
Added test validating a compatibility of decoding group metadata keys
wrapped in iobuf.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
  • Loading branch information
mmaslankaprv committed Jun 3, 2022
1 parent 19ee4eb commit a352a17
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions src/v/kafka/server/tests/group_metadata_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#include <boost/range/irange.hpp>
#include <boost/test/tools/old/interface.hpp>

#include <limits>
#include <optional>

struct fixture {
static ss::logger logger;

Expand Down Expand Up @@ -374,3 +377,58 @@ FIXTURE_TEST(test_consumer_offsets_serializer, fixture) {
BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
BOOST_REQUIRE_EQUAL(offset_md, offset_md_kv.value);
}

model::record build_tombstone_record(iobuf buffer) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));

builder.add_raw_kv(std::move(buffer), std::nullopt);
auto records = std::move(builder).build().copy_records();
return std::move(records.front());
}

FIXTURE_TEST(test_unwrapping_tombstones_from_iobuf, fixture) {
std::vector<kafka::group_metadata_serializer> serializers;
serializers.reserve(2);
serializers.push_back(kafka::make_consumer_offsets_serializer());
serializers.push_back(kafka::make_backward_compatible_serializer());

for (auto& serializer : serializers) {
kafka::group_metadata_key group_md_key;
group_md_key.group_id = random_named_string<kafka::group_id>();

auto group_kv = serializer.to_kv(kafka::group_metadata_kv{
.key = group_md_key,
});
auto group_tombstone = build_tombstone_record(group_kv.key.copy());
// not wrapped in iobuf
auto decoded_group_md_kv = serializer.decode_group_metadata(
std::move(group_tombstone));

auto iobuf_decoded_group_md_kv = serializer.decode_group_metadata(
build_tombstone_record(reflection::to_iobuf(group_kv.key.copy())));

BOOST_REQUIRE_EQUAL(group_md_key, decoded_group_md_kv.key);
BOOST_REQUIRE_EQUAL(group_md_key, iobuf_decoded_group_md_kv.key);

kafka::offset_metadata_key offset_key;
offset_key.group_id = random_named_string<kafka::group_id>();
;
offset_key.topic = random_named_string<model::topic>();
offset_key.partition = random_named_int<model::partition_id>();

auto offset_kv = serializer.to_kv(kafka::offset_metadata_kv{
.key = offset_key,
});

// not wrapped in iobuf
auto offset_md_kv = serializer.decode_offset_metadata(
build_tombstone_record(offset_kv.key.copy()));

auto iobuf_offset_md_kv = serializer.decode_offset_metadata(
build_tombstone_record(reflection::to_iobuf(offset_kv.key.copy())));

BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
BOOST_REQUIRE_EQUAL(offset_key, iobuf_offset_md_kv.key);
}
}

0 comments on commit a352a17

Please sign in to comment.