diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc
index 8c8a1b9ad272..c6262d037c85 100644
--- a/src/v/kafka/server/group.cc
+++ b/src/v/kafka/server/group.cc
@@ -2159,7 +2159,7 @@ void add_offset_tombstone_record(
       .partition = tp.partition,
     };
     auto kv = serializer.to_kv(offset_metadata_kv{.key = std::move(key)});
-    builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
+    builder.add_raw_kv(std::move(kv.key), std::nullopt);
 }
 
 void add_group_tombstone_record(
@@ -2170,7 +2170,7 @@ void add_group_tombstone_record(
       .group_id = group,
     };
     auto kv = serializer.to_kv(group_metadata_kv{.key = std::move(key)});
-    builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
+    builder.add_raw_kv(std::move(kv.key), std::nullopt);
 }
 
 } // namespace
diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc
index ccc5587e3451..ecdce3d5c3ce 100644
--- a/src/v/kafka/server/group_metadata.cc
+++ b/src/v/kafka/server/group_metadata.cc
@@ -12,8 +12,11 @@
 #include "kafka/server/group_metadata.h"
 
 #include "bytes/bytes.h"
+#include "bytes/iobuf.h"
+#include "bytes/iobuf_parser.h"
 #include "kafka/protocol/request_reader.h"
 #include "kafka/protocol/response_writer.h"
+#include "kafka/server/logger.h"
 #include "kafka/types.h"
 #include "model/adl_serde.h"
 #include "model/fundamental.h"
@@ -21,6 +24,7 @@
 #include "reflection/adl.h"
 #include "utils/to_string.h"
 #include "vassert.h"
+#include "vlog.h"
 
 #include 
 #include 
@@ -239,13 +243,84 @@ iobuf metadata_to_iobuf(const T& t) {
     T::encode(writer, t);
     return buffer;
 }
+
+iobuf maybe_unwrap_from_iobuf(iobuf buffer) {
+    /*
+     * Previously in redpanda we had an error leading to wrapping
+     * tombstone record keys with an iobuf serialization envelope. In
+     * order to provide compatibility and be able to read keys wrapped
+     * with an additional iobuf we introduce a check leveraging the
+     * serialization format.
+     *
+     * # New serializer format - __consumer_offsets topic
+     *
+     * In Kafka the only allowed values for group metadata versions are
+     * 0, 1, and 2. Group metadata uses big endian encoding, hence the
+     * first two bytes of a serialized group metadata key contain the
+     * version. The first serialized field of both offset_metadata_key
+     * and group_metadata_key is a `group_id` string, encoded as 2 bytes
+     * of size followed by an array of characters. This way the first
+     * 4 bytes of an encoded key are:
+     *
+     * b0 - denotes the LSB
+     * bN - denotes the byte at index N, counted from LSB to MSB
+     *      (the LSB has index 0)
+     *
+     * Key bytes (big endian):
+     *
+     * |version_b1|version_b0|size_b1|size_b0|...
+     *
+     * When a key struct is wrapped with iobuf serialization it is
+     * prefixed with the iobuf size, serialized as 4 bytes in little
+     * endian encoding, followed by the iobuf content.
+     *
+     * iobuf size bytes (little endian):
+     *
+     * |size_b0|size_b1|size_b2|size_b3|...
+     *
+     * To check if a metadata key was wrapped with an iobuf we peek at
+     * the first 4 bytes and try to decode an iobuf size stored in
+     * little endian format. If the returned size is equal to the size
+     * of the remaining key buffer, the key needs to be unwrapped from
+     * the iobuf.
+     *
+     * # Old serializer format
+     *
+     * When serialized with the previous `kafka_internal/group` serializer
+     * a key was represented by
+     *
+     * struct group_log_record_key {
+     *     enum class type : int8_t { group_metadata, offset_commit, noop };
+     *     type record_type;
+     *     iobuf key;
+     * };
+     *
+     * When serialized with ADL its binary representation is as follows:
+     *
+     * |record_tp_b0|size_b0|size_b1|size_b2|...
+     *
+     * Those first 4 bytes will be decoded as an iobuf size.
+     *
+     * TODO: remove this code in future releases
+     */
+
+    iobuf_const_parser parser(buffer);
+    // peek at the first 4 bytes
+    auto deserialized_size = reflection::from_iobuf<int32_t>(parser.peek(4));
+
+    if (unlikely(
+          buffer.size_bytes() == static_cast<size_t>(deserialized_size) + 4)) {
+        vlog(klog.debug, "Unwrapping group metadata key from iobuf");
+        // unwrap from iobuf
+        return reflection::from_iobuf<iobuf>(std::move(buffer));
+    }
+    return buffer;
+}
 } // namespace
 
 group_metadata_serializer make_backward_compatible_serializer() {
     struct impl : group_metadata_serializer::impl {
         group_metadata_type get_metadata_type(iobuf buffer) final {
             auto key = reflection::from_iobuf<old::group_log_record_key>(
-              std::move(buffer));
+              maybe_unwrap_from_iobuf(std::move(buffer)));
             switch (key.record_type) {
             case old::group_log_record_key::type::offset_commit:
                 return group_metadata_type::offset_commit;
@@ -320,7 +395,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
         group_metadata_kv decode_group_metadata(model::record record) final {
             group_metadata_kv ret;
             auto record_key = reflection::from_iobuf<old::group_log_record_key>(
-              record.release_key());
+              maybe_unwrap_from_iobuf(record.release_key()));
             auto group_id = kafka::group_id(
               reflection::from_iobuf<kafka::group_id::type>(
                 std::move(record_key.key)));
@@ -363,7 +438,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
         offset_metadata_kv decode_offset_metadata(model::record r) final {
             offset_metadata_kv ret;
             auto record_key = reflection::from_iobuf<old::group_log_record_key>(
-              r.release_key());
+              maybe_unwrap_from_iobuf(r.release_key()));
             auto key = reflection::from_iobuf<old::group_log_offset_key>(
               std::move(record_key.key));
 
@@ -394,7 +469,8 @@
 
 group_metadata_serializer make_consumer_offsets_serializer() {
     struct impl final : group_metadata_serializer::impl {
         group_metadata_type get_metadata_type(iobuf buffer) final {
-            auto reader = request_reader(std::move(buffer));
+            auto reader = request_reader(
+              maybe_unwrap_from_iobuf(std::move(buffer)));
             return decode_metadata_type(reader);
         };
 
@@ -421,7 +497,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
 
         group_metadata_kv decode_group_metadata(model::record record) final {
             group_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
             ret.key = group_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
@@ -433,7 +510,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
 
         offset_metadata_kv decode_offset_metadata(model::record record) final {
             offset_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
             ret.key = offset_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc
index 0852dbef8dc5..f010e4c266a5 100644
--- a/src/v/kafka/server/group_recovery_consumer.cc
+++ b/src/v/kafka/server/group_recovery_consumer.cc
@@ -141,7 +141,7 @@ void group_recovery_consumer::handle_record(model::record r) {
     } catch (...) {
         vlog(
           klog.error,
-          "error handling record record - {}",
+          "error handling group metadata record - {}",
          std::current_exception());
     }
 }
diff --git a/src/v/kafka/server/tests/group_metadata_serialization_test.cc b/src/v/kafka/server/tests/group_metadata_serialization_test.cc
index 6c78de55b5d7..e4d898499295 100644
--- a/src/v/kafka/server/tests/group_metadata_serialization_test.cc
+++ b/src/v/kafka/server/tests/group_metadata_serialization_test.cc
@@ -29,6 +29,9 @@
 #include 
 #include 
 
+#include 
+#include 
+
 struct fixture {
     static ss::logger logger;
 
@@ -374,3 +377,58 @@ FIXTURE_TEST(test_consumer_offsets_serializer, fixture) {
     BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
     BOOST_REQUIRE_EQUAL(offset_md, offset_md_kv.value);
 }
+
+model::record build_tombstone_record(iobuf buffer) {
+    storage::record_batch_builder builder(
+      model::record_batch_type::raft_data, model::offset(0));
+
+    builder.add_raw_kv(std::move(buffer), std::nullopt);
+    auto records = std::move(builder).build().copy_records();
+    return std::move(records.front());
+}
+
+FIXTURE_TEST(test_unwrapping_tombstones_from_iobuf, fixture) {
+    std::vector<kafka::group_metadata_serializer> serializers;
+    serializers.reserve(2);
+    serializers.push_back(kafka::make_consumer_offsets_serializer());
+    serializers.push_back(kafka::make_backward_compatible_serializer());
+
+    for (auto& serializer : serializers) {
+        kafka::group_metadata_key group_md_key;
+        group_md_key.group_id = random_named_string<kafka::group_id>();
+
+        auto group_kv = serializer.to_kv(kafka::group_metadata_kv{
+          .key = group_md_key,
+        });
+        auto group_tombstone = build_tombstone_record(group_kv.key.copy());
+        // not wrapped in iobuf
+        auto decoded_group_md_kv = serializer.decode_group_metadata(
+          std::move(group_tombstone));
+        // wrapped in iobuf
+        auto iobuf_decoded_group_md_kv = serializer.decode_group_metadata(
+          build_tombstone_record(reflection::to_iobuf(group_kv.key.copy())));
+
+        BOOST_REQUIRE_EQUAL(group_md_key, decoded_group_md_kv.key);
+        BOOST_REQUIRE_EQUAL(group_md_key, iobuf_decoded_group_md_kv.key);
+
+        kafka::offset_metadata_key offset_key;
+        offset_key.group_id = random_named_string<kafka::group_id>();
+        offset_key.topic = random_named_string<model::topic>();
+        offset_key.partition = random_named_int<model::partition_id>();
+
+        auto offset_kv = serializer.to_kv(kafka::offset_metadata_kv{
+          .key = offset_key,
+        });
+
+        // not wrapped in iobuf
+        auto offset_md_kv = serializer.decode_offset_metadata(
+          build_tombstone_record(offset_kv.key.copy()));
+        // wrapped in iobuf
+        auto iobuf_offset_md_kv = serializer.decode_offset_metadata(
+          build_tombstone_record(reflection::to_iobuf(offset_kv.key.copy())));
+
+        BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
+        BOOST_REQUIRE_EQUAL(offset_key, iobuf_offset_md_kv.key);
+    }
+}
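
Note: the size-prefix heuristic in maybe_unwrap_from_iobuf is easy to exercise in isolation. Below is a minimal, self-contained C++ sketch of the same check, operating on a plain byte vector instead of redpanda's iobuf/iobuf_const_parser; the helper names are hypothetical and exist only for illustration.

    #include <cstdint>
    #include <vector>

    // Decode the first 4 bytes as a little endian uint32 - the layout used
    // by the iobuf envelope's size prefix (hypothetical standalone helper).
    uint32_t peek_le_size(const std::vector<uint8_t>& b) {
        return uint32_t(b[0]) | (uint32_t(b[1]) << 8) | (uint32_t(b[2]) << 16)
               | (uint32_t(b[3]) << 24);
    }

    // A key is treated as iobuf-wrapped only when the decoded prefix exactly
    // matches the number of bytes that follow it, mirroring the patched
    // check: buffer.size_bytes() == deserialized_size + 4.
    bool looks_iobuf_wrapped(const std::vector<uint8_t>& key) {
        if (key.size() < 4) {
            return false;
        }
        return peek_le_size(key) == key.size() - 4;
    }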
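
For intuition on why a genuine new-format key is unlikely to trip the check: a group_metadata_key for group "g1" at version 2 encodes as |00 02|00 02|'g' '1'| (version and string length, both big endian), 6 bytes total. Reading the first 4 bytes as a little endian size yields 0x02000200 (33554944), which cannot equal the 2 remaining bytes, so the key is left untouched. The same key wrapped in an iobuf envelope becomes |06 00 00 00| followed by those 6 bytes; the decoded prefix (6) equals the remaining length (10 - 4), so it is unwrapped.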