From a3342666d15ac5c06b1002ed9e6edb93e836f141 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 10:25:17 +0200 Subject: [PATCH 01/16] k/group: added warn method to group context logger Signed-off-by: Michal Maslanka --- src/v/kafka/server/group.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 887356658d04..fdfb7098c33c 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -574,6 +574,11 @@ class group { log(ss::log_level::error, format, std::forward(args)...); } + template + void warn(const char* format, Args&&... args) const { + log(ss::log_level::warn, format, std::forward(args)...); + } + template void debug(const char* format, Args&&... args) const { log(ss::log_level::debug, format, std::forward(args)...); From 477a2ba2e056d325ef6d692d1e210272423795b7 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 10:43:30 +0200 Subject: [PATCH 02/16] k/generator: using named type for group.instance.id in request Signed-off-by: Michal Maslanka --- src/v/kafka/protocol/schemata/generator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py index 554b8ffb592b..da405c9615b2 100644 --- a/src/v/kafka/protocol/schemata/generator.py +++ b/src/v/kafka/protocol/schemata/generator.py @@ -81,6 +81,7 @@ }, }, "MemberId": ("kafka::member_id", "string"), + "GroupInstanceId": ("kafka::group_instance_id", "string"), }, "AddPartitionsToTxnRequestData": { "Topics": { From 27b2cb939c23eaf55f795312cccc3b1d439593ed Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 10:27:58 +0200 Subject: [PATCH 03/16] k/group: added generic checkpoint method Checkpoint assignments may come from either the sync request or the members metadata stored in group metadata. Added generic checkpoint method that allows to build a checkpoint using different assignments providers. Signed-off-by: Michal Maslanka --- src/v/kafka/server/group.cc | 31 ++----------------------------- src/v/kafka/server/group.h | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 6a58c2aad3b6..1e022f14b8d1 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1155,35 +1155,8 @@ group::sync_group_stages group::handle_sync_group(sync_group_request&& r) { } model::record_batch group::checkpoint(const assignments_type& assignments) { - group_metadata_key key; - key.group_id = _id; - group_metadata_value metadata; - - metadata.protocol_type = protocol_type().value_or(kafka::protocol_type("")); - metadata.generation = generation(); - metadata.protocol = protocol(); - metadata.leader = leader(); - metadata.state_timestamp = _state_timestamp; - - for (const auto& it : _members) { - auto& member = it.second; - auto state = it.second->state().copy(); - // this is not coming from the member itself because the checkpoint - // occurs right before the members go live and get their assignments. - state.assignment = bytes_to_iobuf(assignments.at(member->id())); - state.subscription = bytes_to_iobuf( - it.second->get_protocol_metadata(_protocol.value())); - metadata.members.push_back(std::move(state)); - } - - cluster::simple_batch_builder builder( - model::record_batch_type::raft_data, model::offset(0)); - - auto kv = _md_serializer.to_kv( - group_metadata_kv{.key = std::move(key), .value = std::move(metadata)}); - builder.add_raw_kv(std::move(kv.key), std::move(kv.value)); - - return std::move(builder).build(); + return do_checkpoint( + [&assignments](const member_id& id) { return assignments.at(id); }); } group::sync_group_stages group::sync_group_completing_rebalance( diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index fdfb7098c33c..f770b1536b7d 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -637,6 +637,39 @@ class group { } model::record_batch checkpoint(const assignments_type& assignments); + template + model::record_batch do_checkpoint(Func&& assignments_provider) { + kafka::group_metadata_key key; + key.group_id = _id; + kafka::group_metadata_value metadata; + + metadata.protocol_type = protocol_type().value_or( + kafka::protocol_type("")); + metadata.generation = generation(); + metadata.protocol = protocol(); + metadata.leader = leader(); + metadata.state_timestamp = _state_timestamp; + + for (const auto& [id, member] : _members) { + auto state = member->state().copy(); + // this is not coming from the member itself because the checkpoint + // occurs right before the members go live and get their + // assignments. + state.assignment = bytes_to_iobuf(assignments_provider(id)); + state.subscription = bytes_to_iobuf( + member->get_protocol_metadata(_protocol.value())); + metadata.members.push_back(std::move(state)); + } + + cluster::simple_batch_builder builder( + model::record_batch_type::raft_data, model::offset(0)); + + auto kv = _md_serializer.to_kv(group_metadata_kv{ + .key = std::move(key), .value = std::move(metadata)}); + builder.add_raw_kv(std::move(kv.key), std::move(kv.value)); + + return std::move(builder).build(); + } cluster::abort_origin get_abort_origin(const model::producer_identity&, model::tx_seq) const; From a445bfd8824f6a0eead91c1e09d5773c10996eec Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 10:47:11 +0200 Subject: [PATCH 04/16] fixture: make stop_signal part of the fixture Previously the `stop_signal` was destroyed in `fixture::start()` method leading to the signaling stop immediately after fixture application was started. Signed-off-by: Michal Maslanka --- src/v/kafka/client/test/fixture.h | 6 +++--- src/v/kafka/server/tests/topic_recreate_test.cc | 6 +++--- src/v/redpanda/tests/fixture.h | 16 ++++++++++++---- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/v/kafka/client/test/fixture.h b/src/v/kafka/client/test/fixture.h index 074266e75788..b564eedd4552 100644 --- a/src/v/kafka/client/test/fixture.h +++ b/src/v/kafka/client/test/fixture.h @@ -20,7 +20,8 @@ namespace kc = kafka::client; class kafka_client_fixture : public redpanda_thread_fixture { public: void restart() { - app.shutdown(); + shutdown(); + app_signal = std::make_unique<::stop_signal>(); ss::smp::invoke_on_all([this] { auto& config = config::shard_local_cfg(); config.get("disable_metrics").set_value(false); @@ -29,8 +30,7 @@ class kafka_client_fixture : public redpanda_thread_fixture { app.check_environment(); app.configure_admin_server(); app.wire_up_services(); - ::stop_signal app_signal; - app.start(app_signal); + app.start(*app_signal); } kc::client make_client() { return kc::client{proxy_client_config()}; } diff --git a/src/v/kafka/server/tests/topic_recreate_test.cc b/src/v/kafka/server/tests/topic_recreate_test.cc index 1a0d9835696a..94cb671b8377 100644 --- a/src/v/kafka/server/tests/topic_recreate_test.cc +++ b/src/v/kafka/server/tests/topic_recreate_test.cc @@ -122,7 +122,8 @@ class recreate_test_fixture : public redpanda_thread_fixture { } void restart() { - app.shutdown(); + shutdown(); + app_signal = std::make_unique<::stop_signal>(); ss::smp::invoke_on_all([this] { auto& config = config::shard_local_cfg(); config.get("disable_metrics").set_value(false); @@ -131,8 +132,7 @@ class recreate_test_fixture : public redpanda_thread_fixture { app.check_environment(); app.configure_admin_server(); app.wire_up_services(); - ::stop_signal app_signal; - app.start(app_signal); + app.start(*app_signal); } }; diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index d3a11869828d..c0d3cacfac2d 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -66,7 +66,8 @@ class redpanda_thread_fixture { , proxy_port(proxy_port) , schema_reg_port(schema_reg_port) , data_dir(std::move(base_dir)) - , remove_on_shutdown(remove_on_shutdown) { + , remove_on_shutdown(remove_on_shutdown) + , app_signal(std::make_unique<::stop_signal>()) { configure( node_id, kafka_port, @@ -82,8 +83,7 @@ class redpanda_thread_fixture { app.check_environment(); app.configure_admin_server(); app.wire_up_services(); - ::stop_signal app_signal; - app.start(app_signal); + app.start(*app_signal); // used by request context builder proto = std::make_unique( @@ -137,12 +137,19 @@ class redpanda_thread_fixture { true) {} ~redpanda_thread_fixture() { - app.shutdown(); + shutdown(); if (remove_on_shutdown) { std::filesystem::remove_all(data_dir); } } + void shutdown() { + if (!app_signal->abort_source().abort_requested()) { + app_signal->abort_source().request_abort(); + } + app.shutdown(); + } + config::configuration& lconf() { return config::shard_local_cfg(); } void configure( @@ -431,4 +438,5 @@ class redpanda_thread_fixture { std::filesystem::path data_dir; std::unique_ptr proto; bool remove_on_shutdown; + std::unique_ptr<::stop_signal> app_signal; }; From 10ac228765eb891e472ccc6a21b3b3f965e79d7e Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 10:34:22 +0200 Subject: [PATCH 05/16] k/group: handle static membership protocol Added handling members with `group.instance.id` set i.e. static members. When static member joins the group its `member_id` is identified using a mapping stored in `_static_members` map. This way broker can identify the `member_id` that is assigned to requested instance id. In dynamic membership protocol `member_id` is assigned by broker after each restart of consumer (there is no state persistence on the client side) hence every consumer restart causes rebalance since member with new id joins the group. With static members, when `group.instance.id` is present in the `JoinRequest` and mapping is already present coordinator will use that mapping to find member metadata. When using static membership protocol consumers does not send `LeaveGroup` requests when stopping. More information: [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) Signed-off-by: Michal Maslanka --- src/v/kafka/protocol/leave_group.h | 2 + src/v/kafka/server/group.cc | 375 +++++++++++++++--- src/v/kafka/server/group.h | 38 ++ src/v/kafka/server/group_manager.cc | 19 +- src/v/kafka/server/handlers/leave_group.cc | 14 +- src/v/kafka/server/handlers/leave_group.h | 2 +- src/v/kafka/server/member.h | 5 + src/v/kafka/server/tests/CMakeLists.txt | 1 - src/v/kafka/server/tests/group_test.cc | 10 +- .../kafka/server/tests/offset_commit_test.cc | 38 -- 10 files changed, 410 insertions(+), 94 deletions(-) delete mode 100644 src/v/kafka/server/tests/offset_commit_test.cc diff --git a/src/v/kafka/protocol/leave_group.h b/src/v/kafka/protocol/leave_group.h index b6c4903c63df..28926897619e 100644 --- a/src/v/kafka/protocol/leave_group.h +++ b/src/v/kafka/protocol/leave_group.h @@ -37,6 +37,8 @@ struct leave_group_request final { // set during request processing after mapping group to ntp model::ntp ntp; + // additional context, set in decode + api_version version; void encode(response_writer& writer, api_version version) { data.encode(writer, version); diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 1e022f14b8d1..d0fcd5cee0ac 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -25,6 +25,7 @@ #include "likely.h" #include "model/fundamental.h" #include "raft/errc.h" +#include "ssx/future-util.h" #include "storage/record_batch_builder.h" #include "utils/to_string.h" #include "vassert.h" @@ -190,6 +191,17 @@ bool group::supports_protocols(const join_group_request& r) const { } void group::add_member_no_join(member_ptr member) { + if (member->group_instance_id()) { + auto [_, success] = _static_members.emplace( + *member->group_instance_id(), member->id()); + if (!success) { + throw std::runtime_error(fmt::format( + "group already contains member with group instance id: {}, group " + "state: {}", + member, + *this)); + } + } if (_members.empty()) { _protocol_type = member->protocol_type(); } @@ -219,7 +231,7 @@ ss::future group::add_member(member_ptr member) { return member->get_join_response(); } -ss::future group::update_member( +void group::update_member_no_join( member_ptr member, std::vector&& new_protocols) { vlog( _ctxlog.trace, @@ -246,6 +258,11 @@ ss::future group::update_member( for (auto& p : member->protocols()) { _supported_protocols[p.name]++; } +} + +ss::future group::update_member( + member_ptr member, std::vector&& new_protocols) { + update_member_no_join(member, std::move(new_protocols)); if (!member->is_joining()) { _num_members_joining++; @@ -509,10 +526,187 @@ group::join_group_unknown_member(join_group_request&& r) { } auto new_member_id = group::generate_member_id(r); + if (r.data.group_instance_id) { + return add_new_static_member(std::move(new_member_id), std::move(r)); + } + return add_new_dynamic_member(std::move(new_member_id), std::move(r)); +} + +group::join_group_stages +group::add_new_static_member(member_id new_member_id, join_group_request&& r) { + auto it = _static_members.find(r.data.group_instance_id.value()); + + if (it != _static_members.end()) { + vlog( + _ctxlog.debug, + "Static member with unknown member id and instance id {} joins the " + "group, replacing previously mapped {} member id with new member_id " + "{}", + r.data.group_instance_id.value(), + it->second, + new_member_id); + return update_static_member_and_rebalance( + it->second, std::move(new_member_id), std::move(r)); + } + vlog( + _ctxlog.debug, + "Static member with unknown member id and instance id {} joins the " + "group new member id: {}", + r.data.group_instance_id.value(), + new_member_id); + return add_member_and_rebalance(std::move(new_member_id), std::move(r)); +} + +group::join_group_stages group::update_static_member_and_rebalance( + member_id old_member_id, member_id new_member_id, join_group_request&& r) { + auto member = replace_static_member( + *r.data.group_instance_id, old_member_id, new_member_id); + /* + * Heartbeat of old member id will expire without effect since the + * group no longer contains that member id. New heartbeat shall be scheduled + * with new member id. + */ + schedule_next_heartbeat_expiration(member); + auto f = update_member(member, r.native_member_protocols()); + auto old_protocols = _members.at(new_member_id)->protocols(); + switch (state()) { + case group_state::stable: { + auto next_gen_protocol = select_protocol(); + if (_protocol == next_gen_protocol) { + vlog( + _ctxlog.trace, + "static member {} joins in stable state with the protocol that " + "does not require group protocol update, will not trigger " + "rebalance", + r.data.group_instance_id); + ssx::background + = store_group(checkpoint()) + .then([this, + member, + instance_id = *r.data.group_instance_id, + new_member_id = std::move(new_member_id), + old_member_id = std::move(old_member_id), + old_protocols = std::move(old_protocols)]( + result result) mutable { + if (!result) { + vlog( + _ctxlog.warn, + "unable to store group checkpoint - {}", + result.error()); + // Failed to persist member.id of the + // given static member, revert the update of + // the static member in the group. + auto member = replace_static_member( + instance_id, new_member_id, old_member_id); + update_member_no_join( + member, std::move(old_protocols)); + schedule_next_heartbeat_expiration(member); + try_finish_joining_member( + member, + make_join_error( + unknown_member_id, + map_store_offset_error_code(result.error()))); + return; + } + // leader -> member metadata + // followers -> [] + std::vector md; + if (is_leader(new_member_id)) { + md = member_metadata(); + } + + auto reply = join_group_response( + error_code::none, + generation(), + protocol().value_or(kafka::protocol_name()), + leader().value_or(kafka::member_id()), + new_member_id, + std::move(md)); + try_finish_joining_member(member, std::move(reply)); + }); + return join_group_stages(std::move(f)); + } else { + vlog( + _ctxlog.trace, + "static member {} joins in stable state with the protocol that " + "requires group protocol update, trying to trigger rebalance", + r.data.group_instance_id); + try_prepare_rebalance(); + } + return join_group_stages(std::move(f)); + } + case group_state::preparing_rebalance: + return join_group_stages(std::move(f)); + case group_state::completing_rebalance: + vlog( + _ctxlog.trace, + "updating static member {} (member_id: {}) metadata", + member->group_instance_id().value(), + member->id()); + try_prepare_rebalance(); + return join_group_stages(std::move(f)); + case group_state::empty: + [[fallthrough]]; + case group_state::dead: + throw std::runtime_error(fmt::format( + "group was not supposed to be in {} state when the unknown static " + "member {} rejoins, group state: {}", + state(), + r.data.group_instance_id, + *this)); + } +} + +member_ptr group::replace_static_member( + const group_instance_id& instance_id, + const member_id& old_member_id, + const member_id& new_member_id) { + auto it = _members.find(old_member_id); + if (it == _members.end()) { + throw std::runtime_error(fmt::format( + "can not replace not existing member with id {}, group state: {}", + old_member_id, + *this)); + } + auto member = it->second; + _members.erase(it); + member->replace_id(new_member_id); + _members.emplace(new_member_id, member); + + if (member->is_joining()) { + try_finish_joining_member( + member, + make_join_error(old_member_id, error_code::fenced_instance_id)); + } + if (member->is_syncing()) { + member->set_sync_response( + sync_group_response(error_code::fenced_instance_id)); + } + if (is_leader(old_member_id)) { + _leader = member->id(); + } + + auto static_it = _static_members.find(instance_id); + vassert( + static_it != _static_members.end(), + "Static member that is going to be replaced must existing in group " + "static members map. instance_id: {}, group_state: {}", + instance_id, + *this); + static_it->second = new_member_id; + return member; +} + +group::join_group_stages +group::add_new_dynamic_member(member_id new_member_id, join_group_request&& r) { + vassert( + !r.data.group_instance_id, + "Group instance id must be empty for dynamic members, request: {}", + r.data); // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version // is >= 4 and groupInstanceId is configured to unknown. - if (r.version >= api_version(4) && !r.data.group_instance_id) { + if (r.version >= api_version(4)) { // If member id required (dynamic membership), register the // member in the pending member list and send back a response to // call for another join group request with allocated member id. @@ -548,13 +742,15 @@ group::join_group_known_member(join_group_request&& r) { kafka::member_id new_member_id = std::move(r.data.member_id); return add_member_and_rebalance(std::move(new_member_id), std::move(r)); - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "join"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Join rejected for unregistered member {}", - r.data.member_id); - return join_group_stages( - make_join_error(r.data.member_id, error_code::unknown_member_id)); + "Join rejected for invalid member {} - {}", + r.data.member_id, + ec); + return join_group_stages(make_join_error(r.data.member_id, ec)); } auto member = get_member(r.data.member_id); @@ -796,7 +992,7 @@ void group::complete_join() { // this is the old group->remove_unjoined_members(); const auto prev_leader = _leader; for (auto it = _members.begin(); it != _members.end();) { - if (!it->second->is_joining()) { + if (!it->second->is_joining() && !it->second->is_static()) { vlog(_ctxlog.trace, "Removing unjoined member {}", it->first); // cancel the heartbeat timer @@ -862,14 +1058,10 @@ void group::complete_join() { if (in_state(group_state::empty)) { vlog(_ctxlog.trace, "Checkpointing empty group {}", *this); - auto batch = checkpoint(assignments_type{}); - auto reader = model::make_memory_record_batch_reader( - std::move(batch)); - (void)_partition - ->replicate( - std::move(reader), - raft::replicate_options(raft::consistency_level::quorum_ack)) - .then([]([[maybe_unused]] result r) {}); + ssx::background + = store_group(checkpoint(assignments_type{})) + .then( + []([[maybe_unused]] result r) {}); } else { std::for_each( std::cbegin(_members), @@ -1028,6 +1220,9 @@ void group::remove_member(member_ptr member) { vassert(_num_members_joining >= 0, "negative members joining"); } } + if (it->second->group_instance_id()) { + _static_members.erase(it->second->group_instance_id().value()); + } _members.erase(it); } @@ -1083,13 +1278,15 @@ group::sync_group_stages group::handle_sync_group(sync_group_request&& r) { return sync_group_stages( sync_group_response(error_code::coordinator_not_available)); - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "sync"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Sync rejected for unregistered member {}", - r.data.member_id); - return sync_group_stages( - sync_group_response(error_code::unknown_member_id)); + "Sync rejected for invalid member {} - {}", + r.data.member_id, + ec); + return sync_group_stages(sync_group_response(ec)); } else if (r.data.generation_id != generation()) { vlog( @@ -1159,6 +1356,18 @@ model::record_batch group::checkpoint(const assignments_type& assignments) { [&assignments](const member_id& id) { return assignments.at(id); }); } +model::record_batch group::checkpoint() { + return do_checkpoint([this](const member_id& id) { + auto it = _members.find(id); + vassert( + it != _members.end(), + "Checkpointed member {} must be part of the group {}", + id, + *this); + return it->second->assignment(); + }); +} + group::sync_group_stages group::sync_group_completing_rebalance( member_ptr member, sync_group_request&& r) { // this response will be set by the leader when it arrives. the leader also @@ -1185,13 +1394,7 @@ group::sync_group_stages group::sync_group_completing_rebalance( auto assignments = std::move(r).member_assignments(); add_missing_assignments(assignments); - auto batch = checkpoint(assignments); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); - - auto f = _partition - ->replicate( - std::move(reader), - raft::replicate_options(raft::consistency_level::quorum_ack)) + auto f = store_group(checkpoint(assignments)) .then([this, response = std::move(response), expected_generation = generation(), @@ -1246,12 +1449,15 @@ ss::future group::handle_heartbeat(heartbeat_request&& r) { vlog(_ctxlog.trace, "Heartbeat rejected for group state {}", _state); return make_heartbeat_error(error_code::coordinator_not_available); - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "heartbeat"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Heartbeat rejected for unregistered member {}", - r.data.member_id); - return make_heartbeat_error(error_code::unknown_member_id); + "Heartbeat rejected for invalid member {} - {}", + r.data.member_id, + ec); + return make_heartbeat_error(ec); } else if (r.data.generation_id != generation()) { vlog( @@ -1294,30 +1500,72 @@ ss::future group::handle_heartbeat(heartbeat_request&& r) { ss::future group::handle_leave_group(leave_group_request&& r) { vlog(_ctxlog.trace, "Handling leave group request {}", r); - if (in_state(group_state::dead)) { vlog(_ctxlog.trace, "Leave rejected for group state {}", _state); return make_leave_error(error_code::coordinator_not_available); + } + /** + * Leave group prior to version 3 were requesting only a single member + * leave. + */ + if (r.version < api_version(3)) { + auto err = member_leave_group(r.data.member_id, std::nullopt); + return make_leave_error(err); + } - } else if (contains_pending_member(r.data.member_id)) { + leave_group_response response(error_code::none); + + response.data.members.reserve(r.data.members.size()); + for (auto& m : r.data.members) { + auto ec = member_leave_group(m.member_id, m.group_instance_id); + response.data.members.push_back(member_response{ + .member_id = m.member_id, + .group_instance_id = m.group_instance_id, + .error_code = ec}); + } + return ss::make_ready_future(std::move(response)); +} + +kafka::error_code group::member_leave_group( + const member_id& member_id, + const std::optional& group_instance_id) { + // The LeaveGroup API allows administrative removal of members by + // GroupInstanceId in which case we expect the MemberId to be + // undefined. + if (member_id == unknown_member_id) { + if (!group_instance_id) { + return error_code::unknown_member_id; + } + + auto it = _static_members.find(*group_instance_id); + if (it == _static_members.end()) { + return error_code::unknown_member_id; + } + // recurse with found mapping + return member_leave_group(it->second, group_instance_id); + } + + if (contains_pending_member(member_id)) { // if a pending member is leaving, it needs to be removed // from the pending list, heartbeat cancelled and if necessary, // prompt a JoinGroup completion. - remove_pending_member(r.data.member_id); - return make_leave_error(error_code::none); + remove_pending_member(member_id); + return error_code::none; - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + member_id, group_instance_id, "leave"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Leave rejected for unregistered member {}", - r.data.member_id); - return make_leave_error(error_code::unknown_member_id); - + "Leave rejected for invalid member {} - {}", + member_id, + ec); + return ec; } else { - auto member = get_member(r.data.member_id); + auto member = get_member(member_id); member->expire_timer().cancel(); remove_member(member); - return make_leave_error(error_code::none); + return error_code::none; } } @@ -2007,9 +2255,10 @@ group::handle_offset_commit(offset_commit_request&& r) { // The group is only using Kafka to store offsets. return store_offsets(std::move(r)); - } else if (!contains_member(r.data.member_id)) { - return offset_commit_stages( - offset_commit_response(r, error_code::unknown_member_id)); + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "offset-commit"); + ec != error_code::none) { + return offset_commit_stages(offset_commit_response(r, ec)); } else if (r.data.generation_id != generation()) { return offset_commit_stages( @@ -2292,6 +2541,40 @@ group::remove_topic_partitions(const std::vector& tps) { } } +ss::future> +group::store_group(model::record_batch batch) { + return _partition->replicate( + model::make_memory_record_batch_reader(std::move(batch)), + raft::replicate_options(raft::consistency_level::quorum_ack)); +} + +error_code group::validate_existing_member( + const member_id& member_id, + const std::optional& instance_id, + const ss::sstring& ctx) const { + if (instance_id) { + auto it = _static_members.find(*instance_id); + if (it == _static_members.end()) { + return error_code::unknown_member_id; + } else if (it->second != member_id) { + vlog( + _ctxlog.info, + "operation: {} - static member with instance id {} has different " + "member_id assigned, current: {}, requested: {}", + ctx, + *instance_id, + it->second, + member_id); + return error_code::fenced_instance_id; + } + } else { + if (!_members.contains(member_id)) { + return error_code::unknown_member_id; + } + } + return error_code::none; +} + std::ostream& operator<<(std::ostream& o, const group& g) { const auto ntp = [&g] { if (g._partition) { diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index f770b1536b7d..e31e69dcc3fc 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -13,6 +13,7 @@ #include "cluster/fwd.h" #include "cluster/logger.h" #include "cluster/partition.h" +#include "cluster/simple_batch_builder.h" #include "cluster/tx_utils.h" #include "kafka/group_probe.h" #include "kafka/protocol/fwd.h" @@ -309,6 +310,12 @@ class group { */ ss::future update_member( member_ptr member, std::vector&& new_protocols); + /** + * Same as update_member but without returning the join promise. Used when + * reverting member state after failed group checkpoint + */ + void update_member_no_join( + member_ptr member, std::vector&& new_protocols); /** * \brief Get the timeout duration for rebalancing. @@ -408,6 +415,22 @@ class group { join_group_stages update_member_and_rebalance( member_ptr member, join_group_request&& request); + /// Add new member with dynamic membership + group::join_group_stages + add_new_dynamic_member(member_id, join_group_request&&); + + /// Add new member with static membership + group::join_group_stages + add_new_static_member(member_id, join_group_request&&); + + /// Replaces existing static member with the new one + member_ptr replace_static_member( + const group_instance_id&, const member_id&, const member_id&); + + /// Updates existing static member and initiate rebalance if needed + group::join_group_stages update_static_member_and_rebalance( + member_id, member_id, join_group_request&&); + /// Transition to preparing rebalance if possible. void try_prepare_rebalance(); @@ -445,6 +468,10 @@ class group { ss::future handle_leave_group(leave_group_request&& r); + /// Handle leave group for single member + kafka::error_code member_leave_group( + const member_id&, const std::optional&); + std::optional offset(const model::topic_partition& tp) const { if (auto it = _offsets.find(tp); it != _offsets.end()) { @@ -549,6 +576,14 @@ class group { return _partition; } + ss::future> store_group(model::record_batch); + + // validates state of a member existing in a group + error_code validate_existing_member( + const member_id&, + const std::optional&, + const ss::sstring&) const; + // shutdown group. cancel all pending operations void shutdown(); @@ -637,6 +672,8 @@ class group { } model::record_batch checkpoint(const assignments_type& assignments); + model::record_batch checkpoint(); + template model::record_batch do_checkpoint(Func&& assignments_provider) { kafka::group_metadata_key key; @@ -680,6 +717,7 @@ class group { kafka::generation_id _generation; protocol_support _supported_protocols; member_map _members; + absl::node_hash_map _static_members; int _num_members_joining; absl::node_hash_map> _pending_members; diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index e8e425adeacc..1035d56a5a1f 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -643,7 +643,24 @@ group_manager::leave_group(leave_group_request&& r) { klog.trace, "Cannot handle leave group request for unknown group {}", r.data.group_id); - return make_leave_error(error_code::unknown_member_id); + if (r.version < api_version(3)) { + return make_leave_error(error_code::unknown_member_id); + } + // since version 3 we need to fill each member error code + leave_group_response response; + response.data.members.reserve(r.data.members.size()); + std::transform( + r.data.members.begin(), + r.data.members.end(), + std::back_inserter(response.data.members), + [](member_identity& mid) { + return member_response{ + .member_id = std::move(mid.member_id), + .group_instance_id = std::move(mid.group_instance_id), + .error_code = error_code::unknown_member_id, + }; + }); + return ss::make_ready_future(std::move(response)); } } diff --git a/src/v/kafka/server/handlers/leave_group.cc b/src/v/kafka/server/handlers/leave_group.cc index fe290a2dbf40..0f3cba1e5778 100644 --- a/src/v/kafka/server/handlers/leave_group.cc +++ b/src/v/kafka/server/handlers/leave_group.cc @@ -9,10 +9,12 @@ #include "kafka/server/handlers/leave_group.h" +#include "kafka/protocol/schemata/leave_group_request.h" #include "kafka/server/group_manager.h" #include "kafka/server/group_router.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" +#include "kafka/types.h" #include "utils/remote.h" #include "utils/to_string.h" @@ -21,11 +23,19 @@ namespace kafka { +namespace { +leave_group_request decode_request(request_context& ctx) { + leave_group_request request; + request.decode(ctx.reader(), ctx.header().version); + request.version = ctx.header().version; + return request; +} +} // namespace + template<> ss::future leave_group_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group g) { - leave_group_request request; - request.decode(ctx.reader(), ctx.header().version); + leave_group_request request = decode_request(ctx); if (!ctx.authorized(security::acl_operation::read, request.data.group_id)) { co_return co_await ctx.respond( diff --git a/src/v/kafka/server/handlers/leave_group.h b/src/v/kafka/server/handlers/leave_group.h index 19fcb5dba594..a959c6dc4ddd 100644 --- a/src/v/kafka/server/handlers/leave_group.h +++ b/src/v/kafka/server/handlers/leave_group.h @@ -14,6 +14,6 @@ namespace kafka { -using leave_group_handler = handler; +using leave_group_handler = handler; } diff --git a/src/v/kafka/server/member.h b/src/v/kafka/server/member.h index c4b396ea2504..d54c6a6f6630 100644 --- a/src/v/kafka/server/member.h +++ b/src/v/kafka/server/member.h @@ -82,6 +82,8 @@ class group_member { /// Get the member id. const kafka::member_id& id() const { return _state.id; } + void replace_id(member_id new_id) { _state.id = std::move(new_id); } + /// Get the id of the member's group. const kafka::group_id& group_id() const { return _group_id; } @@ -142,6 +144,9 @@ class group_member { /// Check if the member is syncing. bool is_syncing() const { return bool(_sync_promise); } + /// Check if member is static + bool is_static() const { return bool(_state.instance_id); } + /** * Get the sync response. * diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 0d6f05fcb426..8f92a61f47f2 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -25,7 +25,6 @@ set(srcs create_topics_test.cc find_coordinator_test.cc list_offsets_test.cc - offset_commit_test.cc topic_recreate_test.cc fetch_session_test.cc alter_config_test.cc diff --git a/src/v/kafka/server/tests/group_test.cc b/src/v/kafka/server/tests/group_test.cc index 006d6febd934..5ae58b22faf3 100644 --- a/src/v/kafka/server/tests/group_test.cc +++ b/src/v/kafka/server/tests/group_test.cc @@ -63,7 +63,7 @@ static member_ptr get_group_member( return ss::make_lw_shared( kafka::member_id(id), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id(fmt::format("i-{}", id)), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -167,7 +167,7 @@ SEASTAR_THREAD_TEST_CASE(rebalance_timeout) { auto m0 = ss::make_lw_shared( kafka::member_id("m"), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id("i-1"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -178,7 +178,7 @@ SEASTAR_THREAD_TEST_CASE(rebalance_timeout) { auto m1 = ss::make_lw_shared( kafka::member_id("n"), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id("i-2"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -382,7 +382,7 @@ SEASTAR_THREAD_TEST_CASE(supports_protocols) { auto m = ss::make_lw_shared( kafka::member_id("m"), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id("i-1"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -412,7 +412,7 @@ SEASTAR_THREAD_TEST_CASE(supports_protocols) { auto m2 = ss::make_lw_shared( kafka::member_id("n"), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id("i-2"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), diff --git a/src/v/kafka/server/tests/offset_commit_test.cc b/src/v/kafka/server/tests/offset_commit_test.cc deleted file mode 100644 index 8ac835a49a0c..000000000000 --- a/src/v/kafka/server/tests/offset_commit_test.cc +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "kafka/protocol/offset_commit.h" -#include "redpanda/tests/fixture.h" -#include "resource_mgmt/io_priority.h" -#include "test_utils/async.h" - -#include - -#include -#include - -FIXTURE_TEST( - offset_commit_static_membership_not_supported, redpanda_thread_fixture) { - auto client = make_kafka_client().get0(); - client.connect().get(); - - kafka::offset_commit_request req; - req.data.group_instance_id = kafka::group_instance_id("g"); - req.data.topics = {{ - .name = model::topic("t"), - .partitions = {{}}, - }}; - - auto resp = client.dispatch(req, kafka::api_version(7)).get0(); - client.stop().then([&client] { client.shutdown(); }).get(); - - BOOST_TEST( - resp.data.topics[0].partitions[0].error_code - == kafka::error_code::unsupported_version); -} From 578b1bb46566be43011d39f875c3266a24a3e625 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:02:46 +0200 Subject: [PATCH 06/16] k/join_group: support static membership protocol in join group Signed-off-by: Michal Maslanka --- src/v/kafka/server/handlers/join_group.cc | 5 ----- src/v/kafka/server/handlers/join_group.h | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/v/kafka/server/handlers/join_group.cc b/src/v/kafka/server/handlers/join_group.cc index f1d82a47e443..f7bc53b80a4e 100644 --- a/src/v/kafka/server/handlers/join_group.cc +++ b/src/v/kafka/server/handlers/join_group.cc @@ -40,11 +40,6 @@ process_result_stages join_group_handler::handle( join_group_request request; decode_request(ctx, request); - if (request.data.group_instance_id) { - return process_result_stages::single_stage( - ctx.respond(join_group_response(error_code::unsupported_version))); - } - if (!ctx.authorized(security::acl_operation::read, request.data.group_id)) { return process_result_stages::single_stage(ctx.respond( join_group_response(error_code::group_authorization_failed))); diff --git a/src/v/kafka/server/handlers/join_group.h b/src/v/kafka/server/handlers/join_group.h index 69397afbb14b..1d3ec508350a 100644 --- a/src/v/kafka/server/handlers/join_group.h +++ b/src/v/kafka/server/handlers/join_group.h @@ -17,7 +17,7 @@ namespace kafka { struct join_group_handler { using api = join_group_api; static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(4); + static constexpr api_version max_supported = api_version(5); static process_result_stages handle(request_context, ss::smp_service_group); }; } // namespace kafka From 1a64134ced5f21b6793512e8320babc1835df5e7 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:03:53 +0200 Subject: [PATCH 07/16] k/group_manager: support static membership in group manager Signed-off-by: Michal Maslanka --- src/v/kafka/server/group_manager.cc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 1035d56a5a1f..e28d8cd3c0ae 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -559,12 +559,6 @@ group::join_group_stages group_manager::join_group(join_group_request&& r) { } group::sync_group_stages group_manager::sync_group(sync_group_request&& r) { - if (r.data.group_instance_id) { - vlog(klog.trace, "Static group membership is not supported"); - return group::sync_group_stages( - sync_group_response(error_code::unsupported_version)); - } - auto error = validate_group_status( r.ntp, r.data.group_id, sync_group_api::key); if (error != error_code::none) { @@ -598,11 +592,6 @@ group::sync_group_stages group_manager::sync_group(sync_group_request&& r) { } ss::future group_manager::heartbeat(heartbeat_request&& r) { - if (r.data.group_instance_id) { - vlog(klog.trace, "Static group membership is not supported"); - return make_heartbeat_error(error_code::unsupported_version); - } - auto error = validate_group_status( r.ntp, r.data.group_id, heartbeat_api::key); if (error != error_code::none) { From b9295d90010927942d58ee34995b4a07d2d636ea Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:05:09 +0200 Subject: [PATCH 08/16] k/offset_commit: support static membership in offset commit handler Signed-off-by: Michal Maslanka --- src/v/kafka/server/handlers/offset_commit.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index 8aee22812476..ddaa68616aab 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -59,11 +59,6 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { request.decode(ctx.reader(), ctx.header().version); vlog(klog.trace, "Handling request {}", request); - if (request.data.group_instance_id) { - return process_result_stages::single_stage(ctx.respond( - offset_commit_response(request, error_code::unsupported_version))); - } - // check authorization for this group const auto group_authorized = ctx.authorized( security::acl_operation::read, request.data.group_id); From abbfabdceaa2312dac239162deafa4b10b829f99 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:05:41 +0200 Subject: [PATCH 09/16] k/describe_groups: support static membership in describe groups handler Signed-off-by: Michal Maslanka --- src/v/kafka/server/handlers/describe_groups.cc | 18 ++++++++++++++++-- src/v/kafka/server/handlers/describe_groups.h | 2 +- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/handlers/describe_groups.cc b/src/v/kafka/server/handlers/describe_groups.cc index 91954ab01166..aece947dbb3c 100644 --- a/src/v/kafka/server/handlers/describe_groups.cc +++ b/src/v/kafka/server/handlers/describe_groups.cc @@ -12,6 +12,7 @@ #include "kafka/protocol/errors.h" #include "kafka/server/group_manager.h" #include "kafka/server/group_router.h" +#include "kafka/server/handlers/details/security.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" #include "model/namespace.h" @@ -21,6 +22,17 @@ #include namespace kafka { +namespace { +ss::future describe_group( + group_id group, bool include_authorized_operations, request_context& ctx) { + auto res = co_await ctx.groups().describe_group(group); + if (include_authorized_operations) { + res.authorized_operations = details::to_bit_field( + details::authorized_operations(ctx, group)); + } + co_return res; +} +} // namespace template<> ss::future @@ -48,8 +60,10 @@ describe_groups_handler::handle(request_context ctx, ss::smp_service_group) { std::vector> described; described.reserve(request.data.groups.size()); for (auto& group_id : request.data.groups) { - described.push_back( - ctx.groups().describe_group(std::move(group_id))); + described.push_back(describe_group( + std::move(group_id), + request.data.include_authorized_operations, + ctx)); } response.data.groups = co_await ss::when_all_succeed( described.begin(), described.end()); diff --git a/src/v/kafka/server/handlers/describe_groups.h b/src/v/kafka/server/handlers/describe_groups.h index f93110d323c5..6f804548b893 100644 --- a/src/v/kafka/server/handlers/describe_groups.h +++ b/src/v/kafka/server/handlers/describe_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using describe_groups_handler = handler; +using describe_groups_handler = handler; } From 745f999346faa5a18cdacdb6e78c4e3d8a28faa6 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:07:47 +0200 Subject: [PATCH 10/16] k/tests: added simple consumer group join test Signed-off-by: Michal Maslanka --- src/v/kafka/server/tests/CMakeLists.txt | 1 + .../server/tests/consumer_groups_test.cc | 103 ++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 src/v/kafka/server/tests/consumer_groups_test.cc diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 8f92a61f47f2..3630ba7e10ff 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -14,6 +14,7 @@ rp_test( ) set(srcs + consumer_groups_test.cc member_test.cc group_test.cc read_write_roundtrip_test.cc diff --git a/src/v/kafka/server/tests/consumer_groups_test.cc b/src/v/kafka/server/tests/consumer_groups_test.cc new file mode 100644 index 000000000000..cb99a9c323bd --- /dev/null +++ b/src/v/kafka/server/tests/consumer_groups_test.cc @@ -0,0 +1,103 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/controller_api.h" +#include "cluster/feature_table.h" +#include "kafka/client/client.h" +#include "kafka/protocol/describe_groups.h" +#include "kafka/protocol/errors.h" +#include "kafka/protocol/find_coordinator.h" +#include "kafka/protocol/join_group.h" +#include "kafka/protocol/schemata/join_group_request.h" +#include "kafka/types.h" +#include "model/fundamental.h" +#include "model/namespace.h" +#include "model/timeout_clock.h" +#include "redpanda/tests/fixture.h" +#include "test_utils/async.h" + +#include +#include + +#include + +using namespace kafka; +join_group_request make_join_group_request( + ss::sstring member_id, + ss::sstring gr, + std::vector protocols, + ss::sstring protocol_type) { + join_group_request req; + req.data.group_id = kafka::group_id(std::move(gr)); + req.data.member_id = kafka::member_id(std::move(member_id)); + req.data.protocol_type = kafka::protocol_type(std::move(protocol_type)); + for (auto& p : protocols) { + req.data.protocols.push_back(join_group_request_protocol{ + .name = protocol_name(std::move(p)), .metadata = bytes{}}); + } + req.data.session_timeout_ms = 10s; + return req; +} +struct consumer_offsets_fixture : public redpanda_thread_fixture { + void wait_for_consumer_offsets_topic() { + app.controller->get_feature_table() + .local() + .await_feature( + cluster::feature::consumer_offsets, + app.controller->get_abort_source().local()) + .get(); + + auto client = make_kafka_client().get0(); + + client.connect().get(); + kafka::find_coordinator_request req("key"); + req.data.key_type = kafka::coordinator_type::group; + client.dispatch(std::move(req), kafka::api_version(1)).get(); + + app.controller->get_api() + .local() + .wait_for_topic( + model::kafka_consumer_offsets_nt, model::timeout_clock::now() + 30s) + .get(); + + tests::cooperative_spin_wait_with_timeout(30s, [&client] { + kafka::describe_groups_request req; + req.data.groups.emplace_back("key"); + return client.dispatch(std::move(req), kafka::api_version(1)) + .then([](kafka::describe_groups_response response) { + return response.data.groups.front().error_code + == kafka::error_code::none; + }); + }).get(); + + client.stop().get(); + client.shutdown(); + } +}; + +FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) { + wait_for_consumer_offsets_topic(); + auto client = make_kafka_client().get0(); + auto deferred = ss::defer([&client] { + client.stop().then([&client] { client.shutdown(); }).get(); + }); + client.connect().get(); + + auto req = make_join_group_request( + unknown_member_id, "group-test", {"p1", "p2"}, "random"); + // set group instance id + req.data.group_instance_id = kafka::group_instance_id("instance-1"); + auto resp = client.dispatch(std::move(req), kafka::api_version(5)).get0(); + + info("response: {}", resp.data); + + // static we expect coordinator to assign member_id + BOOST_REQUIRE_EQUAL(resp.data.error_code, kafka::error_code::none); + BOOST_REQUIRE_NE(resp.data.member_id, unknown_member_id); +} From c550f60396107df9d4816c61013268602a8bc9e2 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:08:11 +0200 Subject: [PATCH 11/16] tests: added kafka v3.0.0 cli tools support Signed-off-by: Michal Maslanka --- tests/rptest/clients/kafka_cli_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py index de94392d4346..fe6e6b667195 100644 --- a/tests/rptest/clients/kafka_cli_tools.py +++ b/tests/rptest/clients/kafka_cli_tools.py @@ -34,7 +34,7 @@ class KafkaCliTools: """ # See tests/docker/Dockerfile to add new versions - VERSIONS = ("2.7.0", "2.5.0", "2.4.1", "2.3.1") + VERSIONS = ("3.0.0", "2.7.0", "2.5.0", "2.4.1", "2.3.1") def __init__(self, redpanda, version=None, user=None, passwd=None): self._redpanda = redpanda From 26f2c698e2ed337b6e268117c87cfc1914a116fc Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:08:54 +0200 Subject: [PATCH 12/16] tests/rpk: added group instance id to group describe output Signed-off-by: Michal Maslanka --- tests/rptest/clients/rpk.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index 6ed71d322800..d84ee0e29976 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -72,6 +72,7 @@ class RpkGroupPartition(typing.NamedTuple): log_end_offset: Optional[int] lag: Optional[int] member_id: str + instance_id: str client_id: str host: str @@ -306,12 +307,21 @@ def parse_field(field_name, string): assert m is not None, f"Field string '{string}' does not match the pattern" return m['value'] - partition_pattern = re.compile( - "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P.*)? *(?P.*)? *(?P.*)?" + static_member_pattern = re.compile("^([^\s]+\s+){8}[^\s]+$") + + partition_pattern_static_member = re.compile( + "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*) *(?P[^\s]*)? *(?P[^\s]*)?" + ) + partition_pattern_dynamic_member = re.compile( + "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*)? *(?P[^\s]*)?" ) def parse_partition(string): - m = partition_pattern.match(string) + + pattern = partition_pattern_dynamic_member + if static_member_pattern.match(string): + pattern = partition_pattern_static_member + m = pattern.match(string) # Check to see if info for the partition was queried during a change in leadership. # if it was we'd expect to see a partition string ending in; @@ -319,8 +329,6 @@ def parse_partition(string): if m is None and string.find('NOT_LEADER_FOR_PARTITION') != -1: return None - assert m is not None, f"Partition string '{string}' does not match the pattern" - # Account for negative numbers and '-' value all_digits = lambda x: x.lstrip('-').isdigit() @@ -334,6 +342,8 @@ def parse_partition(string): log_end_offset=log_end, lag=lag, member_id=m['member_id'], + instance_id=m.groupdict().get( + 'instance_id', None), client_id=m['client_id'], host=m['host']) From ad3ac60678f27aea9c9d2c86a1a0cab2696fc394 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:09:29 +0200 Subject: [PATCH 13/16] tests/kafka_cli_consumer: added formatter properties Added `formatter_properties` parameter to control how messages are formatted when using Kafka CLI tools consumer. Signed-off-by: Michal Maslanka --- tests/rptest/services/kafka_cli_consumer.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/rptest/services/kafka_cli_consumer.py b/tests/rptest/services/kafka_cli_consumer.py index f74f1eb4025b..d891128cbdbc 100644 --- a/tests/rptest/services/kafka_cli_consumer.py +++ b/tests/rptest/services/kafka_cli_consumer.py @@ -25,7 +25,8 @@ def __init__(self, partitions=None, isolation_level=None, from_beginning=False, - consumer_properties={}): + consumer_properties={}, + formatter_properties={}): super(KafkaCliConsumer, self).__init__(context, num_nodes=1) self._redpanda = redpanda self._topic = topic @@ -35,6 +36,7 @@ def __init__(self, self._isolation_level = isolation_level self._from_beginning = from_beginning self._consumer_properties = consumer_properties + self._formatter_properties = formatter_properties self._stopping = threading.Event() assert self._partitions is not None or self._group is not None, "either partitions or group have to be set" @@ -62,6 +64,8 @@ def _worker(self, _, node): cmd += ["--from-beginning"] for k, v in self._consumer_properties.items(): cmd += ['--consumer-property', f"{k}={v}"] + for k, v in self._formatter_properties.items(): + cmd += ['--property', f"{k}={v}"] cmd += ["--bootstrap-server", self._redpanda.brokers()] From 20d7ba5cff0dc82a78d6477a3d4cfc6cf3ea6aaf Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:10:21 +0200 Subject: [PATCH 14/16] tests/kafka_cli_consumer: using graceful shutdown when stopping consumer When consumer is being stopped it may send `LeaveGroup` request to the broker. Using clean shutdown (SIGTERM) to give the consumer process oportunity to finish gracefully. Signed-off-by: Michal Maslanka --- tests/rptest/services/kafka_cli_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rptest/services/kafka_cli_consumer.py b/tests/rptest/services/kafka_cli_consumer.py index d891128cbdbc..16778094be82 100644 --- a/tests/rptest/services/kafka_cli_consumer.py +++ b/tests/rptest/services/kafka_cli_consumer.py @@ -93,4 +93,4 @@ def wait_for_messages(self, messages, timeout=30): def stop_node(self, node): self._stopping.set() - node.account.kill_process("java", clean_shutdown=False) + node.account.kill_process("java", clean_shutdown=True) From e37d1845c15e8dba7aaaab049b84a853f66c25a2 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:12:38 +0200 Subject: [PATCH 15/16] tests/kafka_cli_consumer: added wait_for_started method Signed-off-by: Michal Maslanka --- tests/rptest/services/kafka_cli_consumer.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/rptest/services/kafka_cli_consumer.py b/tests/rptest/services/kafka_cli_consumer.py index 16778094be82..ee54865a5f26 100644 --- a/tests/rptest/services/kafka_cli_consumer.py +++ b/tests/rptest/services/kafka_cli_consumer.py @@ -91,6 +91,15 @@ def wait_for_messages(self, messages, timeout=30): timeout, backoff_sec=2) + def wait_for_started(self, timeout=10): + def all_started(): + return all([ + len(node.account.java_pids("ConsoleConsumer")) == 1 + for node in self.nodes + ]) + + wait_until(all_started, timeout, backoff_sec=1) + def stop_node(self, node): self._stopping.set() node.account.kill_process("java", clean_shutdown=True) From a91767eb50de66944b1b9767c8ed6b5771506ebd Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 11 May 2022 11:13:12 +0200 Subject: [PATCH 16/16] tests: added consumer group handling tests Added tests validating basic consumer group operations when using dynamic and static membership protocols. Signed-off-by: Michal Maslanka --- tests/rptest/tests/consumer_group_test.py | 305 ++++++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 tests/rptest/tests/consumer_group_test.py diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py new file mode 100644 index 000000000000..fbe2b847d48d --- /dev/null +++ b/tests/rptest/tests/consumer_group_test.py @@ -0,0 +1,305 @@ +# Copyright 2020 Redpanda Data, Inc. +# Copyright 2020 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from collections import defaultdict +from rptest.services.cluster import cluster + +from rptest.clients.rpk import RpkTool +from rptest.clients.types import TopicSpec +from rptest.services.kafka_cli_consumer import KafkaCliConsumer +from rptest.services.rpk_producer import RpkProducer +from rptest.tests.redpanda_test import RedpandaTest +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize + + +class ConsumerGroupTest(RedpandaTest): + def __init__(self, test_ctx, *args, **kwargs): + self._ctx = test_ctx + self.producer = None + super(ConsumerGroupTest, self).__init__( + test_ctx, + num_brokers=3, + *args, + # disable leader balancer to make sure that group will not be realoaded because of leadership changes + extra_rp_conf={"enable_leader_balancer": False}, + **kwargs) + + def make_consumer_properties(base_properties, instance_id=None): + properties = {} + properties.update(base_properties) + if instance_id: + properties['group.instance.id'] = instance_id + return properties + + def create_consumer(self, + topic, + group, + instance_id=None, + consumer_properties={}): + return KafkaCliConsumer( + self.test_context, + self.redpanda, + topic=topic, + group=group, + from_beginning=True, + formatter_properties={ + 'print.value': 'false', + 'print.key': 'false', + 'print.partition': 'true', + 'print.offset': 'true', + }, + consumer_properties=ConsumerGroupTest.make_consumer_properties( + consumer_properties, instance_id)) + + def create_consumers(self, + consumer_count, + topic, + group, + static_members, + consumer_properties={}): + + consumers = [] + for i in range(0, consumer_count): + instance_id = f"panda-consumer-{i}" if static_members else None + consumers.append( + self.create_consumer(topic, + group=group, + instance_id=instance_id, + consumer_properties=consumer_properties)) + + for c in consumers: + c.start() + return consumers + + def consumed_at_least(consumers, count): + return all([len(c._messages) > count for c in consumers]) + + def validate_group_state(self, group, expected_state, static_members): + rpk = RpkTool(self.redpanda) + # validate group state + rpk_group = rpk.group_describe(group) + + assert rpk_group.members == 2 + assert rpk_group.state == expected_state + + for p in rpk_group.partitions: + if static_members: + assert 'panda-consumer' in p.instance_id + else: + assert p.instance_id is None + + def setup_producer(self, p_cnt): + # create topic + self.topic_spec = TopicSpec(partition_count=p_cnt, + replication_factor=3) + self.client().create_topic(specs=self.topic_spec) + # produce some messages to the topic + self.producer = RpkProducer(self._ctx, self.redpanda, + self.topic_spec.name, 128, 5000, -1) + self.producer.start() + + @cluster(num_nodes=6) + @parametrize(static_members=True) + @parametrize(static_members=False) + def test_basic_group_join(self, static_members): + """ + Test validating that consumers are able to join the group and consume topic + """ + + self.setup_producer(20) + group = 'test-gr-1' + # use 2 consumers + consumers = self.create_consumers(2, + self.topic_spec.name, + group, + static_members=static_members) + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free() + + @cluster(num_nodes=6) + def test_mixed_consumers_join(self): + """ + Test validating that dynamic and static consumers may exists in the same group + """ + self.setup_producer(20) + group = 'test-gr-1' + consumers = [] + consumers.append( + self.create_consumer(self.topic_spec.name, group, + "panda-instance")) + consumers.append( + self.create_consumer(self.topic_spec.name, group, None)) + + for c in consumers: + c.start() + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + + rpk = RpkTool(self.redpanda) + # validate group state + rpk_group = rpk.group_describe(group) + + assert rpk_group.members == 2 + assert rpk_group.state == "Stable" + + static_members = set() + dynamic_members = set() + + for p in rpk_group.partitions: + if p.instance_id: + static_members.add(p.client_id) + else: + dynamic_members.add(p.client_id) + + assert len(static_members) == 1 + assert len(dynamic_members) == 1 + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free() + + def wait_for_members(self, group, members_count): + rpk = RpkTool(self.redpanda) + + def group_stable(): + rpk_group = rpk.group_describe(group) + return rpk_group.members == members_count and rpk_group.state == "Stable" + + return wait_until(group_stable, 30, 2) + + @cluster(num_nodes=6) + @parametrize(static_members=True) + @parametrize(static_members=False) + def test_consumer_rejoin(self, static_members): + """ + Test validating that re-joining static member will not casuse rebalance + """ + self.setup_producer(20) + group = 'test-gr-1' + + consumers = self.create_consumers( + 2, + self.topic_spec.name, + group, + static_members=static_members, + consumer_properties={"session.timeout.ms": 40000}) + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + rpk = RpkTool(self.redpanda) + # at this point we have 2 consumers in stable group + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + # stop one of the consumers + consumers[0].stop() + consumers[0].wait() + + rpk_group = rpk.group_describe(group) + if static_members: + # with static members group should still be in stable state + assert rpk_group.state == "Stable" + assert rpk_group.members == 2 + else: + # consumer will request group leave when shutdown gracefully and it is dynamic + self.wait_for_members(group, 1) + + # start the consumer again + consumers[0].start() + consumers[0].wait_for_started() + # wait for consumer to start + if static_members: + # with static members group should be stable immediately as the + # consumer is rejoining with the same instance id + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + else: + # group should get back to its original 2 members state + self.wait_for_members(group, 2) + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free() + + @cluster(num_nodes=6) + @parametrize(static_members=True) + @parametrize(static_members=False) + def test_consumer_is_removed_when_timedout(self, static_members): + """ + Test validating that consumer is evicted if it failed to deliver heartbeat to the broker + """ + self.setup_producer(20) + group = 'test-gr-1' + # using short session timeout to make the test finish faster + consumers = self.create_consumers( + 2, + self.topic_spec.name, + group, + static_members=static_members, + consumer_properties={"session.timeout.ms": 6000}) + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + rpk = RpkTool(self.redpanda) + # at this point we have 2 consumers in stable group + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + # stop one of the consumers + consumers[0].stop() + + # wait for rebalance + self.wait_for_members(group, 1) + + # start the consumer again + consumers[0].start() + + # group should get back to its original 2 members state + self.wait_for_members(group, 2) + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free()