diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc
index f46558635c04..d50e117c5627 100644
--- a/src/v/kafka/server/group.cc
+++ b/src/v/kafka/server/group.cc
@@ -2393,7 +2393,7 @@ void add_offset_tombstone_record(
       .partition = tp.partition,
     };
     auto kv = serializer.to_kv(offset_metadata_kv{.key = std::move(key)});
-    builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
+    builder.add_raw_kv(std::move(kv.key), std::nullopt);
 }
 
 void add_group_tombstone_record(
@@ -2404,7 +2404,7 @@ void add_group_tombstone_record(
       .group_id = group,
     };
     auto kv = serializer.to_kv(group_metadata_kv{.key = std::move(key)});
-    builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt);
+    builder.add_raw_kv(std::move(kv.key), std::nullopt);
 }
 
 } // namespace
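The two hunks above are the core fix: tombstone keys are now written raw instead of being passed through reflection::to_iobuf, which serializes the key as an iobuf and therefore prepends a 4-byte little endian length. Below is a minimal, self-contained sketch of that effect, assuming a little endian host and using plain byte vectors in place of redpanda's iobuf; wrap_in_envelope is a hypothetical stand-in for reflection::to_iobuf, not the real API.

#include <cstdint>
#include <cstring>
#include <vector>

using bytes = std::vector<uint8_t>;

// Hypothetical stand-in for reflection::to_iobuf(): serializing an iobuf
// prepends its content with a 4-byte little endian size.
bytes wrap_in_envelope(const bytes& payload) {
    const auto size = static_cast<uint32_t>(payload.size());
    bytes out(sizeof(size));
    std::memcpy(out.data(), &size, sizeof(size)); // little endian host assumed
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}

int main() {
    // serialized key for group "g1": |version_b1|version_b0|size_b1|size_b0|'g'|'1'|
    const bytes key = {0x00, 0x01, 0x00, 0x02, 'g', '1'};
    const bytes wrapped = wrap_in_envelope(key);
    // wrapped = |06 00 00 00|00 01 00 02 67 31|: four extra prefix bytes,
    // so the wrapped tombstone key no longer matches the original record key
    return wrapped.size() == key.size() + 4 ? 0 : 1;
}

Because of those four extra prefix bytes, a wrapped tombstone key neither byte-matches the key of the record it is meant to delete nor decodes cleanly in the group metadata serializers; the compatibility shim added to group_metadata.cc below exists to read the wrapped keys that were already written.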
diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc
index ccc5587e3451..7da1e3f1075e 100644
--- a/src/v/kafka/server/group_metadata.cc
+++ b/src/v/kafka/server/group_metadata.cc
@@ -12,8 +12,11 @@
 #include "kafka/server/group_metadata.h"
 
 #include "bytes/bytes.h"
+#include "bytes/iobuf.h"
+#include "bytes/iobuf_parser.h"
 #include "kafka/protocol/request_reader.h"
 #include "kafka/protocol/response_writer.h"
+#include "kafka/server/logger.h"
 #include "kafka/types.h"
 #include "model/adl_serde.h"
 #include "model/fundamental.h"
@@ -239,13 +242,84 @@ iobuf metadata_to_iobuf(const T& t) {
     T::encode(writer, t);
     return buffer;
 }
+
+iobuf maybe_unwrap_from_iobuf(iobuf buffer) {
+    /*
+     * Previously, an error in redpanda led to tombstone record keys being
+     * wrapped in an iobuf serialization envelope. In order to provide
+     * compatibility and be able to read keys wrapped in an additional
+     * iobuf, we introduce a check that leverages the serialization format.
+     *
+     * # New serializer format - __consumer_offsets topic
+     *
+     * In Kafka the only allowed values for the group metadata version are
+     * 0, 1, and 2. Group metadata uses big endian encoding, hence the
+     * first two bytes of a serialized group metadata key contain the
+     * version. The first serialized field of both offset_metadata_key and
+     * group_metadata_key is a `group_id` string, which is encoded as 2
+     * bytes of size followed by an array of characters. This way the
+     * first 4 bytes of an encoded key are:
+     *
+     * b0 - denotes the LSB
+     * bN - denotes byte N, indexed from LSB to MSB (the LSB has index 0)
+     *
+     * Key bytes (big endian):
+     *
+     * |version_b1|version_b0|size_b1|size_b0|...
+     *
+     * When a key struct is wrapped in an iobuf serialization envelope it
+     * is prefixed with the iobuf size, serialized as 4 bytes (in little
+     * endian encoding), followed by the iobuf content.
+     *
+     * iobuf size bytes (little endian):
+     *
+     * |size_b0|size_b1|size_b2|size_b3|...
+     *
+     * To check if a metadata key was wrapped in an iobuf we peek at the
+     * first 4 bytes and try to decode them as a little endian iobuf size.
+     * If the decoded size is equal to the size of the remaining key
+     * buffer, the key needs to be unwrapped from the iobuf.
+     *
+     * # Old serializer format
+     *
+     * When serialized with the previous `kafka_internal/group` serializer,
+     * a key was represented by
+     *
+     * struct group_log_record_key {
+     *     enum class type : int8_t { group_metadata, offset_commit, noop };
+     *     type record_type;
+     *     iobuf key;
+     * };
+     *
+     * When serialized with ADL its binary representation is as follows:
+     *
+     * |record_tp_b0|size_b0|size_b1|size_b2|...
+     *
+     * Those first 4 bytes will be decoded as an iobuf size.
+     *
+     * TODO: remove this code in future releases
+     */
+    iobuf_const_parser parser(buffer);
+    // peek at the first 4 bytes
+    auto deserialized_size = reflection::from_iobuf<int32_t>(parser.peek(4));
+
+    if (unlikely(
+          buffer.size_bytes() == static_cast<size_t>(deserialized_size) + 4)) {
+        vlog(kafka::klog.debug, "Unwrapping group metadata key from iobuf");
+        // unwrap from iobuf
+        return reflection::from_iobuf<iobuf>(std::move(buffer));
+    }
+    return buffer;
+}
 } // namespace
 
 group_metadata_serializer make_backward_compatible_serializer() {
     struct impl : group_metadata_serializer::impl {
         group_metadata_type get_metadata_type(iobuf buffer) final {
             auto key = reflection::from_iobuf<old::group_log_record_key>(
-              std::move(buffer));
+              maybe_unwrap_from_iobuf(std::move(buffer)));
             switch (key.record_type) {
             case old::group_log_record_key::type::offset_commit:
                 return group_metadata_type::offset_commit;
@@ -320,7 +394,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
 
         group_metadata_kv decode_group_metadata(model::record record) final {
             group_metadata_kv ret;
             auto record_key = reflection::from_iobuf<old::group_log_record_key>(
-              record.release_key());
+              maybe_unwrap_from_iobuf(record.release_key()));
             auto group_id = kafka::group_id(
               reflection::from_iobuf<kafka::group_id::type>(
                 std::move(record_key.key)));
@@ -363,7 +437,7 @@ group_metadata_serializer make_backward_compatible_serializer() {
 
         offset_metadata_kv decode_offset_metadata(model::record r) final {
             offset_metadata_kv ret;
             auto record_key = reflection::from_iobuf<old::group_log_record_key>(
-              r.release_key());
+              maybe_unwrap_from_iobuf(r.release_key()));
             auto key = reflection::from_iobuf<old::group_log_offset_key>(
               std::move(record_key.key));
@@ -394,7 +468,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
     struct impl final : group_metadata_serializer::impl {
         group_metadata_type get_metadata_type(iobuf buffer) final {
-            auto reader = request_reader(std::move(buffer));
+            auto reader = request_reader(
+              maybe_unwrap_from_iobuf(std::move(buffer)));
             return decode_metadata_type(reader);
         };
@@ -421,7 +496,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
 
         group_metadata_kv decode_group_metadata(model::record record) final {
             group_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
             ret.key = group_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
@@ -433,7 +509,8 @@ group_metadata_serializer make_consumer_offsets_serializer() {
 
         offset_metadata_kv decode_offset_metadata(model::record record) final {
             offset_metadata_kv ret;
-            request_reader k_reader(record.release_key());
+            request_reader k_reader(
+              maybe_unwrap_from_iobuf(record.release_key()));
             ret.key = offset_metadata_key::decode(k_reader);
             if (record.has_value()) {
                 request_reader v_reader(record.release_value());
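To make the size-probe heuristic in maybe_unwrap_from_iobuf concrete, here is a self-contained sketch, assuming a little endian host and using plain byte vectors in place of redpanda's iobuf and iobuf_const_parser; maybe_unwrap is a hypothetical stand-in for the real function. It decodes the first 4 bytes as a little endian size and strips the envelope only when that size exactly accounts for the remaining bytes.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

using bytes = std::vector<uint8_t>;

// Hypothetical stand-in for maybe_unwrap_from_iobuf(); the real code uses
// iobuf_const_parser::peek(4) and reflection::from_iobuf instead.
bytes maybe_unwrap(bytes buffer) {
    if (buffer.size() < 4) {
        return buffer; // too short to carry an envelope prefix
    }
    uint32_t size = 0;
    std::memcpy(&size, buffer.data(), 4); // little endian host assumed
    if (buffer.size() == static_cast<size_t>(size) + 4) {
        // decoded size accounts for the rest of the buffer: envelope detected
        return bytes(buffer.begin() + 4, buffer.end());
    }
    return buffer;
}

int main() {
    // new-format key for group "g1": |version_b1|version_b0|size_b1|size_b0|'g'|'1'|
    const bytes key = {0x00, 0x01, 0x00, 0x02, 'g', '1'};
    // the same key wrapped in an envelope: |06 00 00 00| + key
    bytes wrapped = {0x06, 0x00, 0x00, 0x00};
    wrapped.insert(wrapped.end(), key.begin(), key.end());

    assert(maybe_unwrap(key) == key);     // left untouched
    assert(maybe_unwrap(wrapped) == key); // envelope stripped
    return 0;
}

For a genuine new-format key the probe interprets |version_b1|version_b0|size_b1|size_b0| as a little endian integer; since the version is 0, 1, or 2 and encoded big endian, that value can only equal the remaining buffer length for pathological key sizes, so unwrapped keys pass through untouched.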
diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc
index 0852dbef8dc5..f010e4c266a5 100644
--- a/src/v/kafka/server/group_recovery_consumer.cc
+++ b/src/v/kafka/server/group_recovery_consumer.cc
@@ -141,7 +141,7 @@ void group_recovery_consumer::handle_record(model::record r) {
     } catch (...) {
         vlog(
           klog.error,
-          "error handling record record - {}",
+          "error handling group metadata record - {}",
          std::current_exception());
     }
 }
diff --git a/src/v/kafka/server/tests/group_metadata_serialization_test.cc b/src/v/kafka/server/tests/group_metadata_serialization_test.cc
index 6c78de55b5d7..e4d898499295 100644
--- a/src/v/kafka/server/tests/group_metadata_serialization_test.cc
+++ b/src/v/kafka/server/tests/group_metadata_serialization_test.cc
@@ -29,6 +29,9 @@
 #include <...>
 #include <...>
 
+#include <...>
+#include <...>
+
 struct fixture {
     static ss::logger logger;
@@ -374,3 +377,58 @@ FIXTURE_TEST(test_consumer_offsets_serializer, fixture) {
     BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
     BOOST_REQUIRE_EQUAL(offset_md, offset_md_kv.value);
 }
+
+model::record build_tombstone_record(iobuf buffer) {
+    storage::record_batch_builder builder(
+      model::record_batch_type::raft_data, model::offset(0));
+
+    builder.add_raw_kv(std::move(buffer), std::nullopt);
+    auto records = std::move(builder).build().copy_records();
+    return std::move(records.front());
+}
+
+FIXTURE_TEST(test_unwrapping_tombstones_from_iobuf, fixture) {
+    std::vector<kafka::group_metadata_serializer> serializers;
+    serializers.reserve(2);
+    serializers.push_back(kafka::make_consumer_offsets_serializer());
+    serializers.push_back(kafka::make_backward_compatible_serializer());
+
+    for (auto& serializer : serializers) {
+        kafka::group_metadata_key group_md_key;
+        group_md_key.group_id = random_named_string<kafka::group_id>();
+
+        auto group_kv = serializer.to_kv(kafka::group_metadata_kv{
+          .key = group_md_key,
+        });
+        auto group_tombstone = build_tombstone_record(group_kv.key.copy());
+        // not wrapped in iobuf
+        auto decoded_group_md_kv = serializer.decode_group_metadata(
+          std::move(group_tombstone));
+
+        auto iobuf_decoded_group_md_kv = serializer.decode_group_metadata(
+          build_tombstone_record(reflection::to_iobuf(group_kv.key.copy())));
+
+        BOOST_REQUIRE_EQUAL(group_md_key, decoded_group_md_kv.key);
+        BOOST_REQUIRE_EQUAL(group_md_key, iobuf_decoded_group_md_kv.key);
+
+        kafka::offset_metadata_key offset_key;
+        offset_key.group_id = random_named_string<kafka::group_id>();
+        offset_key.topic = random_named_string<model::topic>();
+        offset_key.partition = random_named_int<model::partition_id>();
+
+        auto offset_kv = serializer.to_kv(kafka::offset_metadata_kv{
+          .key = offset_key,
+        });
+
+        // not wrapped in iobuf
+        auto offset_md_kv = serializer.decode_offset_metadata(
+          build_tombstone_record(offset_kv.key.copy()));
+
+        auto iobuf_offset_md_kv = serializer.decode_offset_metadata(
+          build_tombstone_record(reflection::to_iobuf(offset_kv.key.copy())));
+
+        BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key);
+        BOOST_REQUIRE_EQUAL(offset_key, iobuf_offset_md_kv.key);
+    }
+}
diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py
index 42f7d743f674..e801486ba01d 100644
--- a/tests/rptest/clients/rpk.py
+++ b/tests/rptest/clients/rpk.py
@@ -394,6 +394,10 @@ def group_seek_to_group(self, group, to_group):
         cmd = ["seek", group, "--to-group", to_group]
         self._run_group(cmd)
 
+    def group_delete(self, group):
+        cmd = ["delete", group]
+        self._run_group(cmd)
+
     def wasm_deploy(self, script, name, description):
         cmd = [
             self._rpk_binary(), 'wasm', 'deploy', script, '--brokers',
diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py
index fbe2b847d48d..d6e6350bc4a5 100644
--- a/tests/rptest/tests/consumer_group_test.py
+++ b/tests/rptest/tests/consumer_group_test.py
@@ -303,3 +303,71 @@ def test_consumer_is_removed_when_timedout(self, static_members):
             c.stop()
             c.wait()
             c.free()
+
+    @cluster(num_nodes=6)
+    @parametrize(static_members=True)
+    @parametrize(static_members=False)
+    def test_dead_group_recovery(self, static_members):
+        """
+        Test validating that all offsets persisted in a group are removed
+        when the corresponding topic is deleted, and that the dead group
+        stays dead after the cluster is restarted.
+        """
+        self.setup_producer(20)
+        group = 'test-gr-1'
+        # use a short session timeout to make the test finish faster
+        consumers = self.create_consumers(
+            2,
+            self.topic_spec.name,
+            group,
+            static_members=static_members,
+            consumer_properties={"session.timeout.ms": 6000})
+
+        # wait for some messages
+        wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50),
+                   30, 2)
+        # at this point we have a stable group
+        self.validate_group_state(group,
+                                  expected_state="Stable",
+                                  static_members=static_members)
+
+        # stop consumers
+        for c in consumers:
+            c.stop()
+
+        rpk = RpkTool(self.redpanda)
+
+        def group_is_empty():
+            rpk_group = rpk.group_describe(group)
+
+            return rpk_group.members == 0 and rpk_group.state == "Empty"
+
+        # group should be empty now
+        wait_until(group_is_empty, 30, 2)
+
+        # delete topic
+        rpk.delete_topic(self.topic_spec.name)
+
+        def group_is_dead():
+            rpk_group = rpk.group_describe(group)
+
+            return rpk_group.members == 0 and rpk_group.state == "Dead"
+
+        wait_until(group_is_dead, 30, 2)
+
+        # restart the cluster; after recovery the group should still be
+        # dead as it was deleted
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        wait_until(group_is_dead, 30, 2)
+
+        # recreate the topic and check that consumers make progress again
+        self.client().create_topic(self.topic_spec)
+        for c in consumers:
+            c.start()
+        wait_until(
+            lambda: ConsumerGroupTest.consumed_at_least(consumers, 2000), 30,
+            2)
+        for c in consumers:
+            c.stop()
+            c.wait()
+            c.free()
+        self.producer.wait()
+        self.producer.free()
diff --git a/tests/rptest/tests/consumer_offsets_migration_test.py b/tests/rptest/tests/consumer_offsets_migration_test.py
index e4938134e76a..c437e4ee7868 100644
--- a/tests/rptest/tests/consumer_offsets_migration_test.py
+++ b/tests/rptest/tests/consumer_offsets_migration_test.py
@@ -85,9 +85,12 @@ def failure_injector_loop():
         spec = TopicSpec(partition_count=6, replication_factor=3)
         self.client().create_topic(spec)
         self.topic = spec.name
+        # use two groups; one of them will be deleted to test migration of tombstone records
+        group_id = 'test-group-1'
+        second_group_id = 'test-group-2'
 
         self.start_producer(1, throughput=5000)
-        self.start_consumer(1)
+        self.start_consumer(1, group_id=group_id)
         self.await_startup()
 
         def cluster_is_stable():
@@ -105,6 +108,10 @@ def cluster_is_stable():
 
         kcl = KCL(self.redpanda)
 
+        rpk = RpkTool(self.redpanda)
+        # start a consumer in the second group
+        rpk.consume(spec.name, 100, group=second_group_id)
+
         def _group_present():
             return len(kcl.list_groups().splitlines()) > 1
@@ -116,8 +123,18 @@ def _group_present():
 
         assert "__consumer_offsets" not in topics
 
+        # delete the second group
+        def _second_group_empty():
+            return rpk.group_describe(second_group_id).state == "Empty"
+
+        wait_until(_second_group_empty, 60, 1)
+        rpk.group_delete(group=second_group_id)
+
+        # recreate the second group; recovering it will require reading tombstones
+        rpk.consume(spec.name, 100, group=second_group_id)
+
         # enable consumer offsets support
-        self.redpanda.set_environment({"__REDPANDA_LOGICAL_VERSION": 2})
+        self.redpanda.set_environment({})
         for n in self.redpanda.nodes:
             id = self.redpanda.idx(n)
             suppressed.add(id)
@@ -196,7 +213,7 @@ def node_stopped(node_id):
         assert "__consumer_offsets" not in topics
 
         # enable consumer offsets support
-        self.redpanda.set_environment({"__REDPANDA_LOGICAL_VERSION": 2})
+        self.redpanda.set_environment({})
 
         def get_raft0_follower():
            ctrl = self.redpanda.controller