diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc index ccc5587e34510..760e0f2f772c6 100644 --- a/src/v/kafka/server/group_metadata.cc +++ b/src/v/kafka/server/group_metadata.cc @@ -12,6 +12,8 @@ #include "kafka/server/group_metadata.h" #include "bytes/bytes.h" +#include "bytes/iobuf.h" +#include "bytes/iobuf_parser.h" #include "kafka/protocol/request_reader.h" #include "kafka/protocol/response_writer.h" #include "kafka/types.h" @@ -390,11 +392,31 @@ group_metadata_serializer make_backward_compatible_serializer() { }; return group_metadata_serializer(std::make_unique()); } +iobuf maybe_unwrap_from_iobuf(iobuf buffer) { + /* + * Previously in redpanda we had an error leading to wrapping + * tombstone record keys with an iobuf serialization envelope. In order to + * provide compatibility and be able to read keys wrapped with an additional + * iobuf we introduce this check. The check validates the key version being + * in allowed range. otherwise we try to unwrap a key from iobuf. + * + * TODO: remove this code in future releases + * + */ + iobuf_parser parser(buffer.copy()); + int16_t key_version = request_reader(parser.peek(2)).read_int16(); + if (unlikely(key_version < 0 || key_version > 2)) { + // unwrap from iobuf + return reflection::from_iobuf(std::move(buffer)); + } + return buffer; +} group_metadata_serializer make_consumer_offsets_serializer() { struct impl final : group_metadata_serializer::impl { group_metadata_type get_metadata_type(iobuf buffer) final { - auto reader = request_reader(std::move(buffer)); + auto reader = request_reader( + maybe_unwrap_from_iobuf(std::move(buffer))); return decode_metadata_type(reader); }; @@ -421,7 +443,8 @@ group_metadata_serializer make_consumer_offsets_serializer() { group_metadata_kv decode_group_metadata(model::record record) final { group_metadata_kv ret; - request_reader k_reader(record.release_key()); + request_reader k_reader( + maybe_unwrap_from_iobuf(record.release_key())); ret.key = group_metadata_key::decode(k_reader); if (record.has_value()) { request_reader v_reader(record.release_value()); @@ -433,7 +456,8 @@ group_metadata_serializer make_consumer_offsets_serializer() { offset_metadata_kv decode_offset_metadata(model::record record) final { offset_metadata_kv ret; - request_reader k_reader(record.release_key()); + request_reader k_reader( + maybe_unwrap_from_iobuf(record.release_key())); ret.key = offset_metadata_key::decode(k_reader); if (record.has_value()) { request_reader v_reader(record.release_value());