Fixed serialization of group tombstones #4901

4 changes: 2 additions & 2 deletions src/v/kafka/server/group.cc
@@ -2393,7 +2393,7 @@ void add_offset_tombstone_record(
.partition = tp.partition,
};
auto kv = serializer.to_kv(offset_metadata_kv{.key = std::move(key)});
builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
builder.add_raw_kv(std::move(kv.key), std::nullopt);
}

void add_group_tombstone_record(
@@ -2404,7 +2404,7 @@ void add_group_tombstone_record(
.group_id = group,
};
auto kv = serializer.to_kv(group_metadata_kv{.key = std::move(key)});
builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
builder.add_raw_kv(std::move(kv.key), std::nullopt);
}
} // namespace

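Note on the fix above: `reflection::to_iobuf` ADL-serializes its argument, and, as the comment in group_metadata.cc below describes, ADL encodes an iobuf as a 4-byte little-endian length prefix followed by the content. Since `kv.key` is already a fully serialized key, the old code wrapped tombstone keys in an extra envelope. A minimal sketch of that double wrapping, using `std::vector<uint8_t>` as a stand-in for redpanda's `iobuf` (a simplification for illustration, not the actual API):

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

using bytes = std::vector<uint8_t>;

// Stand-in for reflection::to_iobuf(iobuf): prepend a 4-byte little-endian
// length prefix to the payload (assumes a little-endian host).
bytes wrap_in_envelope(const bytes& payload) {
    const int32_t len = static_cast<int32_t>(payload.size());
    bytes out(sizeof(len));
    std::memcpy(out.data(), &len, sizeof(len));
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}

int main() {
    // An already-serialized key: 2-byte big-endian version, 2-byte big-endian
    // group id size, then the group id characters.
    const bytes key = {0x00, 0x01, 0x00, 0x03, 'g', 'r', '1'};
    // The bug: writing wrap_in_envelope(key) instead of key put
    // |07|00|00|00|<key bytes>| on disk rather than the bare key.
    const bytes wrapped = wrap_in_envelope(key);
    return wrapped.size() == key.size() + 4 ? 0 : 1;
}
```

The fix writes `kv.key` directly; `maybe_unwrap_from_iobuf` (below) strips the extra envelope from records already written by affected versions.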
89 changes: 83 additions & 6 deletions src/v/kafka/server/group_metadata.cc
@@ -12,8 +12,11 @@
#include "kafka/server/group_metadata.h"

#include "bytes/bytes.h"
#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "kafka/protocol/request_reader.h"
#include "kafka/protocol/response_writer.h"
#include "kafka/server/logger.h"
#include "kafka/types.h"
#include "model/adl_serde.h"
#include "model/fundamental.h"
@@ -239,13 +242,84 @@ iobuf metadata_to_iobuf(const T& t) {
T::encode(writer, t);
return buffer;
}
iobuf maybe_unwrap_from_iobuf(iobuf buffer) {
/*
* A previous redpanda bug wrapped tombstone record keys in an extra
* iobuf serialization envelope. To stay compatible and be able to read
* keys wrapped in this additional iobuf, we introduce a check that
* leverages the serialization format.
*
* # New serializer format - __consumer_offset topic
*
*
* In Kafka the only allowed group metadata versions are 0, 1, and 2.
* Group metadata uses big-endian encoding, hence the first two bytes of
* a serialized group metadata key contain the version. The first
* serialized field of both offset_metadata_key and group_metadata_key
* is a `group_id` string, encoded as a 2-byte size followed by an array
* of characters. Thus the first 4 bytes of an encoded key are:
*
* b0 - denotes the LSB
* bN - denotes the byte at index N, counting from LSB to MSB (the LSB
*      has index 0)
*
* Key bytes (big endian):
*
* |version_b1|version_b0|size_b1|size_b0|...
*
* When the key struct is wrapped in an iobuf serialization, it is
* prefixed with the iobuf size, serialized as 4 bytes in little-endian
* encoding, followed by the iobuf content.
*
* iobuf size bytes (little endian):
*
* |size_b0|size_b1|size_b2|size_b3|...
*
* To check whether a metadata key was wrapped in an iobuf, we peek at
* the first 4 bytes and decode them as a little-endian iobuf size. If
* the decoded size equals the size of the remaining key buffer, the key
* needs to be unwrapped from the iobuf.
*
* # Old serializer format
*
* When serialized with the previous `kafka_internal/group` serializer,
* a key was represented by
*
* struct group_log_record_key {
*     enum class type : int8_t { group_metadata, offset_commit, noop };
*     type record_type;
*     iobuf key;
* };
*
* When serialized with ADL, its binary representation is:
*
* |record_tp_b0|size_b0|size_b1|size_b2|...
*
* The first 4 bytes will then be decoded as an iobuf size.
*
* TODO: remove this code in future releases
*/

iobuf_const_parser parser(buffer);
// peek at the first 4 bytes
auto deserialized_size = reflection::from_iobuf<int32_t>(parser.peek(4));

if (unlikely(
buffer.size_bytes() == static_cast<size_t>(deserialized_size) + 4)) {
vlog(kafka::klog.debug, "Unwrapping group metadata key from iobuf");
// unwrap from iobuf
return reflection::from_iobuf<iobuf>(std::move(buffer));
}
return buffer;
}
} // namespace

group_metadata_serializer make_backward_compatible_serializer() {
struct impl : group_metadata_serializer::impl {
group_metadata_type get_metadata_type(iobuf buffer) final {
auto key = reflection::from_iobuf<old::group_log_record_key>(
std::move(buffer));
maybe_unwrap_from_iobuf(std::move(buffer)));
switch (key.record_type) {
case old::group_log_record_key::type::offset_commit:
return group_metadata_type::offset_commit;
@@ -320,7 +394,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
group_metadata_kv decode_group_metadata(model::record record) final {
group_metadata_kv ret;
auto record_key = reflection::from_iobuf<old::group_log_record_key>(
record.release_key());
maybe_unwrap_from_iobuf(record.release_key()));
auto group_id = kafka::group_id(
reflection::from_iobuf<kafka::group_id::type>(
std::move(record_key.key)));
@@ -363,7 +437,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
offset_metadata_kv decode_offset_metadata(model::record r) final {
offset_metadata_kv ret;
auto record_key = reflection::from_iobuf<old::group_log_record_key>(
r.release_key());
maybe_unwrap_from_iobuf(r.release_key()));
auto key = reflection::from_iobuf<old::group_log_offset_key>(
std::move(record_key.key));

@@ -394,7 +468,8 @@ group_metadata_serializer make_backward_compatible_serializer() {
group_metadata_serializer make_consumer_offsets_serializer() {
struct impl final : group_metadata_serializer::impl {
group_metadata_type get_metadata_type(iobuf buffer) final {
auto reader = request_reader(std::move(buffer));
auto reader = request_reader(
maybe_unwrap_from_iobuf(std::move(buffer)));
return decode_metadata_type(reader);
};

@@ -421,7 +496,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {

group_metadata_kv decode_group_metadata(model::record record) final {
group_metadata_kv ret;
request_reader k_reader(record.release_key());
request_reader k_reader(
maybe_unwrap_from_iobuf(record.release_key()));
ret.key = group_metadata_key::decode(k_reader);
if (record.has_value()) {
request_reader v_reader(record.release_value());
@@ -433,7 +509,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {

offset_metadata_kv decode_offset_metadata(model::record record) final {
offset_metadata_kv ret;
request_reader k_reader(record.release_key());
request_reader k_reader(
maybe_unwrap_from_iobuf(record.release_key()));
ret.key = offset_metadata_key::decode(k_reader);
if (record.has_value()) {
request_reader v_reader(record.release_value());
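To make the unwrap heuristic concrete, here is a minimal self-contained sketch of the check `maybe_unwrap_from_iobuf` performs, again using `std::vector<uint8_t>` as a stand-in for `iobuf` (a simplification, not the actual redpanda API):

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

using bytes = std::vector<uint8_t>;

// Mirrors the check above: peek at the first 4 bytes as a little-endian
// int32; if the value equals the size of the remaining buffer, the key was
// wrapped in an iobuf envelope, so drop the prefix and return the payload.
bytes maybe_unwrap(bytes buffer) {
    if (buffer.size() < 4) {
        return buffer;
    }
    int32_t prefix = 0;
    std::memcpy(&prefix, buffer.data(), sizeof(prefix)); // little-endian host
    if (buffer.size() == static_cast<size_t>(prefix) + 4) {
        return bytes(buffer.begin() + 4, buffer.end());
    }
    return buffer;
}

int main() {
    // Bare new-format key: version 2 (big endian), group id "g" of length 1.
    const bytes bare = {0x00, 0x02, 0x00, 0x01, 'g'};
    // The same key wrapped in an envelope: |05|00|00|00| + key bytes.
    bytes wrapped = {0x05, 0x00, 0x00, 0x00};
    wrapped.insert(wrapped.end(), bare.begin(), bare.end());

    const bool ok = maybe_unwrap(bare) == bare
                    && maybe_unwrap(wrapped) == bare;
    return ok ? 0 : 1;
}
```

For a bare new-format key, the little-endian reading of the first 4 bytes puts the group-id length bytes in the most significant positions, so the decoded value is typically far larger than any realistic key buffer and the check does not false-positive. The unwrap branch is also annotated with `unlikely()` in the real code, since wrapped keys are the exception.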
2 changes: 1 addition & 1 deletion src/v/kafka/server/group_recovery_consumer.cc
@@ -141,7 +141,7 @@ void group_recovery_consumer::handle_record(model::record r) {
} catch (...) {
vlog(
klog.error,
"error handling record record - {}",
"error handling group metadata record - {}",
std::current_exception());
}
}
58 changes: 58 additions & 0 deletions src/v/kafka/server/tests/group_metadata_serialization_test.cc
@@ -29,6 +29,9 @@
#include <boost/range/irange.hpp>
#include <boost/test/tools/old/interface.hpp>

#include <limits>
#include <optional>

struct fixture {
static ss::logger logger;

@@ -374,3 +377,58 @@ FIXTURE_TEST(test_consumer_offsets_serializer, fixture) {
BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
BOOST_REQUIRE_EQUAL(offset_md, offset_md_kv.value);
}

model::record build_tombstone_record(iobuf buffer) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));

builder.add_raw_kv(std::move(buffer), std::nullopt);
auto records = std::move(builder).build().copy_records();
return std::move(records.front());
}

FIXTURE_TEST(test_unwrapping_tombstones_from_iobuf, fixture) {
std::vector<kafka::group_metadata_serializer> serializers;
serializers.reserve(2);
serializers.push_back(kafka::make_consumer_offsets_serializer());
serializers.push_back(kafka::make_backward_compatible_serializer());

for (auto& serializer : serializers) {
kafka::group_metadata_key group_md_key;
group_md_key.group_id = random_named_string<kafka::group_id>();

auto group_kv = serializer.to_kv(kafka::group_metadata_kv{
.key = group_md_key,
});
auto group_tombstone = build_tombstone_record(group_kv.key.copy());
// not wrapped in iobuf
auto decoded_group_md_kv = serializer.decode_group_metadata(
std::move(group_tombstone));

auto iobuf_decoded_group_md_kv = serializer.decode_group_metadata(
build_tombstone_record(reflection::to_iobuf(group_kv.key.copy())));

BOOST_REQUIRE_EQUAL(group_md_key, decoded_group_md_kv.key);
BOOST_REQUIRE_EQUAL(group_md_key, iobuf_decoded_group_md_kv.key);

kafka::offset_metadata_key offset_key;
offset_key.group_id = random_named_string<kafka::group_id>();
offset_key.topic = random_named_string<model::topic>();
offset_key.partition = random_named_int<model::partition_id>();

auto offset_kv = serializer.to_kv(kafka::offset_metadata_kv{
.key = offset_key,
});

// not wrapped in iobuf
auto offset_md_kv = serializer.decode_offset_metadata(
build_tombstone_record(offset_kv.key.copy()));

auto iobuf_offset_md_kv = serializer.decode_offset_metadata(
build_tombstone_record(reflection::to_iobuf(offset_kv.key.copy())));

BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
BOOST_REQUIRE_EQUAL(offset_key, iobuf_offset_md_kv.key);
}
}
4 changes: 4 additions & 0 deletions tests/rptest/clients/rpk.py
@@ -394,6 +394,10 @@ def group_seek_to_group(self, group, to_group):
cmd = ["seek", group, "--to-group", to_group]
self._run_group(cmd)

def group_delete(self, group):
cmd = ["delete", group]
self._run_group(cmd)

def wasm_deploy(self, script, name, description):
cmd = [
self._rpk_binary(), 'wasm', 'deploy', script, '--brokers',
68 changes: 68 additions & 0 deletions tests/rptest/tests/consumer_group_test.py
@@ -303,3 +303,71 @@ def test_consumer_is_removed_when_timedout(self, static_members):
c.stop()
c.wait()
c.free()

@cluster(num_nodes=6)
@parametrize(static_members=True)
@parametrize(static_members=False)
def test_dead_group_recovery(self, static_members):
"""
Test validating that all offsets persisted in the group are removed when corresponding partition is removed.
"""
self.setup_producer(20)
group = 'test-gr-1'
# use a short session timeout to make the test finish faster
consumers = self.create_consumers(
2,
self.topic_spec.name,
group,
static_members=static_members,
consumer_properties={"session.timeout.ms": 6000})

# wait for some messages
wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50),
30, 2)
# at this point we have a stable group
self.validate_group_state(group,
expected_state="Stable",
static_members=static_members)

# stop consumers
for c in consumers:
c.stop()

rpk = RpkTool(self.redpanda)

def group_is_empty():
rpk_group = rpk.group_describe(group)

return rpk_group.members == 0 and rpk_group.state == "Empty"
Contributor: I'm curious, is it possible for only one of these to be true?

Member (author): A dead group will have 0 members.

# group should be empty now
wait_until(group_is_empty, 30, 2)

# delete topic
rpk.delete_topic(self.topic_spec.name)

def group_is_dead():
rpk_group = rpk.group_describe(group)

return rpk_group.members == 0 and rpk_group.state == "Dead"

wait_until(group_is_dead, 30, 2)

# restart the cluster to force group state recovery
self.redpanda.restart_nodes(self.redpanda.nodes)
# after recovery group should still be dead as it was deleted
wait_until(group_is_dead, 30, 2)
Contributor: I don't have much context here, but do we still need the wait_until here? What other state would it be that requires retries?

Member (author): After a restart we may experience issues related to the group coordinator not yet being available or leadership being transferred.

self.client().create_topic(self.topic_spec)
for c in consumers:
c.start()
wait_until(
lambda: ConsumerGroupTest.consumed_at_least(consumers, 2000), 30,
2)
for c in consumers:
c.stop()
c.wait()
c.free()
self.producer.wait()
self.producer.free()
23 changes: 20 additions & 3 deletions tests/rptest/tests/consumer_offsets_migration_test.py
@@ -85,9 +85,12 @@ def failure_injector_loop():
spec = TopicSpec(partition_count=6, replication_factor=3)
self.client().create_topic(spec)
self.topic = spec.name
# use two groups; one of them will be deleted to test migration of tombstone records
group_id = 'test-group-1'
second_group_id = 'test-group-2'

self.start_producer(1, throughput=5000)
self.start_consumer(1)
self.start_consumer(1, group_id=group_id)
self.await_startup()

def cluster_is_stable():
@@ -105,6 +108,10 @@

kcl = KCL(self.redpanda)

rpk = RpkTool(self.redpanda)
# start a consumer in the second group
rpk.consume(spec.name, 100, group=second_group_id)

def _group_present():
return len(kcl.list_groups().splitlines()) > 1

@@ -116,8 +123,18 @@ def _group_present():

assert "__consumer_offsets" not in topics

# wait until the second group is empty, then delete it
def _second_group_empty():
return rpk.group_describe(second_group_id).state == "Empty"

wait_until(_second_group_empty, 60, 1)
rpk.group_delete(group=second_group_id)

# recreate the second group; recovering it will require reading tombstones
rpk.consume(spec.name, 100, group=second_group_id)

# enable consumer offsets support
self.redpanda.set_environment({"__REDPANDA_LOGICAL_VERSION": 2})
self.redpanda.set_environment({})
for n in self.redpanda.nodes:
id = self.redpanda.idx(n)
suppressed.add(id)
@@ -196,7 +213,7 @@ def node_stopped(node_id):
assert "__consumer_offsets" not in topics

# enable consumer offsets support
self.redpanda.set_environment({"__REDPANDA_LOGICAL_VERSION": 2})
self.redpanda.set_environment({})

def get_raft0_follower():
ctrl = self.redpanda.controller