From 7bb22201bcad85ffc1d0a7343f8fcc64694068a3 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 2 Jun 2022 19:56:28 +0200 Subject: [PATCH] k/group_metadata: added tests validating serialization of metadata key Added test validating a compatibility of decoding group metadata keys wrapped in iobuf. Signed-off-by: Michal Maslanka (cherry picked from commit a352a1710de499329487f77e4c649a4276d15244) --- .../group_metadata_serialization_test.cc | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/v/kafka/server/tests/group_metadata_serialization_test.cc b/src/v/kafka/server/tests/group_metadata_serialization_test.cc index 6c78de55b5d70..e4d8984992951 100644 --- a/src/v/kafka/server/tests/group_metadata_serialization_test.cc +++ b/src/v/kafka/server/tests/group_metadata_serialization_test.cc @@ -29,6 +29,9 @@ #include #include +#include +#include + struct fixture { static ss::logger logger; @@ -374,3 +377,58 @@ FIXTURE_TEST(test_consumer_offsets_serializer, fixture) { BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key); BOOST_REQUIRE_EQUAL(offset_md, offset_md_kv.value); } + +model::record build_tombstone_record(iobuf buffer) { + storage::record_batch_builder builder( + model::record_batch_type::raft_data, model::offset(0)); + + builder.add_raw_kv(std::move(buffer), std::nullopt); + auto records = std::move(builder).build().copy_records(); + return std::move(records.front()); +} + +FIXTURE_TEST(test_unwrapping_tombstones_from_iobuf, fixture) { + std::vector serializers; + serializers.reserve(2); + serializers.push_back(kafka::make_consumer_offsets_serializer()); + serializers.push_back(kafka::make_backward_compatible_serializer()); + + for (auto& serializer : serializers) { + kafka::group_metadata_key group_md_key; + group_md_key.group_id = random_named_string(); + + auto group_kv = serializer.to_kv(kafka::group_metadata_kv{ + .key = group_md_key, + }); + auto group_tombstone = build_tombstone_record(group_kv.key.copy()); + // not wrapped in iobuf + auto decoded_group_md_kv = serializer.decode_group_metadata( + std::move(group_tombstone)); + + auto iobuf_decoded_group_md_kv = serializer.decode_group_metadata( + build_tombstone_record(reflection::to_iobuf(group_kv.key.copy()))); + + BOOST_REQUIRE_EQUAL(group_md_key, decoded_group_md_kv.key); + BOOST_REQUIRE_EQUAL(group_md_key, iobuf_decoded_group_md_kv.key); + + kafka::offset_metadata_key offset_key; + offset_key.group_id = random_named_string(); + ; + offset_key.topic = random_named_string(); + offset_key.partition = random_named_int(); + + auto offset_kv = serializer.to_kv(kafka::offset_metadata_kv{ + .key = offset_key, + }); + + // not wrapped in iobuf + auto offset_md_kv = serializer.decode_offset_metadata( + build_tombstone_record(offset_kv.key.copy())); + + auto iobuf_offset_md_kv = serializer.decode_offset_metadata( + build_tombstone_record(reflection::to_iobuf(offset_kv.key.copy()))); + + BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key); + BOOST_REQUIRE_EQUAL(offset_key, iobuf_offset_md_kv.key); + } +}