diff --git a/src/v/kafka/client/test/fixture.h b/src/v/kafka/client/test/fixture.h index 074266e75788..b564eedd4552 100644 --- a/src/v/kafka/client/test/fixture.h +++ b/src/v/kafka/client/test/fixture.h @@ -20,7 +20,8 @@ namespace kc = kafka::client; class kafka_client_fixture : public redpanda_thread_fixture { public: void restart() { - app.shutdown(); + shutdown(); + app_signal = std::make_unique<::stop_signal>(); ss::smp::invoke_on_all([this] { auto& config = config::shard_local_cfg(); config.get("disable_metrics").set_value(false); @@ -29,8 +30,7 @@ class kafka_client_fixture : public redpanda_thread_fixture { app.check_environment(); app.configure_admin_server(); app.wire_up_services(); - ::stop_signal app_signal; - app.start(app_signal); + app.start(*app_signal); } kc::client make_client() { return kc::client{proxy_client_config()}; } diff --git a/src/v/kafka/protocol/leave_group.h b/src/v/kafka/protocol/leave_group.h index b6c4903c63df..28926897619e 100644 --- a/src/v/kafka/protocol/leave_group.h +++ b/src/v/kafka/protocol/leave_group.h @@ -37,6 +37,8 @@ struct leave_group_request final { // set during request processing after mapping group to ntp model::ntp ntp; + // additional context, set in decode + api_version version; void encode(response_writer& writer, api_version version) { data.encode(writer, version); diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py index 554b8ffb592b..da405c9615b2 100644 --- a/src/v/kafka/protocol/schemata/generator.py +++ b/src/v/kafka/protocol/schemata/generator.py @@ -81,6 +81,7 @@ }, }, "MemberId": ("kafka::member_id", "string"), + "GroupInstanceId": ("kafka::group_instance_id", "string"), }, "AddPartitionsToTxnRequestData": { "Topics": { diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 6a58c2aad3b6..d0fcd5cee0ac 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -25,6 +25,7 @@ #include "likely.h" #include 
"model/fundamental.h" #include "raft/errc.h" +#include "ssx/future-util.h" #include "storage/record_batch_builder.h" #include "utils/to_string.h" #include "vassert.h" @@ -190,6 +191,17 @@ bool group::supports_protocols(const join_group_request& r) const { } void group::add_member_no_join(member_ptr member) { + if (member->group_instance_id()) { + auto [_, success] = _static_members.emplace( + *member->group_instance_id(), member->id()); + if (!success) { + throw std::runtime_error(fmt::format( + "group already contains member with group instance id: {}, group " + "state: {}", + member, + *this)); + } + } if (_members.empty()) { _protocol_type = member->protocol_type(); } @@ -219,7 +231,7 @@ ss::future group::add_member(member_ptr member) { return member->get_join_response(); } -ss::future group::update_member( +void group::update_member_no_join( member_ptr member, std::vector&& new_protocols) { vlog( _ctxlog.trace, @@ -246,6 +258,11 @@ ss::future group::update_member( for (auto& p : member->protocols()) { _supported_protocols[p.name]++; } +} + +ss::future group::update_member( + member_ptr member, std::vector&& new_protocols) { + update_member_no_join(member, std::move(new_protocols)); if (!member->is_joining()) { _num_members_joining++; @@ -509,10 +526,187 @@ group::join_group_unknown_member(join_group_request&& r) { } auto new_member_id = group::generate_member_id(r); + if (r.data.group_instance_id) { + return add_new_static_member(std::move(new_member_id), std::move(r)); + } + return add_new_dynamic_member(std::move(new_member_id), std::move(r)); +} + +group::join_group_stages +group::add_new_static_member(member_id new_member_id, join_group_request&& r) { + auto it = _static_members.find(r.data.group_instance_id.value()); + + if (it != _static_members.end()) { + vlog( + _ctxlog.debug, + "Static member with unknown member id and instance id {} joins the " + "group, replacing previously mapped {} member id with new member_id " + "{}", + 
r.data.group_instance_id.value(), + it->second, + new_member_id); + return update_static_member_and_rebalance( + it->second, std::move(new_member_id), std::move(r)); + } + vlog( + _ctxlog.debug, + "Static member with unknown member id and instance id {} joins the " + "group new member id: {}", + r.data.group_instance_id.value(), + new_member_id); + return add_member_and_rebalance(std::move(new_member_id), std::move(r)); +} + +group::join_group_stages group::update_static_member_and_rebalance( + member_id old_member_id, member_id new_member_id, join_group_request&& r) { + auto member = replace_static_member( + *r.data.group_instance_id, old_member_id, new_member_id); + /* + * Heartbeat of old member id will expire without effect since the + * group no longer contains that member id. New heartbeat shall be scheduled + * with new member id. + */ + schedule_next_heartbeat_expiration(member); + auto f = update_member(member, r.native_member_protocols()); + auto old_protocols = _members.at(new_member_id)->protocols(); + switch (state()) { + case group_state::stable: { + auto next_gen_protocol = select_protocol(); + if (_protocol == next_gen_protocol) { + vlog( + _ctxlog.trace, + "static member {} joins in stable state with the protocol that " + "does not require group protocol update, will not trigger " + "rebalance", + r.data.group_instance_id); + ssx::background + = store_group(checkpoint()) + .then([this, + member, + instance_id = *r.data.group_instance_id, + new_member_id = std::move(new_member_id), + old_member_id = std::move(old_member_id), + old_protocols = std::move(old_protocols)]( + result result) mutable { + if (!result) { + vlog( + _ctxlog.warn, + "unable to store group checkpoint - {}", + result.error()); + // Failed to persist member.id of the + // given static member, revert the update of + // the static member in the group. 
+ auto member = replace_static_member( + instance_id, new_member_id, old_member_id); + update_member_no_join( + member, std::move(old_protocols)); + schedule_next_heartbeat_expiration(member); + try_finish_joining_member( + member, + make_join_error( + unknown_member_id, + map_store_offset_error_code(result.error()))); + return; + } + // leader -> member metadata + // followers -> [] + std::vector md; + if (is_leader(new_member_id)) { + md = member_metadata(); + } + + auto reply = join_group_response( + error_code::none, + generation(), + protocol().value_or(kafka::protocol_name()), + leader().value_or(kafka::member_id()), + new_member_id, + std::move(md)); + try_finish_joining_member(member, std::move(reply)); + }); + return join_group_stages(std::move(f)); + } else { + vlog( + _ctxlog.trace, + "static member {} joins in stable state with the protocol that " + "requires group protocol update, trying to trigger rebalance", + r.data.group_instance_id); + try_prepare_rebalance(); + } + return join_group_stages(std::move(f)); + } + case group_state::preparing_rebalance: + return join_group_stages(std::move(f)); + case group_state::completing_rebalance: + vlog( + _ctxlog.trace, + "updating static member {} (member_id: {}) metadata", + member->group_instance_id().value(), + member->id()); + try_prepare_rebalance(); + return join_group_stages(std::move(f)); + case group_state::empty: + [[fallthrough]]; + case group_state::dead: + throw std::runtime_error(fmt::format( + "group was not supposed to be in {} state when the unknown static " + "member {} rejoins, group state: {}", + state(), + r.data.group_instance_id, + *this)); + } +} + +member_ptr group::replace_static_member( + const group_instance_id& instance_id, + const member_id& old_member_id, + const member_id& new_member_id) { + auto it = _members.find(old_member_id); + if (it == _members.end()) { + throw std::runtime_error(fmt::format( + "can not replace not existing member with id {}, group state: {}", + 
old_member_id, + *this)); + } + auto member = it->second; + _members.erase(it); + member->replace_id(new_member_id); + _members.emplace(new_member_id, member); + if (member->is_joining()) { + try_finish_joining_member( + member, + make_join_error(old_member_id, error_code::fenced_instance_id)); + } + if (member->is_syncing()) { + member->set_sync_response( + sync_group_response(error_code::fenced_instance_id)); + } + if (is_leader(old_member_id)) { + _leader = member->id(); + } + + auto static_it = _static_members.find(instance_id); + vassert( + static_it != _static_members.end(), + "Static member that is going to be replaced must existing in group " + "static members map. instance_id: {}, group_state: {}", + instance_id, + *this); + static_it->second = new_member_id; + + return member; +} + +group::join_group_stages +group::add_new_dynamic_member(member_id new_member_id, join_group_request&& r) { + vassert( + !r.data.group_instance_id, + "Group instance id must be empty for dynamic members, request: {}", + r.data); // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version // is >= 4 and groupInstanceId is configured to unknown. - if (r.version >= api_version(4) && !r.data.group_instance_id) { + if (r.version >= api_version(4)) { // If member id required (dynamic membership), register the // member in the pending member list and send back a response to // call for another join group request with allocated member id. 
@@ -548,13 +742,15 @@ group::join_group_known_member(join_group_request&& r) { kafka::member_id new_member_id = std::move(r.data.member_id); return add_member_and_rebalance(std::move(new_member_id), std::move(r)); - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "join"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Join rejected for unregistered member {}", - r.data.member_id); - return join_group_stages( - make_join_error(r.data.member_id, error_code::unknown_member_id)); + "Join rejected for invalid member {} - {}", + r.data.member_id, + ec); + return join_group_stages(make_join_error(r.data.member_id, ec)); } auto member = get_member(r.data.member_id); @@ -796,7 +992,7 @@ void group::complete_join() { // this is the old group->remove_unjoined_members(); const auto prev_leader = _leader; for (auto it = _members.begin(); it != _members.end();) { - if (!it->second->is_joining()) { + if (!it->second->is_joining() && !it->second->is_static()) { vlog(_ctxlog.trace, "Removing unjoined member {}", it->first); // cancel the heartbeat timer @@ -862,14 +1058,10 @@ void group::complete_join() { if (in_state(group_state::empty)) { vlog(_ctxlog.trace, "Checkpointing empty group {}", *this); - auto batch = checkpoint(assignments_type{}); - auto reader = model::make_memory_record_batch_reader( - std::move(batch)); - (void)_partition - ->replicate( - std::move(reader), - raft::replicate_options(raft::consistency_level::quorum_ack)) - .then([]([[maybe_unused]] result r) {}); + ssx::background + = store_group(checkpoint(assignments_type{})) + .then( + []([[maybe_unused]] result r) {}); } else { std::for_each( std::cbegin(_members), @@ -1028,6 +1220,9 @@ void group::remove_member(member_ptr member) { vassert(_num_members_joining >= 0, "negative members joining"); } } + if (it->second->group_instance_id()) { + _static_members.erase(it->second->group_instance_id().value()); + } 
_members.erase(it); } @@ -1083,13 +1278,15 @@ group::sync_group_stages group::handle_sync_group(sync_group_request&& r) { return sync_group_stages( sync_group_response(error_code::coordinator_not_available)); - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "sync"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Sync rejected for unregistered member {}", - r.data.member_id); - return sync_group_stages( - sync_group_response(error_code::unknown_member_id)); + "Sync rejected for invalid member {} - {}", + r.data.member_id, + ec); + return sync_group_stages(sync_group_response(ec)); } else if (r.data.generation_id != generation()) { vlog( @@ -1155,35 +1352,20 @@ group::sync_group_stages group::handle_sync_group(sync_group_request&& r) { } model::record_batch group::checkpoint(const assignments_type& assignments) { - group_metadata_key key; - key.group_id = _id; - group_metadata_value metadata; - - metadata.protocol_type = protocol_type().value_or(kafka::protocol_type("")); - metadata.generation = generation(); - metadata.protocol = protocol(); - metadata.leader = leader(); - metadata.state_timestamp = _state_timestamp; - - for (const auto& it : _members) { - auto& member = it.second; - auto state = it.second->state().copy(); - // this is not coming from the member itself because the checkpoint - // occurs right before the members go live and get their assignments. 
- state.assignment = bytes_to_iobuf(assignments.at(member->id())); - state.subscription = bytes_to_iobuf( - it.second->get_protocol_metadata(_protocol.value())); - metadata.members.push_back(std::move(state)); - } - - cluster::simple_batch_builder builder( - model::record_batch_type::raft_data, model::offset(0)); - - auto kv = _md_serializer.to_kv( - group_metadata_kv{.key = std::move(key), .value = std::move(metadata)}); - builder.add_raw_kv(std::move(kv.key), std::move(kv.value)); + return do_checkpoint( + [&assignments](const member_id& id) { return assignments.at(id); }); +} - return std::move(builder).build(); +model::record_batch group::checkpoint() { + return do_checkpoint([this](const member_id& id) { + auto it = _members.find(id); + vassert( + it != _members.end(), + "Checkpointed member {} must be part of the group {}", + id, + *this); + return it->second->assignment(); + }); } group::sync_group_stages group::sync_group_completing_rebalance( @@ -1212,13 +1394,7 @@ group::sync_group_stages group::sync_group_completing_rebalance( auto assignments = std::move(r).member_assignments(); add_missing_assignments(assignments); - auto batch = checkpoint(assignments); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); - - auto f = _partition - ->replicate( - std::move(reader), - raft::replicate_options(raft::consistency_level::quorum_ack)) + auto f = store_group(checkpoint(assignments)) .then([this, response = std::move(response), expected_generation = generation(), @@ -1273,12 +1449,15 @@ ss::future group::handle_heartbeat(heartbeat_request&& r) { vlog(_ctxlog.trace, "Heartbeat rejected for group state {}", _state); return make_heartbeat_error(error_code::coordinator_not_available); - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "heartbeat"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Heartbeat rejected for unregistered member {}", - 
r.data.member_id); - return make_heartbeat_error(error_code::unknown_member_id); + "Heartbeat rejected for invalid member {} - {}", + r.data.member_id, + ec); + return make_heartbeat_error(ec); } else if (r.data.generation_id != generation()) { vlog( @@ -1321,30 +1500,72 @@ ss::future group::handle_heartbeat(heartbeat_request&& r) { ss::future group::handle_leave_group(leave_group_request&& r) { vlog(_ctxlog.trace, "Handling leave group request {}", r); - if (in_state(group_state::dead)) { vlog(_ctxlog.trace, "Leave rejected for group state {}", _state); return make_leave_error(error_code::coordinator_not_available); + } + /** + * Leave group prior to version 3 were requesting only a single member + * leave. + */ + if (r.version < api_version(3)) { + auto err = member_leave_group(r.data.member_id, std::nullopt); + return make_leave_error(err); + } - } else if (contains_pending_member(r.data.member_id)) { + leave_group_response response(error_code::none); + + response.data.members.reserve(r.data.members.size()); + for (auto& m : r.data.members) { + auto ec = member_leave_group(m.member_id, m.group_instance_id); + response.data.members.push_back(member_response{ + .member_id = m.member_id, + .group_instance_id = m.group_instance_id, + .error_code = ec}); + } + return ss::make_ready_future(std::move(response)); +} + +kafka::error_code group::member_leave_group( + const member_id& member_id, + const std::optional& group_instance_id) { + // The LeaveGroup API allows administrative removal of members by + // GroupInstanceId in which case we expect the MemberId to be + // undefined. 
+ if (member_id == unknown_member_id) { + if (!group_instance_id) { + return error_code::unknown_member_id; + } + + auto it = _static_members.find(*group_instance_id); + if (it == _static_members.end()) { + return error_code::unknown_member_id; + } + // recurse with found mapping + return member_leave_group(it->second, group_instance_id); + } + + if (contains_pending_member(member_id)) { // if a pending member is leaving, it needs to be removed // from the pending list, heartbeat cancelled and if necessary, // prompt a JoinGroup completion. - remove_pending_member(r.data.member_id); - return make_leave_error(error_code::none); + remove_pending_member(member_id); + return error_code::none; - } else if (!contains_member(r.data.member_id)) { + } else if (auto ec = validate_existing_member( + member_id, group_instance_id, "leave"); + ec != error_code::none) { vlog( _ctxlog.trace, - "Leave rejected for unregistered member {}", - r.data.member_id); - return make_leave_error(error_code::unknown_member_id); - + "Leave rejected for invalid member {} - {}", + member_id, + ec); + return ec; } else { - auto member = get_member(r.data.member_id); + auto member = get_member(member_id); member->expire_timer().cancel(); remove_member(member); - return make_leave_error(error_code::none); + return error_code::none; } } @@ -2034,9 +2255,10 @@ group::handle_offset_commit(offset_commit_request&& r) { // The group is only using Kafka to store offsets. 
return store_offsets(std::move(r)); - } else if (!contains_member(r.data.member_id)) { - return offset_commit_stages( - offset_commit_response(r, error_code::unknown_member_id)); + } else if (auto ec = validate_existing_member( + r.data.member_id, r.data.group_instance_id, "offset-commit"); + ec != error_code::none) { + return offset_commit_stages(offset_commit_response(r, ec)); } else if (r.data.generation_id != generation()) { return offset_commit_stages( @@ -2319,6 +2541,40 @@ group::remove_topic_partitions(const std::vector& tps) { } } +ss::future> +group::store_group(model::record_batch batch) { + return _partition->replicate( + model::make_memory_record_batch_reader(std::move(batch)), + raft::replicate_options(raft::consistency_level::quorum_ack)); +} + +error_code group::validate_existing_member( + const member_id& member_id, + const std::optional& instance_id, + const ss::sstring& ctx) const { + if (instance_id) { + auto it = _static_members.find(*instance_id); + if (it == _static_members.end()) { + return error_code::unknown_member_id; + } else if (it->second != member_id) { + vlog( + _ctxlog.info, + "operation: {} - static member with instance id {} has different " + "member_id assigned, current: {}, requested: {}", + ctx, + *instance_id, + it->second, + member_id); + return error_code::fenced_instance_id; + } + } else { + if (!_members.contains(member_id)) { + return error_code::unknown_member_id; + } + } + return error_code::none; +} + std::ostream& operator<<(std::ostream& o, const group& g) { const auto ntp = [&g] { if (g._partition) { diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 887356658d04..e31e69dcc3fc 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -13,6 +13,7 @@ #include "cluster/fwd.h" #include "cluster/logger.h" #include "cluster/partition.h" +#include "cluster/simple_batch_builder.h" #include "cluster/tx_utils.h" #include "kafka/group_probe.h" #include "kafka/protocol/fwd.h" @@ -309,6 
+310,12 @@ class group { */ ss::future update_member( member_ptr member, std::vector&& new_protocols); + /** + * Same as update_member but without returning the join promise. Used when + * reverting member state after failed group checkpoint + */ + void update_member_no_join( + member_ptr member, std::vector&& new_protocols); /** * \brief Get the timeout duration for rebalancing. @@ -408,6 +415,22 @@ class group { join_group_stages update_member_and_rebalance( member_ptr member, join_group_request&& request); + /// Add new member with dynamic membership + group::join_group_stages + add_new_dynamic_member(member_id, join_group_request&&); + + /// Add new member with static membership + group::join_group_stages + add_new_static_member(member_id, join_group_request&&); + + /// Replaces existing static member with the new one + member_ptr replace_static_member( + const group_instance_id&, const member_id&, const member_id&); + + /// Updates existing static member and initiate rebalance if needed + group::join_group_stages update_static_member_and_rebalance( + member_id, member_id, join_group_request&&); + /// Transition to preparing rebalance if possible. void try_prepare_rebalance(); @@ -445,6 +468,10 @@ class group { ss::future handle_leave_group(leave_group_request&& r); + /// Handle leave group for single member + kafka::error_code member_leave_group( + const member_id&, const std::optional&); + std::optional offset(const model::topic_partition& tp) const { if (auto it = _offsets.find(tp); it != _offsets.end()) { @@ -549,6 +576,14 @@ class group { return _partition; } + ss::future> store_group(model::record_batch); + + // validates state of a member existing in a group + error_code validate_existing_member( + const member_id&, + const std::optional&, + const ss::sstring&) const; + // shutdown group. 
cancel all pending operations void shutdown(); @@ -574,6 +609,11 @@ class group { log(ss::log_level::error, format, std::forward(args)...); } + template + void warn(const char* format, Args&&... args) const { + log(ss::log_level::warn, format, std::forward(args)...); + } + template void debug(const char* format, Args&&... args) const { log(ss::log_level::debug, format, std::forward(args)...); @@ -632,6 +672,41 @@ class group { } model::record_batch checkpoint(const assignments_type& assignments); + model::record_batch checkpoint(); + + template + model::record_batch do_checkpoint(Func&& assignments_provider) { + kafka::group_metadata_key key; + key.group_id = _id; + kafka::group_metadata_value metadata; + + metadata.protocol_type = protocol_type().value_or( + kafka::protocol_type("")); + metadata.generation = generation(); + metadata.protocol = protocol(); + metadata.leader = leader(); + metadata.state_timestamp = _state_timestamp; + + for (const auto& [id, member] : _members) { + auto state = member->state().copy(); + // this is not coming from the member itself because the checkpoint + // occurs right before the members go live and get their + // assignments. 
+ state.assignment = bytes_to_iobuf(assignments_provider(id)); + state.subscription = bytes_to_iobuf( + member->get_protocol_metadata(_protocol.value())); + metadata.members.push_back(std::move(state)); + } + + cluster::simple_batch_builder builder( + model::record_batch_type::raft_data, model::offset(0)); + + auto kv = _md_serializer.to_kv(group_metadata_kv{ + .key = std::move(key), .value = std::move(metadata)}); + builder.add_raw_kv(std::move(kv.key), std::move(kv.value)); + + return std::move(builder).build(); + } cluster::abort_origin get_abort_origin(const model::producer_identity&, model::tx_seq) const; @@ -642,6 +717,7 @@ class group { kafka::generation_id _generation; protocol_support _supported_protocols; member_map _members; + absl::node_hash_map _static_members; int _num_members_joining; absl::node_hash_map> _pending_members; diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index e8e425adeacc..e28d8cd3c0ae 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -559,12 +559,6 @@ group::join_group_stages group_manager::join_group(join_group_request&& r) { } group::sync_group_stages group_manager::sync_group(sync_group_request&& r) { - if (r.data.group_instance_id) { - vlog(klog.trace, "Static group membership is not supported"); - return group::sync_group_stages( - sync_group_response(error_code::unsupported_version)); - } - auto error = validate_group_status( r.ntp, r.data.group_id, sync_group_api::key); if (error != error_code::none) { @@ -598,11 +592,6 @@ group::sync_group_stages group_manager::sync_group(sync_group_request&& r) { } ss::future group_manager::heartbeat(heartbeat_request&& r) { - if (r.data.group_instance_id) { - vlog(klog.trace, "Static group membership is not supported"); - return make_heartbeat_error(error_code::unsupported_version); - } - auto error = validate_group_status( r.ntp, r.data.group_id, heartbeat_api::key); if (error != error_code::none) { @@ 
-643,7 +632,24 @@ group_manager::leave_group(leave_group_request&& r) { klog.trace, "Cannot handle leave group request for unknown group {}", r.data.group_id); - return make_leave_error(error_code::unknown_member_id); + if (r.version < api_version(3)) { + return make_leave_error(error_code::unknown_member_id); + } + // since version 3 we need to fill each member error code + leave_group_response response; + response.data.members.reserve(r.data.members.size()); + std::transform( + r.data.members.begin(), + r.data.members.end(), + std::back_inserter(response.data.members), + [](member_identity& mid) { + return member_response{ + .member_id = std::move(mid.member_id), + .group_instance_id = std::move(mid.group_instance_id), + .error_code = error_code::unknown_member_id, + }; + }); + return ss::make_ready_future(std::move(response)); } } diff --git a/src/v/kafka/server/handlers/describe_groups.cc b/src/v/kafka/server/handlers/describe_groups.cc index 91954ab01166..aece947dbb3c 100644 --- a/src/v/kafka/server/handlers/describe_groups.cc +++ b/src/v/kafka/server/handlers/describe_groups.cc @@ -12,6 +12,7 @@ #include "kafka/protocol/errors.h" #include "kafka/server/group_manager.h" #include "kafka/server/group_router.h" +#include "kafka/server/handlers/details/security.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" #include "model/namespace.h" @@ -21,6 +22,17 @@ #include namespace kafka { +namespace { +ss::future describe_group( + group_id group, bool include_authorized_operations, request_context& ctx) { + auto res = co_await ctx.groups().describe_group(group); + if (include_authorized_operations) { + res.authorized_operations = details::to_bit_field( + details::authorized_operations(ctx, group)); + } + co_return res; +} +} // namespace template<> ss::future @@ -48,8 +60,10 @@ describe_groups_handler::handle(request_context ctx, ss::smp_service_group) { std::vector> described; described.reserve(request.data.groups.size()); for (auto& 
group_id : request.data.groups) { - described.push_back( - ctx.groups().describe_group(std::move(group_id))); + described.push_back(describe_group( + std::move(group_id), + request.data.include_authorized_operations, + ctx)); } response.data.groups = co_await ss::when_all_succeed( described.begin(), described.end()); diff --git a/src/v/kafka/server/handlers/describe_groups.h b/src/v/kafka/server/handlers/describe_groups.h index f93110d323c5..6f804548b893 100644 --- a/src/v/kafka/server/handlers/describe_groups.h +++ b/src/v/kafka/server/handlers/describe_groups.h @@ -14,6 +14,6 @@ namespace kafka { -using describe_groups_handler = handler; +using describe_groups_handler = handler; } diff --git a/src/v/kafka/server/handlers/join_group.cc b/src/v/kafka/server/handlers/join_group.cc index f1d82a47e443..f7bc53b80a4e 100644 --- a/src/v/kafka/server/handlers/join_group.cc +++ b/src/v/kafka/server/handlers/join_group.cc @@ -40,11 +40,6 @@ process_result_stages join_group_handler::handle( join_group_request request; decode_request(ctx, request); - if (request.data.group_instance_id) { - return process_result_stages::single_stage( - ctx.respond(join_group_response(error_code::unsupported_version))); - } - if (!ctx.authorized(security::acl_operation::read, request.data.group_id)) { return process_result_stages::single_stage(ctx.respond( join_group_response(error_code::group_authorization_failed))); diff --git a/src/v/kafka/server/handlers/join_group.h b/src/v/kafka/server/handlers/join_group.h index 69397afbb14b..1d3ec508350a 100644 --- a/src/v/kafka/server/handlers/join_group.h +++ b/src/v/kafka/server/handlers/join_group.h @@ -17,7 +17,7 @@ namespace kafka { struct join_group_handler { using api = join_group_api; static constexpr api_version min_supported = api_version(0); - static constexpr api_version max_supported = api_version(4); + static constexpr api_version max_supported = api_version(5); static process_result_stages handle(request_context, ss::smp_service_group); 
}; } // namespace kafka diff --git a/src/v/kafka/server/handlers/leave_group.cc b/src/v/kafka/server/handlers/leave_group.cc index fe290a2dbf40..0f3cba1e5778 100644 --- a/src/v/kafka/server/handlers/leave_group.cc +++ b/src/v/kafka/server/handlers/leave_group.cc @@ -9,10 +9,12 @@ #include "kafka/server/handlers/leave_group.h" +#include "kafka/protocol/schemata/leave_group_request.h" #include "kafka/server/group_manager.h" #include "kafka/server/group_router.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" +#include "kafka/types.h" #include "utils/remote.h" #include "utils/to_string.h" @@ -21,11 +23,19 @@ namespace kafka { +namespace { +leave_group_request decode_request(request_context& ctx) { + leave_group_request request; + request.decode(ctx.reader(), ctx.header().version); + request.version = ctx.header().version; + return request; +} +} // namespace + template<> ss::future leave_group_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group g) { - leave_group_request request; - request.decode(ctx.reader(), ctx.header().version); + leave_group_request request = decode_request(ctx); if (!ctx.authorized(security::acl_operation::read, request.data.group_id)) { co_return co_await ctx.respond( diff --git a/src/v/kafka/server/handlers/leave_group.h b/src/v/kafka/server/handlers/leave_group.h index 19fcb5dba594..a959c6dc4ddd 100644 --- a/src/v/kafka/server/handlers/leave_group.h +++ b/src/v/kafka/server/handlers/leave_group.h @@ -14,6 +14,6 @@ namespace kafka { -using leave_group_handler = handler; +using leave_group_handler = handler; } diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index 8aee22812476..ddaa68616aab 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -59,11 +59,6 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { request.decode(ctx.reader(), 
ctx.header().version); vlog(klog.trace, "Handling request {}", request); - if (request.data.group_instance_id) { - return process_result_stages::single_stage(ctx.respond( - offset_commit_response(request, error_code::unsupported_version))); - } - // check authorization for this group const auto group_authorized = ctx.authorized( security::acl_operation::read, request.data.group_id); diff --git a/src/v/kafka/server/member.h b/src/v/kafka/server/member.h index c4b396ea2504..d54c6a6f6630 100644 --- a/src/v/kafka/server/member.h +++ b/src/v/kafka/server/member.h @@ -82,6 +82,8 @@ class group_member { /// Get the member id. const kafka::member_id& id() const { return _state.id; } + void replace_id(member_id new_id) { _state.id = std::move(new_id); } + /// Get the id of the member's group. const kafka::group_id& group_id() const { return _group_id; } @@ -142,6 +144,9 @@ class group_member { /// Check if the member is syncing. bool is_syncing() const { return bool(_sync_promise); } + /// Check if member is static + bool is_static() const { return bool(_state.instance_id); } + /** * Get the sync response. * diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 0d6f05fcb426..3630ba7e10ff 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -14,6 +14,7 @@ rp_test( ) set(srcs + consumer_groups_test.cc member_test.cc group_test.cc read_write_roundtrip_test.cc @@ -25,7 +26,6 @@ set(srcs create_topics_test.cc find_coordinator_test.cc list_offsets_test.cc - offset_commit_test.cc topic_recreate_test.cc fetch_session_test.cc alter_config_test.cc diff --git a/src/v/kafka/server/tests/consumer_groups_test.cc b/src/v/kafka/server/tests/consumer_groups_test.cc new file mode 100644 index 000000000000..cb99a9c323bd --- /dev/null +++ b/src/v/kafka/server/tests/consumer_groups_test.cc @@ -0,0 +1,103 @@ +// Copyright 2022 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/controller_api.h" +#include "cluster/feature_table.h" +#include "kafka/client/client.h" +#include "kafka/protocol/describe_groups.h" +#include "kafka/protocol/errors.h" +#include "kafka/protocol/find_coordinator.h" +#include "kafka/protocol/join_group.h" +#include "kafka/protocol/schemata/join_group_request.h" +#include "kafka/types.h" +#include "model/fundamental.h" +#include "model/namespace.h" +#include "model/timeout_clock.h" +#include "redpanda/tests/fixture.h" +#include "test_utils/async.h" + +#include +#include + +#include + +using namespace kafka; +join_group_request make_join_group_request( + ss::sstring member_id, + ss::sstring gr, + std::vector protocols, + ss::sstring protocol_type) { + join_group_request req; + req.data.group_id = kafka::group_id(std::move(gr)); + req.data.member_id = kafka::member_id(std::move(member_id)); + req.data.protocol_type = kafka::protocol_type(std::move(protocol_type)); + for (auto& p : protocols) { + req.data.protocols.push_back(join_group_request_protocol{ + .name = protocol_name(std::move(p)), .metadata = bytes{}}); + } + req.data.session_timeout_ms = 10s; + return req; +} +struct consumer_offsets_fixture : public redpanda_thread_fixture { + void wait_for_consumer_offsets_topic() { + app.controller->get_feature_table() + .local() + .await_feature( + cluster::feature::consumer_offsets, + app.controller->get_abort_source().local()) + .get(); + + auto client = make_kafka_client().get0(); + + client.connect().get(); + kafka::find_coordinator_request req("key"); + req.data.key_type = kafka::coordinator_type::group; + client.dispatch(std::move(req), kafka::api_version(1)).get(); + + app.controller->get_api() + 
.local() + .wait_for_topic( + model::kafka_consumer_offsets_nt, model::timeout_clock::now() + 30s) + .get(); + + tests::cooperative_spin_wait_with_timeout(30s, [&client] { + kafka::describe_groups_request req; + req.data.groups.emplace_back("key"); + return client.dispatch(std::move(req), kafka::api_version(1)) + .then([](kafka::describe_groups_response response) { + return response.data.groups.front().error_code + == kafka::error_code::none; + }); + }).get(); + + client.stop().get(); + client.shutdown(); + } +}; + +FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) { + wait_for_consumer_offsets_topic(); + auto client = make_kafka_client().get0(); + auto deferred = ss::defer([&client] { + client.stop().then([&client] { client.shutdown(); }).get(); + }); + client.connect().get(); + + auto req = make_join_group_request( + unknown_member_id, "group-test", {"p1", "p2"}, "random"); + // set group instance id + req.data.group_instance_id = kafka::group_instance_id("instance-1"); + auto resp = client.dispatch(std::move(req), kafka::api_version(5)).get0(); + + info("response: {}", resp.data); + + // static we expect coordinator to assign member_id + BOOST_REQUIRE_EQUAL(resp.data.error_code, kafka::error_code::none); + BOOST_REQUIRE_NE(resp.data.member_id, unknown_member_id); +} diff --git a/src/v/kafka/server/tests/group_test.cc b/src/v/kafka/server/tests/group_test.cc index 006d6febd934..5ae58b22faf3 100644 --- a/src/v/kafka/server/tests/group_test.cc +++ b/src/v/kafka/server/tests/group_test.cc @@ -63,7 +63,7 @@ static member_ptr get_group_member( return ss::make_lw_shared( kafka::member_id(id), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id(fmt::format("i-{}", id)), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -167,7 +167,7 @@ SEASTAR_THREAD_TEST_CASE(rebalance_timeout) { auto m0 = ss::make_lw_shared( kafka::member_id("m"), kafka::group_id("g"), - 
kafka::group_instance_id("i"), + kafka::group_instance_id("i-1"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -178,7 +178,7 @@ SEASTAR_THREAD_TEST_CASE(rebalance_timeout) { auto m1 = ss::make_lw_shared( kafka::member_id("n"), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id("i-2"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -382,7 +382,7 @@ SEASTAR_THREAD_TEST_CASE(supports_protocols) { auto m = ss::make_lw_shared( kafka::member_id("m"), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id("i-1"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), @@ -412,7 +412,7 @@ SEASTAR_THREAD_TEST_CASE(supports_protocols) { auto m2 = ss::make_lw_shared( kafka::member_id("n"), kafka::group_id("g"), - kafka::group_instance_id("i"), + kafka::group_instance_id("i-2"), kafka::client_id("client-id"), kafka::client_host("client-host"), std::chrono::seconds(1), diff --git a/src/v/kafka/server/tests/offset_commit_test.cc b/src/v/kafka/server/tests/offset_commit_test.cc deleted file mode 100644 index 8ac835a49a0c..000000000000 --- a/src/v/kafka/server/tests/offset_commit_test.cc +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. 
-// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "kafka/protocol/offset_commit.h" -#include "redpanda/tests/fixture.h" -#include "resource_mgmt/io_priority.h" -#include "test_utils/async.h" - -#include - -#include -#include - -FIXTURE_TEST( - offset_commit_static_membership_not_supported, redpanda_thread_fixture) { - auto client = make_kafka_client().get0(); - client.connect().get(); - - kafka::offset_commit_request req; - req.data.group_instance_id = kafka::group_instance_id("g"); - req.data.topics = {{ - .name = model::topic("t"), - .partitions = {{}}, - }}; - - auto resp = client.dispatch(req, kafka::api_version(7)).get0(); - client.stop().then([&client] { client.shutdown(); }).get(); - - BOOST_TEST( - resp.data.topics[0].partitions[0].error_code - == kafka::error_code::unsupported_version); -} diff --git a/src/v/kafka/server/tests/topic_recreate_test.cc b/src/v/kafka/server/tests/topic_recreate_test.cc index 1a0d9835696a..94cb671b8377 100644 --- a/src/v/kafka/server/tests/topic_recreate_test.cc +++ b/src/v/kafka/server/tests/topic_recreate_test.cc @@ -122,7 +122,8 @@ class recreate_test_fixture : public redpanda_thread_fixture { } void restart() { - app.shutdown(); + shutdown(); + app_signal = std::make_unique<::stop_signal>(); ss::smp::invoke_on_all([this] { auto& config = config::shard_local_cfg(); config.get("disable_metrics").set_value(false); @@ -131,8 +132,7 @@ class recreate_test_fixture : public redpanda_thread_fixture { app.check_environment(); app.configure_admin_server(); app.wire_up_services(); - ::stop_signal app_signal; - app.start(app_signal); + app.start(*app_signal); } }; diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index d3a11869828d..c0d3cacfac2d 100644 --- 
a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -66,7 +66,8 @@ class redpanda_thread_fixture { , proxy_port(proxy_port) , schema_reg_port(schema_reg_port) , data_dir(std::move(base_dir)) - , remove_on_shutdown(remove_on_shutdown) { + , remove_on_shutdown(remove_on_shutdown) + , app_signal(std::make_unique<::stop_signal>()) { configure( node_id, kafka_port, @@ -82,8 +83,7 @@ class redpanda_thread_fixture { app.check_environment(); app.configure_admin_server(); app.wire_up_services(); - ::stop_signal app_signal; - app.start(app_signal); + app.start(*app_signal); // used by request context builder proto = std::make_unique( @@ -137,12 +137,19 @@ class redpanda_thread_fixture { true) {} ~redpanda_thread_fixture() { - app.shutdown(); + shutdown(); if (remove_on_shutdown) { std::filesystem::remove_all(data_dir); } } + void shutdown() { + if (!app_signal->abort_source().abort_requested()) { + app_signal->abort_source().request_abort(); + } + app.shutdown(); + } + config::configuration& lconf() { return config::shard_local_cfg(); } void configure( @@ -431,4 +438,5 @@ class redpanda_thread_fixture { std::filesystem::path data_dir; std::unique_ptr proto; bool remove_on_shutdown; + std::unique_ptr<::stop_signal> app_signal; }; diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py index de94392d4346..fe6e6b667195 100644 --- a/tests/rptest/clients/kafka_cli_tools.py +++ b/tests/rptest/clients/kafka_cli_tools.py @@ -34,7 +34,7 @@ class KafkaCliTools: """ # See tests/docker/Dockerfile to add new versions - VERSIONS = ("2.7.0", "2.5.0", "2.4.1", "2.3.1") + VERSIONS = ("3.0.0", "2.7.0", "2.5.0", "2.4.1", "2.3.1") def __init__(self, redpanda, version=None, user=None, passwd=None): self._redpanda = redpanda diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index 6ed71d322800..d84ee0e29976 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -72,6 +72,7 @@ class 
RpkGroupPartition(typing.NamedTuple): log_end_offset: Optional[int] lag: Optional[int] member_id: str + instance_id: str client_id: str host: str @@ -306,12 +307,21 @@ def parse_field(field_name, string): assert m is not None, f"Field string '{string}' does not match the pattern" return m['value'] - partition_pattern = re.compile( - "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P.*)? *(?P.*)? *(?P.*)?" + static_member_pattern = re.compile("^([^\s]+\s+){8}[^\s]+$") + + partition_pattern_static_member = re.compile( + "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*) *(?P[^\s]*)? *(?P[^\s]*)?" + ) + partition_pattern_dynamic_member = re.compile( + "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*)? *(?P[^\s]*)?" ) def parse_partition(string): - m = partition_pattern.match(string) + + pattern = partition_pattern_dynamic_member + if static_member_pattern.match(string): + pattern = partition_pattern_static_member + m = pattern.match(string) # Check to see if info for the partition was queried during a change in leadership. 
# if it was we'd expect to see a partition string ending in; @@ -319,8 +329,6 @@ def parse_partition(string): if m is None and string.find('NOT_LEADER_FOR_PARTITION') != -1: return None - assert m is not None, f"Partition string '{string}' does not match the pattern" - # Account for negative numbers and '-' value all_digits = lambda x: x.lstrip('-').isdigit() @@ -334,6 +342,8 @@ def parse_partition(string): log_end_offset=log_end, lag=lag, member_id=m['member_id'], + instance_id=m.groupdict().get( + 'instance_id', None), client_id=m['client_id'], host=m['host']) diff --git a/tests/rptest/services/kafka_cli_consumer.py b/tests/rptest/services/kafka_cli_consumer.py index f74f1eb4025b..ee54865a5f26 100644 --- a/tests/rptest/services/kafka_cli_consumer.py +++ b/tests/rptest/services/kafka_cli_consumer.py @@ -25,7 +25,8 @@ def __init__(self, partitions=None, isolation_level=None, from_beginning=False, - consumer_properties={}): + consumer_properties={}, + formatter_properties={}): super(KafkaCliConsumer, self).__init__(context, num_nodes=1) self._redpanda = redpanda self._topic = topic @@ -35,6 +36,7 @@ def __init__(self, self._isolation_level = isolation_level self._from_beginning = from_beginning self._consumer_properties = consumer_properties + self._formatter_properties = formatter_properties self._stopping = threading.Event() assert self._partitions is not None or self._group is not None, "either partitions or group have to be set" @@ -62,6 +64,8 @@ def _worker(self, _, node): cmd += ["--from-beginning"] for k, v in self._consumer_properties.items(): cmd += ['--consumer-property', f"{k}={v}"] + for k, v in self._formatter_properties.items(): + cmd += ['--property', f"{k}={v}"] cmd += ["--bootstrap-server", self._redpanda.brokers()] @@ -87,6 +91,15 @@ def wait_for_messages(self, messages, timeout=30): timeout, backoff_sec=2) + def wait_for_started(self, timeout=10): + def all_started(): + return all([ + len(node.account.java_pids("ConsoleConsumer")) == 1 + for node 
in self.nodes + ]) + + wait_until(all_started, timeout, backoff_sec=1) + + def stop_node(self, node): self._stopping.set() - node.account.kill_process("java", clean_shutdown=False) + node.account.kill_process("java", clean_shutdown=True) diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py new file mode 100644 index 000000000000..fbe2b847d48d --- /dev/null +++ b/tests/rptest/tests/consumer_group_test.py @@ -0,0 +1,305 @@ +# Copyright 2020 Redpanda Data, Inc. +# Copyright 2020 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from collections import defaultdict +from rptest.services.cluster import cluster + +from rptest.clients.rpk import RpkTool +from rptest.clients.types import TopicSpec +from rptest.services.kafka_cli_consumer import KafkaCliConsumer +from rptest.services.rpk_producer import RpkProducer +from rptest.tests.redpanda_test import RedpandaTest +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize + + +class ConsumerGroupTest(RedpandaTest): + def __init__(self, test_ctx, *args, **kwargs): + self._ctx = test_ctx + self.producer = None + super(ConsumerGroupTest, self).__init__( + test_ctx, + num_brokers=3, + *args, + # disable leader balancer to make sure that group will not be reloaded because of leadership changes + extra_rp_conf={"enable_leader_balancer": False}, + **kwargs) + + def make_consumer_properties(base_properties, instance_id=None): + properties = {} + properties.update(base_properties) + if instance_id: + properties['group.instance.id'] = instance_id + return properties + + def create_consumer(self, + topic, + group, + instance_id=None, + consumer_properties={}): + return KafkaCliConsumer( + self.test_context, 
+ self.redpanda, + topic=topic, + group=group, + from_beginning=True, + formatter_properties={ + 'print.value': 'false', + 'print.key': 'false', + 'print.partition': 'true', + 'print.offset': 'true', + }, + consumer_properties=ConsumerGroupTest.make_consumer_properties( + consumer_properties, instance_id)) + + def create_consumers(self, + consumer_count, + topic, + group, + static_members, + consumer_properties={}): + + consumers = [] + for i in range(0, consumer_count): + instance_id = f"panda-consumer-{i}" if static_members else None + consumers.append( + self.create_consumer(topic, + group=group, + instance_id=instance_id, + consumer_properties=consumer_properties)) + + for c in consumers: + c.start() + return consumers + + def consumed_at_least(consumers, count): + return all([len(c._messages) > count for c in consumers]) + + def validate_group_state(self, group, expected_state, static_members): + rpk = RpkTool(self.redpanda) + # validate group state + rpk_group = rpk.group_describe(group) + + assert rpk_group.members == 2 + assert rpk_group.state == expected_state + + for p in rpk_group.partitions: + if static_members: + assert 'panda-consumer' in p.instance_id + else: + assert p.instance_id is None + + def setup_producer(self, p_cnt): + # create topic + self.topic_spec = TopicSpec(partition_count=p_cnt, + replication_factor=3) + self.client().create_topic(specs=self.topic_spec) + # produce some messages to the topic + self.producer = RpkProducer(self._ctx, self.redpanda, + self.topic_spec.name, 128, 5000, -1) + self.producer.start() + + @cluster(num_nodes=6) + @parametrize(static_members=True) + @parametrize(static_members=False) + def test_basic_group_join(self, static_members): + """ + Test validating that consumers are able to join the group and consume topic + """ + + self.setup_producer(20) + group = 'test-gr-1' + # use 2 consumers + consumers = self.create_consumers(2, + self.topic_spec.name, + group, + static_members=static_members) + + # wait for some 
 messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free() + + @cluster(num_nodes=6) + def test_mixed_consumers_join(self): + """ + Test validating that dynamic and static consumers may exist in the same group + """ + self.setup_producer(20) + group = 'test-gr-1' + consumers = [] + consumers.append( + self.create_consumer(self.topic_spec.name, group, + "panda-instance")) + consumers.append( + self.create_consumer(self.topic_spec.name, group, None)) + + for c in consumers: + c.start() + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + + rpk = RpkTool(self.redpanda) + # validate group state + rpk_group = rpk.group_describe(group) + + assert rpk_group.members == 2 + assert rpk_group.state == "Stable" + + static_members = set() + dynamic_members = set() + + for p in rpk_group.partitions: + if p.instance_id: + static_members.add(p.client_id) + else: + dynamic_members.add(p.client_id) + + assert len(static_members) == 1 + assert len(dynamic_members) == 1 + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free() + + def wait_for_members(self, group, members_count): + rpk = RpkTool(self.redpanda) + + def group_stable(): + rpk_group = rpk.group_describe(group) + return rpk_group.members == members_count and rpk_group.state == "Stable" + + return wait_until(group_stable, 30, 2) + + @cluster(num_nodes=6) + @parametrize(static_members=True) + @parametrize(static_members=False) + def test_consumer_rejoin(self, static_members): + """ + Test validating that re-joining static member will not cause a rebalance + """ + self.setup_producer(20) + group = 'test-gr-1' + + consumers = self.create_consumers( + 2, + self.topic_spec.name, + group, + 
static_members=static_members, + consumer_properties={"session.timeout.ms": 40000}) + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + rpk = RpkTool(self.redpanda) + # at this point we have 2 consumers in stable group + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + # stop one of the consumers + consumers[0].stop() + consumers[0].wait() + + rpk_group = rpk.group_describe(group) + if static_members: + # with static members group should still be in stable state + assert rpk_group.state == "Stable" + assert rpk_group.members == 2 + else: + # consumer will request group leave when shutdown gracefully and it is dynamic + self.wait_for_members(group, 1) + + # start the consumer again + consumers[0].start() + consumers[0].wait_for_started() + # wait for consumer to start + if static_members: + # with static members group should be stable immediately as the + # consumer is rejoining with the same instance id + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + else: + # group should get back to its original 2 members state + self.wait_for_members(group, 2) + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free() + + @cluster(num_nodes=6) + @parametrize(static_members=True) + @parametrize(static_members=False) + def test_consumer_is_removed_when_timedout(self, static_members): + """ + Test validating that consumer is evicted if it failed to deliver heartbeat to the broker + """ + self.setup_producer(20) + group = 'test-gr-1' + # using short session timeout to make the test finish faster + consumers = self.create_consumers( + 2, + self.topic_spec.name, + group, + static_members=static_members, + consumer_properties={"session.timeout.ms": 6000}) + + # wait for some messages + wait_until(lambda: ConsumerGroupTest.consumed_at_least(consumers, 50), + 30, 2) + rpk = 
RpkTool(self.redpanda) + # at this point we have 2 consumers in stable group + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + # stop one of the consumers + consumers[0].stop() + + # wait for rebalance + self.wait_for_members(group, 1) + + # start the consumer again + consumers[0].start() + + # group should get back to its original 2 members state + self.wait_for_members(group, 2) + self.validate_group_state(group, + expected_state="Stable", + static_members=static_members) + + self.producer.wait() + self.producer.free() + + for c in consumers: + c.stop() + c.wait() + c.free()