From d32ac70b2de6cb8ec7d980681b3c2f75a508c49e Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 24 May 2022 11:12:15 +0200 Subject: [PATCH 1/7] k/gr_recovery_consumer: improved exception message Signed-off-by: Michal Maslanka --- src/v/kafka/server/group_recovery_consumer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index 0852dbef8dc5..f010e4c266a5 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -141,7 +141,7 @@ void group_recovery_consumer::handle_record(model::record r) { } catch (...) { vlog( klog.error, - "error handling record record - {}", + "error handling group metadata record - {}", std::current_exception()); } } From 412dbcd056d70f18fb06889e85377a5a83020ad4 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 24 May 2022 11:12:48 +0200 Subject: [PATCH 2/7] k/group: do not serialize key iobuf as iobuf When serialized a key is already an iobuf. Wrapping it with another serialization layer is incorrect as it introduces additional size field in the binary format that deserializer doesn't expect. 
Signed-off-by: Michal Maslanka --- src/v/kafka/server/group.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index f46558635c04..d50e117c5627 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -2393,7 +2393,7 @@ void add_offset_tombstone_record( .partition = tp.partition, }; auto kv = serializer.to_kv(offset_metadata_kv{.key = std::move(key)}); - builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt); + builder.add_raw_kv(std::move(kv.key), std::nullopt); } void add_group_tombstone_record( @@ -2404,7 +2404,7 @@ void add_group_tombstone_record( .group_id = group, }; auto kv = serializer.to_kv(group_metadata_kv{.key = std::move(key)}); - builder.add_raw_kv(reflection::to_iobuf(std::move(kv.key)), std::nullopt); + builder.add_raw_kv(std::move(kv.key), std::nullopt); } } // namespace From 19ee4ebf57054c9cca58e8359afece316f01a0f5 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 26 May 2022 16:48:53 +0200 Subject: [PATCH 3/7] k/group_metadata: added compatibility with incorrectly serialized keys Previously in redpanda we had an error leading to wrapping tombstone record keys with an iobuf serialization envelope. In order to provide compatibility and be able to read keys wrapped with an additional iobuf we introduce this check. The check validates the key version being in allowed range. Otherwise we try to unwrap a key from iobuf. 
Signed-off-by: Michal Maslanka --- src/v/kafka/server/group_metadata.cc | 89 ++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 6 deletions(-) diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc index ccc5587e3451..7da1e3f1075e 100644 --- a/src/v/kafka/server/group_metadata.cc +++ b/src/v/kafka/server/group_metadata.cc @@ -12,8 +12,11 @@ #include "kafka/server/group_metadata.h" #include "bytes/bytes.h" +#include "bytes/iobuf.h" +#include "bytes/iobuf_parser.h" #include "kafka/protocol/request_reader.h" #include "kafka/protocol/response_writer.h" +#include "kafka/server/logger.h" #include "kafka/types.h" #include "model/adl_serde.h" #include "model/fundamental.h" @@ -239,13 +242,84 @@ iobuf metadata_to_iobuf(const T& t) { T::encode(writer, t); return buffer; } +iobuf maybe_unwrap_from_iobuf(iobuf buffer) { + /* + * Previously in redpanda we had an error leading to wrapping + * tombstone record keys with an iobuf serialization envelope. In + * order to provide compatibility and be able to read keys wrapped + * with an additional iobuf we introduce a check leveraging + * serialization format. + * + * # New serializer format - __consumer_offset topic + * + * + * In Kafka the only allowed values for group metadata versions are + * 0, 1, and 2. Group metadata use big endian encoding hence the + * first two bytes of a serialized group metadata key contains + * information about a version. Both offset_metadata_key and + * group_metadata_key first serialized field is a `group_id` string + * which is encoded as 2 bytes of size and then array of characters. + * This way 4 first bytes of encoded key are: + * + * b0 - denotes LSB + * bN - denotes N byte indexed from LSB to MSB (LSB has index 0) + * + * Key bytes (big endian): + * + * |version_b1|version_b0|size_b1|size_b0|... 
 + * + * When key struct is wrapped with an iobuf serialization it is + * prefixed with iobuf size which is serialized as 4 bytes of a size + * (in a little endian encoding) and then the iobuf content + * + * iobuf size bytes (little endian): + * + * |size_b0|size_b1|size_b2|size_b3|... + * + * To check if metadata key was wrapped with an iobuf we peek first + * 4 bytes to try to decode iobuf size stored in little endian format + * if the returned size is equal to the size of the remaining key buffer it + * means that we need to unwrap key from an iobuf. + * + * # Old serializer format + * + * When serialized with previous `kafka_internal/group` serializer a key was + * represented by + * + * struct group_log_record_key { + * enum class type : int8_t { group_metadata, offset_commit, noop }; + * type record_type; + * iobuf key; + * }; + * + * When serialized with ADL its binary representation is as follows + * + * |record_tp_b0|size_b0|size_b1|size_b2|... + * + * Those 4 first bytes will be decoded as an iobuf size. 
