diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc
index ccc5587e34510..22bf84bac57f8 100644
--- a/src/v/kafka/server/group_metadata.cc
+++ b/src/v/kafka/server/group_metadata.cc
@@ -12,6 +12,8 @@
 #include "kafka/server/group_metadata.h"
 
 #include "bytes/bytes.h"
+#include "bytes/iobuf.h"
+#include "bytes/iobuf_parser.h"
 #include "kafka/protocol/request_reader.h"
 #include "kafka/protocol/response_writer.h"
 #include "kafka/types.h"
@@ -390,11 +392,61 @@ group_metadata_serializer make_backward_compatible_serializer() {
     };
     return group_metadata_serializer(std::make_unique<impl>());
 }
 
+iobuf maybe_unwrap_from_iobuf(iobuf buffer) {
+    /*
+     * Previously in redpanda we had an error leading to wrapping tombstone
+     * record keys with an iobuf serialization envelope. In order to provide
+     * compatibility and be able to read keys wrapped with an additional
+     * iobuf, we introduce a check leveraging the serialization format.
+     *
+     * In Kafka the only allowed values for group metadata versions are 0, 1,
+     * and 2. Group metadata use big endian encoding, hence the first two
+     * bytes of a serialized group metadata key contain the version. The
+     * first serialized field of both offset_metadata_key and
+     * group_metadata_key is a `group_id` string, encoded as 2 bytes of size
+     * followed by an array of characters. This way the first 4 bytes of an
+     * encoded key are:
+     *
+     * b0 - denotes the LSB
+     * bN - denotes byte N, indexed from LSB to MSB (the LSB has index 0)
+     *
+     * Key bytes (big endian):
+     *
+     *   |version_b1|version_b0|size_b1|size_b0|...
+     *
+     * When a key struct is wrapped with an iobuf serialization it is
+     * prefixed with the iobuf size, serialized as 4 bytes (in little endian
+     * encoding), followed by the iobuf content.
+     *
+     * iobuf size bytes (little endian):
+     *
+     *   |size_b0|size_b1|size_b2|size_b3|...
+     *
+     * To check if a metadata key was wrapped with an iobuf we peek at the
+     * first 4 bytes and try to decode an iobuf size stored in little endian
+     * format. If the decoded size equals the size of the remaining key
+     * buffer, the key needs to be unwrapped from the iobuf.
+     *
+     * TODO: remove this code in future releases
+     */
+
+    iobuf_parser parser(buffer.copy());
+    // peek at the first 4 bytes and decode them as a little endian size
+    auto deserialized_size = reflection::from_iobuf<int32_t>(parser.peek(4));
+
+    if (unlikely(
+          buffer.size_bytes() == static_cast<size_t>(deserialized_size) + 4)) {
+        // the size prefix matches the remaining bytes: unwrap the key
+        return reflection::from_iobuf<iobuf>(std::move(buffer));
+    }
+    return buffer;
+}
+
 group_metadata_serializer make_consumer_offsets_serializer() {
     struct impl final : group_metadata_serializer::impl {
         group_metadata_type get_metadata_type(iobuf buffer) final {
-            auto reader = request_reader(std::move(buffer));
+            auto reader = request_reader(
+              maybe_unwrap_from_iobuf(std::move(buffer)));
             return decode_metadata_type(reader);
         };
@@ -421,7 +473,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
 
         group_metadata_kv decode_group_metadata(model::record record) final {
             group_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
             ret.key = group_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
@@ -433,7 +486,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
 
         offset_metadata_kv decode_offset_metadata(model::record record) final {
             offset_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
             ret.key = offset_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
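
A note on the unwrap heuristic, for reviewers: the detection logic in the comment above is compact enough to sanity-check in isolation. Below is a minimal, self-contained sketch of the same check using only the standard library. `peek_le_size` and `is_wrapped` are hypothetical stand-ins for redpanda's `iobuf_parser::peek` and `reflection::from_iobuf` (they are not part of the codebase), and the `memcpy` decode assumes a little endian host.

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Decode the first 4 bytes as a little endian int32, mirroring the size
// prefix that iobuf serialization writes in front of the wrapped payload.
// Assumes a little endian host; the caller guarantees buf.size() >= 4.
int32_t peek_le_size(const std::vector<uint8_t>& buf) {
    int32_t size = 0;
    std::memcpy(&size, buf.data(), sizeof(size));
    return size;
}

// A key is considered wrapped when the little endian size prefix matches the
// number of bytes that follow it. A genuine key starts with a big endian
// version in {0, 1, 2} followed by the group_id size, so its first 4 bytes
// decode to a value that does not satisfy this equality in practice.
bool is_wrapped(const std::vector<uint8_t>& buf) {
    if (buf.size() < 4) {
        return false;
    }
    return buf.size() == static_cast<size_t>(peek_le_size(buf)) + 4;
}

int main() {
    // A plain key: big endian version 1, then the 5-byte group id "group".
    std::vector<uint8_t> key{0x00, 0x01, 0x00, 0x05, 'g', 'r', 'o', 'u', 'p'};

    // The same key wrapped in an envelope: little endian size prefix first.
    std::vector<uint8_t> wrapped(4);
    auto payload_size = static_cast<int32_t>(key.size());
    std::memcpy(wrapped.data(), &payload_size, sizeof(payload_size));
    wrapped.insert(wrapped.end(), key.begin(), key.end());

    std::cout << std::boolalpha
              << "plain key wrapped?     " << is_wrapped(key) << '\n'      // false
              << "enveloped key wrapped? " << is_wrapped(wrapped) << '\n'; // true
}
```

Under these assumptions the sketch prints `false` for the plain key and `true` for the enveloped one. Note also why the patch peeks on `buffer.copy()`: the original bytes must stay intact so the full buffer can still be unwrapped (or returned as-is) after the check.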