Commit 25d8e27

Merge pull request #5068 from mmaslankaprv/v22.1.x

[V22.1.x] backport of #4901

mmaslankaprv committed Jun 14, 2022
2 parents 03489da + 59886d6
Showing 4 changed files with 145 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/v/kafka/server/group.cc
@@ -2159,7 +2159,7 @@ void add_offset_tombstone_record(
       .partition = tp.partition,
     };
     auto kv = serializer.to_kv(offset_metadata_kv{.key = std::move(key)});
-    builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
+    builder.add_raw_kv(std::move(kv.key), std::nullopt);
 }
 
 void add_group_tombstone_record(
@@ -2170,7 +2170,7 @@ void add_group_tombstone_record(
       .group_id = group,
     };
     auto kv = serializer.to_kv(group_metadata_kv{.key = std::move(key)});
-    builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
+    builder.add_raw_kv(std::move(kv.key), std::nullopt);
 }
 } // namespace
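To make the fix above concrete: the old code pushed the already-serialized key through an extra iobuf serialization step before handing it to the record batch builder, so tombstone keys did not byte-match the keys of the records they were meant to delete. Below is a minimal sketch of what that envelope looks like, using only the standard library; std::vector<uint8_t> stands in for redpanda's iobuf, and wrap_in_envelope() is a hypothetical model of what the removed reflection::to_iobuf() call effectively did to the key, per the serialization notes added to group_metadata.cc further down.

    // Sketch only: wrap_in_envelope() is a hypothetical stand-in for the
    // removed reflection::to_iobuf() call applied to already-serialized
    // key bytes. std::vector<uint8_t> stands in for redpanda's iobuf.
    #include <cstdint>
    #include <vector>

    std::vector<uint8_t> wrap_in_envelope(const std::vector<uint8_t>& key) {
        // The envelope is the payload size as a 4-byte little-endian
        // integer, followed by the payload bytes themselves.
        auto size = static_cast<uint32_t>(key.size());
        std::vector<uint8_t> wrapped;
        wrapped.reserve(key.size() + 4);
        for (int shift : {0, 8, 16, 24}) {
            wrapped.push_back(static_cast<uint8_t>(size >> shift)); // LSB first
        }
        wrapped.insert(wrapped.end(), key.begin(), key.end());
        return wrapped;
    }

With the fix, add_raw_kv receives the bare key bytes, so new tombstones match the records they delete; the reader-side check introduced in group_metadata.cc below keeps old, wrapped tombstones readable.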
90 changes: 84 additions & 6 deletions src/v/kafka/server/group_metadata.cc
@@ -12,15 +12,19 @@
 #include "kafka/server/group_metadata.h"
 
 #include "bytes/bytes.h"
+#include "bytes/iobuf.h"
+#include "bytes/iobuf_parser.h"
 #include "kafka/protocol/request_reader.h"
 #include "kafka/protocol/response_writer.h"
+#include "kafka/server/logger.h"
 #include "kafka/types.h"
 #include "model/adl_serde.h"
 #include "model/fundamental.h"
 #include "model/timestamp.h"
 #include "reflection/adl.h"
 #include "utils/to_string.h"
 #include "vassert.h"
+#include "vlog.h"
 
 #include <fmt/core.h>
 #include <fmt/ostream.h>
@@ -239,13 +243,84 @@ iobuf metadata_to_iobuf(const T& t) {
     T::encode(writer, t);
     return buffer;
 }
+iobuf maybe_unwrap_from_iobuf(iobuf buffer) {
+    /*
+     * Previously redpanda had a bug that wrapped tombstone record keys in an
+     * iobuf serialization envelope. In order to provide compatibility and be
+     * able to read keys wrapped in this additional iobuf, we introduce a
+     * check that leverages the serialization format.
+     *
+     * # New serializer format - __consumer_offsets topic
+     *
+     * In Kafka the only allowed group metadata versions are 0, 1, and 2.
+     * Group metadata uses big-endian encoding, hence the first two bytes of
+     * a serialized group metadata key contain the version. The first
+     * serialized field of both offset_metadata_key and group_metadata_key is
+     * a `group_id` string, encoded as 2 bytes of size followed by an array
+     * of characters. This way the first 4 bytes of an encoded key are:
+     *
+     * b0 - denotes the LSB
+     * bN - denotes the byte with index N, counting from LSB to MSB
+     *      (the LSB has index 0)
+     *
+     * Key bytes (big endian):
+     *
+     * |version_b1|version_b0|size_b1|size_b0|...
+     *
+     * When the key struct is wrapped in an iobuf serialization envelope it
+     * is prefixed with the iobuf size, serialized as 4 bytes in little-endian
+     * encoding, followed by the iobuf content.
+     *
+     * iobuf size bytes (little endian):
+     *
+     * |size_b0|size_b1|size_b2|size_b3|...
+     *
+     * To check whether a metadata key was wrapped in an iobuf we peek at the
+     * first 4 bytes and try to decode them as an iobuf size stored in
+     * little-endian format. If the decoded size is equal to the size of the
+     * remaining key buffer, the key must be unwrapped from the iobuf.
+     *
+     * # Old serializer format
+     *
+     * When serialized with the previous `kafka_internal/group` serializer, a
+     * key was represented by
+     *
+     * struct group_log_record_key {
+     *     enum class type : int8_t { group_metadata, offset_commit, noop };
+     *     type record_type;
+     *     iobuf key;
+     * };
+     *
+     * When serialized with ADL, its binary representation is as follows:
+     *
+     * |record_tp_b0|size_b0|size_b1|size_b2|...
+     *
+     * Those first 4 bytes will be decoded as an iobuf size.
+     *
+     * TODO: remove this code in future releases
+     */
+    iobuf_const_parser parser(buffer);
+    // peek at the first 4 bytes
+    auto deserialized_size = reflection::from_iobuf<int32_t>(parser.peek(4));
+
+    if (unlikely(
+          buffer.size_bytes() == static_cast<size_t>(deserialized_size) + 4)) {
+        vlog(klog.debug, "Unwrapping group metadata key from iobuf");
+        // unwrap the key from the iobuf
+        return reflection::from_iobuf<iobuf>(std::move(buffer));
+    }
+    return buffer;
+}
 } // namespace
 
 group_metadata_serializer make_backward_compatible_serializer() {
     struct impl : group_metadata_serializer::impl {
         group_metadata_type get_metadata_type(iobuf buffer) final {
             auto key = reflection::from_iobuf<old::group_log_record_key>(
-              std::move(buffer));
+              maybe_unwrap_from_iobuf(std::move(buffer)));
             switch (key.record_type) {
             case old::group_log_record_key::type::offset_commit:
                 return group_metadata_type::offset_commit;
@@ -320,7 +395,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
         group_metadata_kv decode_group_metadata(model::record record) final {
             group_metadata_kv ret;
             auto record_key = reflection::from_iobuf<old::group_log_record_key>(
-              record.release_key());
+              maybe_unwrap_from_iobuf(record.release_key()));
             auto group_id = kafka::group_id(
               reflection::from_iobuf<kafka::group_id::type>(
                 std::move(record_key.key)));
@@ -363,7 +438,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
         offset_metadata_kv decode_offset_metadata(model::record r) final {
             offset_metadata_kv ret;
             auto record_key = reflection::from_iobuf<old::group_log_record_key>(
-              r.release_key());
+              maybe_unwrap_from_iobuf(r.release_key()));
             auto key = reflection::from_iobuf<old::group_log_offset_key>(
               std::move(record_key.key));
 
@@ -394,7 +469,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
 group_metadata_serializer make_consumer_offsets_serializer() {
     struct impl final : group_metadata_serializer::impl {
         group_metadata_type get_metadata_type(iobuf buffer) final {
-            auto reader = request_reader(std::move(buffer));
+            auto reader = request_reader(
+              maybe_unwrap_from_iobuf(std::move(buffer)));
             return decode_metadata_type(reader);
         };
 
@@ -421,7 +497,8 @@
 
         group_metadata_kv decode_group_metadata(model::record record) final {
             group_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
             ret.key = group_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
@@ -433,7 +510,8 @@
 
         offset_metadata_kv decode_offset_metadata(model::record record) final {
             offset_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
            ret.key = offset_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
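The detection logic that every decode path above now funnels through compresses to a few lines. Here is a self-contained sketch of the same heuristic using only the standard library; maybe_unwrap() and the std::vector<uint8_t> buffer type are illustrative stand-ins, not redpanda's API.

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Returns the key bytes with the iobuf envelope stripped when the first
    // four bytes, read as a little-endian size, match the remaining length.
    std::vector<uint8_t> maybe_unwrap(std::vector<uint8_t> buffer) {
        if (buffer.size() < 4) {
            return buffer; // too short to carry an envelope header
        }
        uint32_t size = static_cast<uint32_t>(buffer[0])
                        | (static_cast<uint32_t>(buffer[1]) << 8)
                        | (static_cast<uint32_t>(buffer[2]) << 16)
                        | (static_cast<uint32_t>(buffer[3]) << 24);
        if (static_cast<size_t>(size) + 4 == buffer.size()) {
            // Wrapped: drop the 4-byte size prefix, keep the inner key.
            return {buffer.begin() + 4, buffer.end()};
        }
        return buffer; // already a bare key
    }

A false positive would require a bare key whose version and group_id size bytes, read little-endian, happen to equal the remaining buffer length; since valid versions are only 0, 1, and 2 and are stored big-endian, that practically never happens, which is why the real implementation marks the branch with unlikely().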
2 changes: 1 addition & 1 deletion src/v/kafka/server/group_recovery_consumer.cc
@@ -141,7 +141,7 @@ void group_recovery_consumer::handle_record(model::record r) {
     } catch (...) {
         vlog(
           klog.error,
-          "error handling record record - {}",
+          "error handling group metadata record - {}",
           std::current_exception());
     }
 }
58 changes: 58 additions & 0 deletions src/v/kafka/server/tests/group_metadata_serialization_test.cc
@@ -29,6 +29,9 @@
 #include <boost/range/irange.hpp>
 #include <boost/test/tools/old/interface.hpp>
 
+#include <limits>
+#include <optional>
+
 struct fixture {
     static ss::logger logger;
 
@@ -374,3 +377,58 @@ FIXTURE_TEST(test_consumer_offsets_serializer, fixture) {
     BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
     BOOST_REQUIRE_EQUAL(offset_md, offset_md_kv.value);
 }
+
+model::record build_tombstone_record(iobuf buffer) {
+    storage::record_batch_builder builder(
+      model::record_batch_type::raft_data, model::offset(0));
+
+    builder.add_raw_kv(std::move(buffer), std::nullopt);
+    auto records = std::move(builder).build().copy_records();
+    return std::move(records.front());
+}
+
+FIXTURE_TEST(test_unwrapping_tombstones_from_iobuf, fixture) {
+    std::vector<kafka::group_metadata_serializer> serializers;
+    serializers.reserve(2);
+    serializers.push_back(kafka::make_consumer_offsets_serializer());
+    serializers.push_back(kafka::make_backward_compatible_serializer());
+
+    for (auto& serializer : serializers) {
+        kafka::group_metadata_key group_md_key;
+        group_md_key.group_id = random_named_string<kafka::group_id>();
+
+        auto group_kv = serializer.to_kv(kafka::group_metadata_kv{
+          .key = group_md_key,
+        });
+        auto group_tombstone = build_tombstone_record(group_kv.key.copy());
+        // key not wrapped in an iobuf
+        auto decoded_group_md_kv = serializer.decode_group_metadata(
+          std::move(group_tombstone));
+        // key wrapped in an iobuf envelope
+        auto iobuf_decoded_group_md_kv = serializer.decode_group_metadata(
+          build_tombstone_record(reflection::to_iobuf(group_kv.key.copy())));
+
+        BOOST_REQUIRE_EQUAL(group_md_key, decoded_group_md_kv.key);
+        BOOST_REQUIRE_EQUAL(group_md_key, iobuf_decoded_group_md_kv.key);
+
+        kafka::offset_metadata_key offset_key;
+        offset_key.group_id = random_named_string<kafka::group_id>();
+        offset_key.topic = random_named_string<model::topic>();
+        offset_key.partition = random_named_int<model::partition_id>();
+
+        auto offset_kv = serializer.to_kv(kafka::offset_metadata_kv{
+          .key = offset_key,
+        });
+
+        // key not wrapped in an iobuf
+        auto offset_md_kv = serializer.decode_offset_metadata(
+          build_tombstone_record(offset_kv.key.copy()));
+        // key wrapped in an iobuf envelope
+        auto iobuf_offset_md_kv = serializer.decode_offset_metadata(
+          build_tombstone_record(reflection::to_iobuf(offset_kv.key.copy())));
+
+        BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
+        BOOST_REQUIRE_EQUAL(offset_key, iobuf_offset_md_kv.key);
+    }
+}
