Skip to content

Commit

Permalink
k/group_metadata: added compatibility with incorrectly serialized keys
Browse files Browse the repository at this point in the history
Previously in redpanda we had an error leading to wrapping tombstone
record keys with an iobuf serialization envelope. In order to provide
compatibility and be able to read keys wrapped with an additional iobuf
we introduce this check. The check validates the key version being in
allowed range. Otherwise we try to unwrap a key from iobuf.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
  • Loading branch information
mmaslankaprv committed Jun 1, 2022
1 parent 412dbcd commit 609b763
Showing 1 changed file with 57 additions and 3 deletions.
60 changes: 57 additions & 3 deletions src/v/kafka/server/group_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "kafka/server/group_metadata.h"

#include "bytes/bytes.h"
#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "kafka/protocol/request_reader.h"
#include "kafka/protocol/response_writer.h"
#include "kafka/types.h"
Expand Down Expand Up @@ -390,11 +392,61 @@ group_metadata_serializer make_backward_compatible_serializer() {
};
return group_metadata_serializer(std::make_unique<impl>());
}
iobuf maybe_unwrap_from_iobuf(iobuf buffer) {
/*
* Previously in redpanda we had an error leading to wrapping
* tombstone record keys with an iobuf serialization envelope. In order to
* provide compatibility and be able to read keys wrapped with an additional
* iobuf we introduce a check leveraging serialization format.
*
* In Kafka the only allowed values for group metadata versions are 0, 1,
* and 2. Group metadata use big endian encoding hence the first two bytes
* of a serialized group metadata key contains information about a version.
* Both offset_metadata_key and group_metadata_key first serialized field
* is a `group_id` string which is encoded as 2 bytes of size and then
* array of characters. This way 4 first bytes of encoded key are:
*
* b0 - denotes LSB
* bN - denotes N byte indexed from LSB to MSB (LSB has index 0)
*
* Key bytes (big endian):
*
* |version_b1|version_b0|size_b1|size_b0|...
*
* When key struct is wrapped with an iobuf serialization it is prefixed
* with iobuf size which is serialized as 4 bytes of a size (in a little
* endian encoding) and then the iobuf content
*
* iobuf size bytes (little endian):
*
* |size_b0|size_b1|size_b2|size_b3|...
*
* To check if metadata key was wrapped with an iobuf we peek first 4 bytes
* to try to decode iobuf size store in little endian format if returned
* size is equal to size of remaining key buffer size it means that we need
* to unwrap key from an iobuf.
*
*
* TODO: remove this code in future releases
*/

iobuf_parser parser(buffer.copy());
// peek first 4-byte
auto deserialized_size = reflection::from_iobuf<int32_t>(parser.peek(4));

if (unlikely(
buffer.size_bytes() == static_cast<size_t>(deserialized_size) + 4)) {
// unwrap from iobuf
return reflection::from_iobuf<iobuf>(std::move(buffer));
}
return buffer;
}

group_metadata_serializer make_consumer_offsets_serializer() {
struct impl final : group_metadata_serializer::impl {
group_metadata_type get_metadata_type(iobuf buffer) final {
auto reader = request_reader(std::move(buffer));
auto reader = request_reader(
maybe_unwrap_from_iobuf(std::move(buffer)));
return decode_metadata_type(reader);
};

Expand All @@ -421,7 +473,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {

group_metadata_kv decode_group_metadata(model::record record) final {
group_metadata_kv ret;
request_reader k_reader(record.release_key());
request_reader k_reader(
maybe_unwrap_from_iobuf(record.release_key()));
ret.key = group_metadata_key::decode(k_reader);
if (record.has_value()) {
request_reader v_reader(record.release_value());
Expand All @@ -433,7 +486,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {

offset_metadata_kv decode_offset_metadata(model::record record) final {
offset_metadata_kv ret;
request_reader k_reader(record.release_key());
request_reader k_reader(
maybe_unwrap_from_iobuf(record.release_key()));
ret.key = offset_metadata_key::decode(k_reader);
if (record.has_value()) {
request_reader v_reader(record.release_value());
Expand Down

0 comments on commit 609b763

Please sign in to comment.