From 8230cb7040b4a73f22fe27d459167ceee560066c Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 26 May 2022 16:48:53 +0200 Subject: [PATCH] k/group_metadata: added compatibility with incorrectly serialized keys Previously in redpanda we had an error leading to wrapping tombstone record keys with an iobuf serialization envelope. In order to provide compatibility and be able to read keys wrapped with an additional iobuf we introduce this check. The check validates the key version being in allowed range. Otherwise we try to unwrap a key from iobuf. Signed-off-by: Michal Maslanka --- src/v/kafka/server/group_metadata.cc | 30 +++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc index ccc5587e34510..760e0f2f772c6 100644 --- a/src/v/kafka/server/group_metadata.cc +++ b/src/v/kafka/server/group_metadata.cc @@ -12,6 +12,8 @@ #include "kafka/server/group_metadata.h" #include "bytes/bytes.h" +#include "bytes/iobuf.h" +#include "bytes/iobuf_parser.h" #include "kafka/protocol/request_reader.h" #include "kafka/protocol/response_writer.h" #include "kafka/types.h" @@ -390,11 +392,31 @@ group_metadata_serializer make_backward_compatible_serializer() { }; return group_metadata_serializer(std::make_unique()); } +iobuf maybe_unwrap_from_iobuf(iobuf buffer) { + /* + * Previously in redpanda we had an error leading to wrapping + * tombstone record keys with an iobuf serialization envelope. In order to + * provide compatibility and be able to read keys wrapped with an additional + * iobuf we introduce this check. The check validates the key version being + * in allowed range. otherwise we try to unwrap a key from iobuf. + * + * TODO: remove this code in future releases + * + */ + iobuf_parser parser(buffer.copy()); + int16_t key_version = request_reader(parser.peek(2)).read_int16(); + if (unlikely(key_version < 0 || key_version > 2)) { + // unwrap from iobuf + return reflection::from_iobuf(std::move(buffer)); + } + return buffer; +} group_metadata_serializer make_consumer_offsets_serializer() { struct impl final : group_metadata_serializer::impl { group_metadata_type get_metadata_type(iobuf buffer) final { - auto reader = request_reader(std::move(buffer)); + auto reader = request_reader( + maybe_unwrap_from_iobuf(std::move(buffer))); return decode_metadata_type(reader); }; @@ -421,7 +443,8 @@ group_metadata_serializer make_consumer_offsets_serializer() { group_metadata_kv decode_group_metadata(model::record record) final { group_metadata_kv ret; - request_reader k_reader(record.release_key()); + request_reader k_reader( + maybe_unwrap_from_iobuf(record.release_key())); ret.key = group_metadata_key::decode(k_reader); if (record.has_value()) { request_reader v_reader(record.release_value()); @@ -433,7 +456,8 @@ group_metadata_serializer make_consumer_offsets_serializer() { offset_metadata_kv decode_offset_metadata(model::record record) final { offset_metadata_kv ret; - request_reader k_reader(record.release_key()); + request_reader k_reader( + maybe_unwrap_from_iobuf(record.release_key())); ret.key = offset_metadata_key::decode(k_reader); if (record.has_value()) { request_reader v_reader(record.release_value());