+ * + * TODO: remove this code in future releases + */ + + iobuf_const_parser parser(buffer); + // peek first 4-byte + auto deserialized_size = reflection::from_iobuf(parser.peek(4)); + + if (unlikely( + buffer.size_bytes() == static_cast(deserialized_size) + 4)) { + vlog(kafka::klog.debug, "Unwrapping group metadata key from iobuf"); + // unwrap from iobuf + return reflection::from_iobuf(std::move(buffer)); + } + return buffer; +} } // namespace group_metadata_serializer make_backward_compatible_serializer() { struct impl : group_metadata_serializer::impl { group_metadata_type get_metadata_type(iobuf buffer) final { auto key = reflection::from_iobuf( - std::move(buffer)); + maybe_unwrap_from_iobuf(std::move(buffer))); switch (key.record_type) { case old::group_log_record_key::type::offset_commit: return group_metadata_type::offset_commit; @@ -320,7 +394,7 @@ group_metadata_serializer make_backward_compatible_serializer() { group_metadata_kv decode_group_metadata(model::record record) final { group_metadata_kv ret; auto record_key = reflection::from_iobuf( - record.release_key()); + maybe_unwrap_from_iobuf(record.release_key())); auto group_id = kafka::group_id( reflection::from_iobuf( std::move(record_key.key))); @@ -363,7 +437,7 @@ group_metadata_serializer make_backward_compatible_serializer() { offset_metadata_kv decode_offset_metadata(model::record r) final { offset_metadata_kv ret; auto record_key = reflection::from_iobuf( - r.release_key()); + maybe_unwrap_from_iobuf(r.release_key())); auto key = reflection::from_iobuf( std::move(record_key.key)); @@ -394,7 +468,8 @@ group_metadata_serializer make_backward_compatible_serializer() { group_metadata_serializer make_consumer_offsets_serializer() { struct impl final : group_metadata_serializer::impl { group_metadata_type get_metadata_type(iobuf buffer) final { - auto reader = request_reader(std::move(buffer)); + auto reader = request_reader( + maybe_unwrap_from_iobuf(std::move(buffer))); return 
decode_metadata_type(reader); }; @@ -421,7 +496,8 @@ group_metadata_serializer make_consumer_offsets_serializer() { group_metadata_kv decode_group_metadata(model::record record) final { group_metadata_kv ret; - request_reader k_reader(record.release_key()); + request_reader k_reader( + maybe_unwrap_from_iobuf(record.release_key())); ret.key = group_metadata_key::decode(k_reader); if (record.has_value()) { request_reader v_reader(record.release_value()); @@ -433,7 +509,8 @@ group_metadata_serializer make_consumer_offsets_serializer() { offset_metadata_kv decode_offset_metadata(model::record record) final { offset_metadata_kv ret; - request_reader k_reader(record.release_key()); + request_reader k_reader( + maybe_unwrap_from_iobuf(record.release_key())); ret.key = offset_metadata_key::decode(k_reader); if (record.has_value()) { request_reader v_reader(record.release_value()); From a352a1710de499329487f77e4c649a4276d15244 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 2 Jun 2022 19:56:28 +0200 Subject: [PATCH 4/7] k/group_metadata: added tests validating serialization of metadata key Added test validating a compatibility of decoding group metadata keys wrapped in iobuf. 
Signed-off-by: Michal Maslanka --- .../group_metadata_serialization_test.cc | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/v/kafka/server/tests/group_metadata_serialization_test.cc b/src/v/kafka/server/tests/group_metadata_serialization_test.cc index 6c78de55b5d7..e4d898499295 100644 --- a/src/v/kafka/server/tests/group_metadata_serialization_test.cc +++ b/src/v/kafka/server/tests/group_metadata_serialization_test.cc @@ -29,6 +29,9 @@ #include #include +#include +#include + struct fixture { static ss::logger logger; @@ -374,3 +377,58 @@ FIXTURE_TEST(test_consumer_offsets_serializer, fixture) { BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key); BOOST_REQUIRE_EQUAL(offset_md, offset_md_kv.value); } + +model::record build_tombstone_record(iobuf buffer) { + storage::record_batch_builder builder( + model::record_batch_type::raft_data, model::offset(0)); + + builder.add_raw_kv(std::move(buffer), std::nullopt); + auto records = std::move(builder).build().copy_records(); + return std::move(records.front()); +} + +FIXTURE_TEST(test_unwrapping_tombstones_from_iobuf, fixture) { + std::vector serializers; + serializers.reserve(2); + serializers.push_back(kafka::make_consumer_offsets_serializer()); + serializers.push_back(kafka::make_backward_compatible_serializer()); + + for (auto& serializer : serializers) { + kafka::group_metadata_key group_md_key; + group_md_key.group_id = random_named_string(); + + auto group_kv = serializer.to_kv(kafka::group_metadata_kv{ + .key = group_md_key, + }); + auto group_tombstone = build_tombstone_record(group_kv.key.copy()); + // not wrapped in iobuf + auto decoded_group_md_kv = serializer.decode_group_metadata( + std::move(group_tombstone)); + + auto iobuf_decoded_group_md_kv = serializer.decode_group_metadata( + build_tombstone_record(reflection::to_iobuf(group_kv.key.copy()))); + + BOOST_REQUIRE_EQUAL(group_md_key, decoded_group_md_kv.key); + BOOST_REQUIRE_EQUAL(group_md_key, iobuf_decoded_group_md_kv.key); + + 
kafka::offset_metadata_key offset_key; + offset_key.group_id = random_named_string(); + ; + offset_key.topic = random_named_string(); + offset_key.partition = random_named_int(); + + auto offset_kv = serializer.to_kv(kafka::offset_metadata_kv{ + .key = offset_key, + }); + + // not wrapped in iobuf + auto offset_md_kv = serializer.decode_offset_metadata( + build_tombstone_record(offset_kv.key.copy())); + + auto iobuf_offset_md_kv = serializer.decode_offset_metadata( + build_tombstone_record(reflection::to_iobuf(offset_kv.key.copy()))); + + BOOST_REQUIRE_EQUAL(offset_key, offset_md_kv.key); + BOOST_REQUIRE_EQUAL(offset_key, iobuf_offset_md_kv.key); + } +} From 48c2e7e5cb763cee70cb7408d8ee42a147ef78e8 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 24 May 2022 11:14:45 +0200 Subject: [PATCH 5/7] tests: added dead group recovery test Signed-off-by: Michal Maslanka --- tests/rptest/tests/consumer_group_test.py | 68 +++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py index fbe2b847d48d..d6e6350bc4a5 100644 --- a/tests/rptest/tests/consumer_group_test.py +++ b/tests/rptest/tests/consumer_group_test.py @@ -303,3 +303,71 @@ def test_consumer_is_removed_when_timedout(self, static_members): c.stop() c.wait() c.free() + + @cluster(num_nodes=6) + @parametrize(static_members=True) + @parametrize(static_members=False) + def test_dead_group_recovery(self, static_members): + """ + Test validating that all offsets persisted in the group are removed when corresponding partition is removed. 
+ """ + self.setup_producer(20) + group = 'test-gr-1' + # using short session timeout to make the test finish faster + consumers = self.create_consumers( + 2, + self.topic_spec.name, + group, + static_members=static_members, + consumer_properties={"session.timeout.ms": 6000}) + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + # at this point we have stable group + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + # stop consumers + for c in consumers: + c.stop() + + rpk = RpkTool(self.redpanda) + + def group_is_empty(): + rpk_group = rpk.group_describe(group) + + return rpk_group.members == 0 and rpk_group.state == "Empty" + + # group should be empty now + + wait_until(group_is_empty, 30, 2) + + # delete topic + rpk.delete_topic(self.topic_spec.name) + + def group_is_dead(): + rpk_group = rpk.group_describe(group) + + return rpk_group.members == 0 and rpk_group.state == "Dead" + + wait_until(group_is_dead, 30, 2) + + # recreate topic + self.redpanda.restart_nodes(self.redpanda.nodes) + # after recovery group should still be dead as it was deleted + wait_until(group_is_dead, 30, 2) + + self.client().create_topic(self.topic_spec) + for c in consumers: + c.start() + wait_until( + lambda: ConsumerGroupTest.consumed_at_least(consumers, 2000), 30, + 2) + for c in consumers: + c.stop() + c.wait() + c.free() + self.producer.wait() + self.producer.free() From fdb6e7212c494b00d5dfd5a3a812b1c23fa658eb Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 3 Jun 2022 09:48:48 +0200 Subject: [PATCH 6/7] tests/rpk: added group_delete method Signed-off-by: Michal Maslanka --- tests/rptest/clients/rpk.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index 42f7d743f674..e801486ba01d 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -394,6 +394,10 @@ def group_seek_to_group(self, 
group, to_group): cmd = ["seek", group, "--to-group", to_group] self._run_group(cmd) + def group_delete(self, group): + cmd = ["delete", group] + self._run_group(cmd) + def wasm_deploy(self, script, name, description): cmd = [ self._rpk_binary(), 'wasm', 'deploy', script, '--brokers', From 73566a6ef57939ed4f474996767edbc16a8ef84c Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 3 Jun 2022 09:50:31 +0200 Subject: [PATCH 7/7] tests/migration_test: extended test to validate tombstones migration Added additional consumer group to migration test. The group is deleted during the test. Group deletion triggers a code path that involves reading and interpreting group tombstones. Signed-off-by: Michal Maslanka --- .../tests/consumer_offsets_migration_test.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/tests/rptest/tests/consumer_offsets_migration_test.py b/tests/rptest/tests/consumer_offsets_migration_test.py index e4938134e76a..c437e4ee7868 100644 --- a/tests/rptest/tests/consumer_offsets_migration_test.py +++ b/tests/rptest/tests/consumer_offsets_migration_test.py @@ -85,9 +85,12 @@ def failure_injector_loop(): spec = TopicSpec(partition_count=6, replication_factor=3) self.client().create_topic(spec) self.topic = spec.name + # use two groups, one of them will be deleted to test migrating tombstone records + group_id = 'test-group-1' + second_group_id = 'test-group-2' self.start_producer(1, throughput=5000) - self.start_consumer(1) + self.start_consumer(1, group_id=group_id) self.await_startup() def cluster_is_stable(): @@ -105,6 +108,10 @@ def cluster_is_stable(): kcl = KCL(self.redpanda) + rpk = RpkTool(self.redpanda) + # start consumer in second group + rpk.consume(spec.name, 100, group=second_group_id) + def _group_present(): return len(kcl.list_groups().splitlines()) > 1 @@ -116,8 +123,18 @@ def _group_present(): assert "__consumer_offsets" not in topics + # delete second group + def _second_group_empty(): + return 
rpk.group_describe(second_group_id).state == "Empty" + + wait_until(_second_group_empty, 60, 1) + rpk.group_delete(group=second_group_id) + + # recreate second group, recovering a group will require reading tombstones + rpk.consume(spec.name, 100, group=second_group_id) + # enable consumer offsets support - self.redpanda.set_environment({"__REDPANDA_LOGICAL_VERSION": 2}) + self.redpanda.set_environment({}) for n in self.redpanda.nodes: id = self.redpanda.idx(n) suppressed.add(id) @@ -196,7 +213,7 @@ def node_stopped(node_id): assert "__consumer_offsets" not in topics # enable consumer offsets support - self.redpanda.set_environment({"__REDPANDA_LOGICAL_VERSION": 2}) + self.redpanda.set_environment({}) def get_raft0_follower(): ctrl = self.redpanda.controller