From ccf8cf99451f0caad4a891891d65d4deaa7a384b Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Tue, 26 Apr 2022 16:21:26 -0700 Subject: [PATCH] wip: new k/groups logger --- src/v/kafka/server/group.cc | 162 +++++++++--------- src/v/kafka/server/group.h | 1 + src/v/kafka/server/group_manager.cc | 38 ++-- src/v/kafka/server/group_recovery_consumer.cc | 8 +- src/v/kafka/server/handlers/fetch.cc | 2 +- src/v/kafka/server/handlers/list_groups.cc | 2 +- src/v/kafka/server/handlers/list_offsets.cc | 2 +- src/v/kafka/server/logger.cc | 1 + src/v/kafka/server/logger.h | 3 + src/v/kafka/server/rm_group_frontend.cc | 4 +- 10 files changed, 115 insertions(+), 108 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 949daa6dd381d..d5eb23e6bfc92 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -62,6 +62,7 @@ group::group( , _recovery_policy( config::shard_local_cfg().rm_violation_recovery_policy.value()) , _ctxlog(klog, *this) + , _ctx_glog(kgrouplog, *this) , _ctx_txlog(cluster::txlog, *this) , _md_serializer(std::move(serializer)) , _enable_group_metrics(group_metrics) {} @@ -81,6 +82,7 @@ group::group( , _recovery_policy( config::shard_local_cfg().rm_violation_recovery_policy.value()) , _ctxlog(klog, *this) + , _ctx_glog(kgrouplog, *this) , _ctx_txlog(cluster::txlog, *this) , _md_serializer(std::move(serializer)) , _enable_group_metrics(group_metrics) { @@ -99,7 +101,7 @@ group::group( .name = _protocol.value_or(protocol_name("")), .metadata = iobuf_to_bytes(m.subscription), }}); - vlog(_ctxlog.trace, "Initializing group with member {}", member); + vlog(_ctx_glog.trace, "Initializing group with member {}", member); add_member_no_join(member); } } @@ -152,14 +154,14 @@ group_state group::set_state(group_state s) { _id, _state, s); - vlog(_ctxlog.trace, "Changing state from {} to {}", _state, s); + vlog(_ctx_glog.trace, "Changing state from {} to {}", _state, s); _state_timestamp = model::timestamp::now(); return 
std::exchange(_state, s); } bool group::supports_protocols(const join_group_request& r) const { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Check protocol support type {} members {} req.type {} req.protos {} " "supported {}", _protocol_type, @@ -201,7 +203,7 @@ void group::add_member_no_join(member_ptr member) { auto res = _members.emplace(member->id(), member); if (!res.second) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Cannot add member with duplicate id {}", member->id()); throw std::runtime_error( @@ -222,7 +224,7 @@ ss::future group::add_member(member_ptr member) { ss::future group::update_member( member_ptr member, std::vector&& new_protocols) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Updating {}joining member {} with protocols {}", member->is_joining() ? "" : "non-", member, @@ -266,7 +268,7 @@ group::duration_type group::rebalance_timeout() const { if (likely(it != _members.end())) { return it->second->rebalance_timeout(); } else { - vlog(_ctxlog.trace, "Cannot compute rebalance timeout for empty group"); + vlog(_ctx_glog.trace, "Cannot compute rebalance timeout for empty group"); throw std::runtime_error("no members in group"); } } @@ -276,7 +278,7 @@ std::vector group::member_metadata() const { in_state(group_state::dead) || in_state(group_state::preparing_rebalance)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Cannot collect member metadata in group state {}", _state); throw std::runtime_error( @@ -334,7 +336,7 @@ void group::advance_generation() { _protocol = select_protocol(); set_state(group_state::completing_rebalance); } - vlog(_ctxlog.trace, "Advanced generation with protocol {}", _protocol); + vlog(_ctx_glog.trace, "Advanced generation with protocol {}", _protocol); } /* @@ -355,7 +357,7 @@ kafka::protocol_name group::select_protocol() const { } }); - vlog(_ctxlog.trace, "Selecting protocol from candidates {}", candidates); + vlog(_ctx_glog.trace, "Selecting protocol from candidates {}", candidates); // collect votes from members 
protocol_support votes; @@ -366,7 +368,7 @@ kafka::protocol_name group::select_protocol() const { auto& choice = m.second->vote_for_protocol(candidates); auto total = ++votes[choice]; vlog( - _ctxlog.trace, + _ctx_glog.trace, "Member {} voting for protocol {} (total {})", m.first, choice, @@ -385,7 +387,7 @@ kafka::protocol_name group::select_protocol() const { // this is guaranteed to succeed because `member->vote` will throw if it // is unable to vote on some protocol candidate. - vlog(_ctxlog.trace, "Selected protocol {}", winner->first); + vlog(_ctx_glog.trace, "Selected protocol {}", winner->first); return winner->first; } @@ -398,7 +400,7 @@ void group::finish_syncing_members(error_code error) { if (member->is_syncing()) { auto reply = sync_group_response(error, member->assignment()); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Completed syncing member {} with reply {}", member, reply); @@ -421,7 +423,7 @@ bool group::leader_rejoined() { auto leader = get_member(*_leader); if (leader->is_joining()) { - vlog(_ctxlog.trace, "Leader {} has rejoined", *_leader); + vlog(_ctx_glog.trace, "Leader {} has rejoined", *_leader); return true; } @@ -434,11 +436,11 @@ bool group::leader_rejoined() { }); if (it == _members.end()) { - vlog(_ctxlog.trace, "No replacement leader is available"); + vlog(_ctx_glog.trace, "No replacement leader is available"); return false; } else { _leader = it->first; - vlog(_ctxlog.trace, "Selected new leader {}", *_leader); + vlog(_ctx_glog.trace, "Selected new leader {}", *_leader); return true; } } @@ -446,7 +448,7 @@ bool group::leader_rejoined() { ss::future group::handle_join_group(join_group_request&& r, bool is_new_group) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Handling join request {} for {} group {}", r, (is_new_group ? "new" : "existing"), @@ -484,7 +486,7 @@ group::handle_join_group(join_group_request&& r, bool is_new_group) { * handles that before returning. 
*/ if (all_members_joined()) { - vlog(_ctxlog.trace, "Finishing join with all members present"); + vlog(_ctx_glog.trace, "Finishing join with all members present"); _join_timer.cancel(); complete_join(); } @@ -497,7 +499,7 @@ group::handle_join_group(join_group_request&& r, bool is_new_group) { ss::future group::join_group_unknown_member(join_group_request&& r) { if (in_state(group_state::dead)) { - vlog(_ctxlog.trace, "Join rejected in state {}", _state); + vlog(_ctx_glog.trace, "Join rejected in state {}", _state); return make_join_error( unknown_member_id, error_code::coordinator_not_available); @@ -516,7 +518,7 @@ group::join_group_unknown_member(join_group_request&& r) { // call for another join group request with allocated member id. // vlog( - _ctxlog.trace, + _ctx_glog.trace, "Requesting rejoin for unknown member with new id {}", new_member_id); add_pending_member(new_member_id, r.data.session_timeout_ms); @@ -530,7 +532,7 @@ ss::future group::join_group_known_member(join_group_request&& r) { if (in_state(group_state::dead)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Join rejected in state {} for {}", _state, r.data.member_id); @@ -547,7 +549,7 @@ group::join_group_known_member(join_group_request&& r) { } else if (!contains_member(r.data.member_id)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Join rejected for unregistered member {}", r.data.member_id); return make_join_error(r.data.member_id, error_code::unknown_member_id); @@ -556,7 +558,7 @@ group::join_group_known_member(join_group_request&& r) { auto member = get_member(r.data.member_id); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Handling join for {}leader member {}", is_leader(member->id()) ? 
"" : "non-", member); @@ -587,7 +589,7 @@ group::join_group_known_member(join_group_request&& r) { std::move(members)); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Resending join response for member {} reply {}", member->id(), response); @@ -597,7 +599,7 @@ group::join_group_known_member(join_group_request&& r) { } else { // member has changed metadata, so force a rebalance - vlog(_ctxlog.trace, "Rebalancing due to protocol change"); + vlog(_ctx_glog.trace, "Rebalancing due to protocol change"); return update_member_and_rebalance(member, std::move(r)); } @@ -610,7 +612,7 @@ group::join_group_known_member(join_group_request&& r) { // trigger rebalances for changes affecting assignment which do not // affect the member metadata (such as topic metadata changes for // the consumer) - vlog(_ctxlog.trace, "Rebalancing due to leader or protocol change"); + vlog(_ctx_glog.trace, "Rebalancing due to leader or protocol change"); return update_member_and_rebalance(member, std::move(r)); } else { @@ -624,7 +626,7 @@ group::join_group_known_member(join_group_request&& r) { leader().value_or(member_id("")), std::move(r.data.member_id)); - vlog(_ctxlog.trace, "Handling idemponent group join {}", response); + vlog(_ctx_glog.trace, "Handling idemponent group join {}", response); return ss::make_ready_future( std::move(response)); @@ -692,7 +694,7 @@ ss::future group::add_member_and_rebalance( member->expire_timer().arm(deadline); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Added member {} with join timeout {} ms to group {}", member, _conf.group_new_member_join_timeout(), @@ -712,7 +714,7 @@ group::update_member_and_rebalance(member_ptr member, join_group_request&& r) { void group::try_prepare_rebalance() { if (!valid_previous_state(group_state::preparing_rebalance)) { - vlog(_ctxlog.trace, "Cannot prepare rebalance in state {}", _state); + vlog(_ctx_glog.trace, "Cannot prepare rebalance in state {}", _state); return; } @@ -742,7 +744,7 @@ void group::try_prepare_rebalance() { 
remaining = std::max( remaining - prev_delay, duration_type(0)); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Scheduling debounce join timer for {} ms remaining {} ms", delay, remaining); @@ -753,20 +755,20 @@ void group::try_prepare_rebalance() { }); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Scheduling initial debounce join timer for {} ms", initial); _join_timer.arm(initial); } else if (all_members_joined()) { - vlog(_ctxlog.trace, "All members have joined"); + vlog(_ctx_glog.trace, "All members have joined"); complete_join(); } else { auto timeout = rebalance_timeout(); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Join completion scheduled in {} ms. Current members {} waiting {} " "pending {}", timeout, @@ -786,14 +788,14 @@ void group::try_prepare_rebalance() { * is added. */ void group::complete_join() { - vlog(_ctxlog.trace, "Completing join for group {}", *this); + vlog(_ctx_glog.trace, "Completing join for group {}", *this); // remove dynamic members who haven't joined the group yet // this is the old group->remove_unjoined_members(); const auto prev_leader = _leader; for (auto it = _members.begin(); it != _members.end();) { if (!it->second->is_joining()) { - vlog(_ctxlog.trace, "Removing unjoined member {}", it->first); + vlog(_ctx_glog.trace, "Removing unjoined member {}", it->first); // cancel the heartbeat timer it->second->expire_timer().cancel(); @@ -827,14 +829,14 @@ void group::complete_join() { if (_leader != prev_leader) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Leadership changed to {} from {}", _leader, prev_leader); } if (in_state(group_state::dead)) { - vlog(_ctxlog.trace, "Cancelling join completion in state {}", _state); + vlog(_ctx_glog.trace, "Cancelling join completion in state {}", _state); } else if (!leader_rejoined() && has_members()) { // If all members are not rejoining, we will postpone the @@ -846,7 +848,7 @@ void group::complete_join() { // the initial delayed callback which implements debouncing. 
auto timeout = rebalance_timeout(); vlog( - _ctxlog.trace, + _ctx_glog.trace, "No members re-joined. Scheduling completion for {} ms", timeout); _join_timer.cancel(); @@ -857,7 +859,7 @@ void group::complete_join() { advance_generation(); if (in_state(group_state::empty)) { - vlog(_ctxlog.trace, "Checkpointing empty group {}", *this); + vlog(_ctx_glog.trace, "Checkpointing empty group {}", *this); auto batch = checkpoint(assignments_type{}); auto reader = model::make_memory_record_batch_reader( std::move(batch)); @@ -889,7 +891,7 @@ void group::complete_join() { std::move(md)); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Completing join for {} with reply {}", member->id(), reply); @@ -906,20 +908,20 @@ void group::heartbeat_expire( kafka::member_id member_id, clock_type::time_point deadline) { if (in_state(group_state::dead)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Ignoring heartbeat expiration for group state {}", _state); } else if (contains_pending_member(member_id)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Handling expired heartbeat for pending member {}", member_id); remove_pending_member(member_id); } else if (!contains_member(member_id)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Ignoring heartbeat expiration for unregistered member {}", member_id); @@ -928,7 +930,7 @@ void group::heartbeat_expire( const auto keep_alive = member->should_keep_alive( deadline, _conf.group_new_member_join_timeout()); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Heartbeat expired for keep_alive={} member {}", keep_alive, member_id); @@ -942,7 +944,7 @@ void group::try_finish_joining_member( member_ptr member, join_group_response&& response) { if (member->is_joining()) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Finishing joining member {} with reply {}", member->id(), response); @@ -962,7 +964,7 @@ void group::schedule_next_heartbeat_expiration(member_ptr member) { heartbeat_expire(member_id, deadline); }); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Scheduling heartbeat 
expiration {} ms for {}", member->session_timeout(), member->id()); @@ -971,7 +973,7 @@ void group::schedule_next_heartbeat_expiration(member_ptr member) { void group::remove_pending_member(kafka::member_id member_id) { _pending_members.erase(member_id); - vlog(_ctxlog.trace, "Removing pending member {}", member_id); + vlog(_ctx_glog.trace, "Removing pending member {}", member_id); if (in_state(group_state::preparing_rebalance)) { if (_join_timer.armed() && all_members_joined()) { _join_timer.cancel(); @@ -1001,7 +1003,7 @@ void group::shutdown() { } void group::remove_member(member_ptr member) { - vlog(_ctxlog.trace, "Removing member {}", member->id()); + vlog(_ctx_glog.trace, "Removing member {}", member->id()); // New members may timeout with a pending JoinGroup while the group // is still rebalancing, so we have to invoke the callback before removing @@ -1036,7 +1038,7 @@ void group::remove_member(member_ptr member) { } if (_leader != prev_leader) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Leadership changed to {} from {}", _leader, prev_leader); @@ -1073,22 +1075,22 @@ void group::remove_member(member_ptr member) { ss::future group::handle_sync_group(sync_group_request&& r) { - vlog(_ctxlog.trace, "Handling sync group request {}", r); + vlog(_ctx_glog.trace, "Handling sync group request {}", r); if (in_state(group_state::dead)) { - vlog(_ctxlog.trace, "Sync rejected for group state {}", _state); + vlog(_ctx_glog.trace, "Sync rejected for group state {}", _state); return make_sync_error(error_code::coordinator_not_available); } else if (!contains_member(r.data.member_id)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Sync rejected for unregistered member {}", r.data.member_id); return make_sync_error(error_code::unknown_member_id); } else if (r.data.generation_id != generation()) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Sync rejected with out-of-date generation {} != {}", r.data.generation_id, generation()); @@ -1112,11 +1114,11 @@ 
group::handle_sync_group(sync_group_request&& r) { // and returns the assignment for itself. switch (state()) { case group_state::empty: - vlog(_ctxlog.trace, "Sync rejected for group state {}", _state); + vlog(_ctx_glog.trace, "Sync rejected for group state {}", _state); return make_sync_error(error_code::unknown_member_id); case group_state::preparing_rebalance: - vlog(_ctxlog.trace, "Sync rejected for group state {}", _state); + vlog(_ctx_glog.trace, "Sync rejected for group state {}", _state); return make_sync_error(error_code::rebalance_in_progress); case group_state::completing_rebalance: { @@ -1131,7 +1133,7 @@ group::handle_sync_group(sync_group_request&& r) { schedule_next_heartbeat_expiration(member); sync_group_response reply(error_code::none, member->assignment()); vlog( - _ctxlog.trace, + _ctx_glog.trace, "Handling idemponent group sync for member {} with reply {}", member, reply); @@ -1189,13 +1191,13 @@ ss::future group::sync_group_completing_rebalance( // wait for the leader to show up and fulfill the promise if (!is_leader(r.data.member_id)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Non-leader member waiting for assignment {}", member->id()); return response; } - vlog(_ctxlog.trace, "Completing group sync with leader {}", member->id()); + vlog(_ctx_glog.trace, "Completing group sync with leader {}", member->id()); // construct a member assignment structure that will be persisted to the // underlying metadata topic for group recovery. 
the mapping is the @@ -1223,7 +1225,7 @@ ss::future group::sync_group_completing_rebalance( if ( !in_state(group_state::completing_rebalance) || expected_generation != generation()) { - vlog(_ctxlog.trace, "Group state changed while completing sync"); + vlog(_ctx_glog.trace, "Group state changed while completing sync"); return std::move(response); } @@ -1235,10 +1237,10 @@ ss::future group::sync_group_completing_rebalance( set_assignments(std::move(assignments)); finish_syncing_members(error_code::none); set_state(group_state::stable); - vlog(_ctxlog.trace, "Successfully completed group sync"); + vlog(_ctx_glog.trace, "Successfully completed group sync"); } else { vlog( - _ctxlog.trace, + _ctx_glog.trace, "An error occurred completing group sync {}", r.error()); // an error was encountered persisting the group state: @@ -1254,22 +1256,22 @@ ss::future group::sync_group_completing_rebalance( } ss::future group::handle_heartbeat(heartbeat_request&& r) { - vlog(_ctxlog.trace, "Handling heartbeat request {}", r); + vlog(_ctx_glog.trace, "Handling heartbeat request {}", r); if (in_state(group_state::dead)) { - vlog(_ctxlog.trace, "Heartbeat rejected for group state {}", _state); + vlog(_ctx_glog.trace, "Heartbeat rejected for group state {}", _state); return make_heartbeat_error(error_code::coordinator_not_available); } else if (!contains_member(r.data.member_id)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Heartbeat rejected for unregistered member {}", r.data.member_id); return make_heartbeat_error(error_code::unknown_member_id); } else if (r.data.generation_id != generation()) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Heartbeat rejected with out-of-date generation {} != {}", r.data.generation_id, generation()); @@ -1278,11 +1280,11 @@ ss::future group::handle_heartbeat(heartbeat_request&& r) { switch (state()) { case group_state::empty: - vlog(_ctxlog.trace, "Heartbeat rejected for group state {}", _state); + vlog(_ctx_glog.trace, "Heartbeat rejected for group 
state {}", _state); return make_heartbeat_error(error_code::unknown_member_id); case group_state::completing_rebalance: - vlog(_ctxlog.trace, "Heartbeat rejected for group state {}", _state); + vlog(_ctx_glog.trace, "Heartbeat rejected for group state {}", _state); return make_heartbeat_error(error_code::rebalance_in_progress); case group_state::preparing_rebalance: { @@ -1307,10 +1309,10 @@ ss::future group::handle_heartbeat(heartbeat_request&& r) { ss::future group::handle_leave_group(leave_group_request&& r) { - vlog(_ctxlog.trace, "Handling leave group request {}", r); + vlog(_ctx_glog.trace, "Handling leave group request {}", r); if (in_state(group_state::dead)) { - vlog(_ctxlog.trace, "Leave rejected for group state {}", _state); + vlog(_ctx_glog.trace, "Leave rejected for group state {}", _state); return make_leave_error(error_code::coordinator_not_available); } else if (contains_pending_member(r.data.member_id)) { @@ -1322,7 +1324,7 @@ group::handle_leave_group(leave_group_request&& r) { } else if (!contains_member(r.data.member_id)) { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Leave rejected for unregistered member {}", r.data.member_id); return make_leave_error(error_code::unknown_member_id); @@ -1863,7 +1865,7 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { auto error = error_code::none; if (!r) { vlog( - _ctxlog.info, + _ctx_glog.info, "Storing committed offset failed - {}", r.error().message()); error = map_store_offset_error_code(r.error()); @@ -2207,20 +2209,20 @@ ss::future group::remove() { raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { vlog( - klog.trace, + kgrouplog.trace, "Replicated group delete record {} at offset {}", _id, result.value().last_offset); } else { vlog( - klog.error, + kgrouplog.error, "Error occured replicating group {} delete records {}", _id, result.error()); } } catch (const std::exception& e) { vlog( - klog.error, + kgrouplog.error, "Exception occured replicating 
group {} delete records {}", _id, e); @@ -2247,7 +2249,7 @@ group::remove_topic_partitions(const std::vector& tps) { in_state(group_state::empty) && _pending_offset_commits.empty() && _offsets.empty()) { vlog( - klog.debug, + kgrouplog.debug, "Marking group {} as dead at {} generation", _id, generation()); @@ -2269,7 +2271,7 @@ group::remove_topic_partitions(const std::vector& tps) { // create deletion records for offsets from deleted partitions for (auto& offset : removed) { vlog( - klog.trace, "Removing offset for group {} tp {}", _id, offset.first); + kgrouplog.trace, "Removing offset for group {} tp {}", _id, offset.first); add_offset_tombstone_record(_id, offset.first, _md_serializer, builder); } @@ -2287,20 +2289,20 @@ group::remove_topic_partitions(const std::vector& tps) { raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { vlog( - klog.trace, + kgrouplog.trace, "Replicated group cleanup record {} at offset {}", _id, result.value().last_offset); } else { vlog( - klog.error, + kgrouplog.error, "Error occured replicating group {} cleanup records {}", _id, result.error()); } } catch (const std::exception& e) { vlog( - klog.error, + kgrouplog.error, "Exception occured replicating group {} cleanup records {}", _id, e); @@ -2381,7 +2383,7 @@ void group::add_pending_member( const kafka::member_id& member_id, duration_type timeout) { auto res = _pending_members.try_emplace(member_id, [this, member_id] { vlog( - _ctxlog.trace, + _ctx_glog.trace, "Handling expired heartbeat for pending member {}", member_id); remove_pending_member(member_id); @@ -2393,7 +2395,7 @@ void group::add_pending_member( } vlog( - _ctxlog.trace, + _ctx_glog.trace, "Scheduling heartbeat expiration {} ms for pending {}", timeout, member_id); diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index daf8281bf9b73..0c9c93308c941 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -652,6 +652,7 @@ class group { _offsets; 
model::violation_recovery_policy _recovery_policy; ctx_log _ctxlog; + ctx_log _ctx_glog; ctx_log _ctx_txlog; group_metadata_serializer _md_serializer; diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 982da739a7e66..fcde94ae15520 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -130,7 +130,7 @@ ss::future<> group_manager::stop() { } void group_manager::detach_partition(const model::ntp& ntp) { - klog.debug("detaching group metadata partition {}", ntp); + kgrouplog.debug("detaching group metadata partition {}", ntp); ssx::spawn_with_gate(_gate, [this, _ntp{ntp}]() -> ss::future<> { auto ntp(_ntp); auto it = _partitions.find(ntp); @@ -154,7 +154,7 @@ void group_manager::detach_partition(const model::ntp& ntp) { } void group_manager::attach_partition(ss::lw_shared_ptr p) { - klog.debug("attaching group metadata partition {}", p->ntp()); + kgrouplog.debug("attaching group metadata partition {}", p->ntp()); auto attached = ss::make_lw_shared(p); auto res = _partitions.try_emplace(p->ntp(), attached); // TODO: this is not a forever assertion. 
this should just generally never @@ -191,7 +191,7 @@ ss::future<> group_manager::cleanup_removed_topic_partitions( if (it->second != g) { return ss::now(); } - vlog(klog.trace, "Removed group {}", g); + vlog(kgrouplog.trace, "Removed group {}", g); _groups.erase(it); _groups.rehash(0); return ss::now(); @@ -228,7 +228,7 @@ void group_manager::handle_topic_delta( }); }) .handle_exception([](std::exception_ptr e) { - vlog(klog.warn, "Topic clean-up encountered error: {}", e); + vlog(kgrouplog.warn, "Topic clean-up encountered error: {}", e); }); } @@ -402,7 +402,7 @@ ss::future<> group_manager::recover_partition( if (group_stm.has_data()) { auto group = get_group(group_id); vlog( - klog.info, + kgrouplog.info, "Recovering {} - {}", group_id, group_stm.get_metadata()); @@ -468,7 +468,7 @@ ss::future<> group_manager::recover_partition( for (auto& [group_id, group_stm] : ctx.groups) { if (group_stm.is_removed()) { if (_groups.contains(group_id) && group_stm.offsets().size() > 0) { - klog.warn( + kgrouplog.warn( "Unexpected active group unload {} loading {}", group_id, p->partition->ntp()); @@ -493,7 +493,7 @@ group_manager::join_group(join_group_request&& r) { r.data.session_timeout_ms < _conf.group_min_session_timeout_ms() || r.data.session_timeout_ms > _conf.group_max_session_timeout_ms()) { vlog( - klog.trace, + kgrouplog.trace, "Join group {} rejected for invalid session timeout {} valid range " "[{},{}]. Request {}", r.data.group_id, @@ -513,7 +513,7 @@ group_manager::join_group(join_group_request&& r) { // not exist we should reject the request. if (r.data.member_id != unknown_member_id) { vlog( - klog.trace, + kgrouplog.trace, "Join group {} rejected for known member {} joining unknown " "group. Request {}", r.data.group_id, @@ -529,7 +529,7 @@ group_manager::join_group(join_group_request&& r) { // gone. this is generally not going to be a scenario that can // happen until we have rebalancing / partition deletion feature. 
vlog( - klog.trace, + kgrouplog.trace, "Join group {} rejected for unavailable ntp {}", r.data.group_id, r.ntp); @@ -548,7 +548,7 @@ group_manager::join_group(join_group_request&& r) { _groups.emplace(r.data.group_id, group); _groups.rehash(0); is_new_group = true; - vlog(klog.trace, "Created new group {} while joining", r.data.group_id); + vlog(kgrouplog.trace, "Created new group {} while joining", r.data.group_id); } return group->handle_join_group(std::move(r), is_new_group) @@ -558,7 +558,7 @@ group_manager::join_group(join_group_request&& r) { ss::future group_manager::sync_group(sync_group_request&& r) { if (r.data.group_instance_id) { - vlog(klog.trace, "Static group membership is not supported"); + vlog(kgrouplog.trace, "Static group membership is not supported"); return make_sync_error(error_code::unsupported_version); } @@ -583,7 +583,7 @@ group_manager::sync_group(sync_group_request&& r) { return group->handle_sync_group(std::move(r)).finally([group] {}); } else { vlog( - klog.trace, + kgrouplog.trace, "Cannot handle sync group request for unknown group {}", r.data.group_id); return make_sync_error(error_code::unknown_member_id); @@ -592,7 +592,7 @@ group_manager::sync_group(sync_group_request&& r) { ss::future group_manager::heartbeat(heartbeat_request&& r) { if (r.data.group_instance_id) { - vlog(klog.trace, "Static group membership is not supported"); + vlog(kgrouplog.trace, "Static group membership is not supported"); return make_heartbeat_error(error_code::unsupported_version); } @@ -613,7 +613,7 @@ ss::future group_manager::heartbeat(heartbeat_request&& r) { } vlog( - klog.trace, + kgrouplog.trace, "Cannot handle heartbeat request for unknown group {}", r.data.group_id); @@ -633,7 +633,7 @@ group_manager::leave_group(leave_group_request&& r) { return group->handle_leave_group(std::move(r)).finally([group] {}); } else { vlog( - klog.trace, + kgrouplog.trace, "Cannot handle leave group request for unknown group {}", r.data.group_id); return 
make_leave_error(error_code::unknown_member_id); @@ -1009,14 +1009,14 @@ error_code group_manager::validate_group_status( const model::ntp& ntp, const group_id& group, api_key api) { if (!valid_group_id(group, api)) { vlog( - klog.trace, "Group name {} is invalid for operation {}", group, api); + kgrouplog.trace, "Group name {} is invalid for operation {}", group, api); return error_code::invalid_group_id; } if (const auto it = _partitions.find(ntp); it != _partitions.end()) { if (!it->second->partition->is_leader()) { vlog( - klog.trace, + kgrouplog.trace, "Group {} operation {} sent to non-leader coordinator {}", group, api, @@ -1026,7 +1026,7 @@ error_code group_manager::validate_group_status( if (it->second->loading) { vlog( - klog.trace, + kgrouplog.trace, "Group {} operation {} sent to loading coordinator {}", group, api, @@ -1051,7 +1051,7 @@ error_code group_manager::validate_group_status( } vlog( - klog.trace, + kgrouplog.trace, "Group {} operation {} misdirected to non-coordinator {}", group, api, diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index 0852dbef8dc57..330aa32684d20 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -116,7 +116,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { group_it->second.try_set_fence(cmd.pid.get_id(), cmd.pid.get_epoch()); co_return ss::stop_iteration::no; } else { - vlog(klog.trace, "ignoring batch with type {}", batch.header().type); + vlog(kgrouplog.trace, "ignoring batch with type {}", batch.header().type); co_return ss::stop_iteration::no; } } @@ -140,14 +140,14 @@ void group_recovery_consumer::handle_record(model::record r) { __builtin_unreachable(); } catch (...) 
{ vlog( - klog.error, + kgrouplog.error, "error handling record record - {}", std::current_exception()); } } void group_recovery_consumer::handle_group_metadata(group_metadata_kv md) { - vlog(klog.trace, "Recovering group metadata {}", md.key.group_id); + vlog(kgrouplog.trace, "Recovering group metadata {}", md.key.group_id); if (md.value) { // until we switch over to a compacted topic or use raft snapshots, @@ -166,7 +166,7 @@ void group_recovery_consumer::handle_offset_metadata(offset_metadata_kv md) { model::topic_partition tp(md.key.topic, md.key.partition); if (md.value) { vlog( - klog.trace, + kgrouplog.trace, "Recovering offset {}/{} with metadata {}", md.key.topic, md.key.partition, diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index eac1b7a2f95a2..46d303d52902f 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -302,7 +302,7 @@ static ss::future> fetch_ntps_in_parallel( if (total_max_bytes > max_bytes_per_fetch) { auto per_partition = max_bytes_per_fetch / ntp_fetch_configs.size(); vlog( - klog.debug, + klog.warn, "Fetch requested very large response ({}), clamping each partition's " "max_bytes to {} bytes", total_max_bytes, diff --git a/src/v/kafka/server/handlers/list_groups.cc b/src/v/kafka/server/handlers/list_groups.cc index ed1dbc87658cb..c20b72e6e79ef 100644 --- a/src/v/kafka/server/handlers/list_groups.cc +++ b/src/v/kafka/server/handlers/list_groups.cc @@ -24,7 +24,7 @@ ss::future list_groups_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group g) { list_groups_request request{}; request.decode(ctx.reader(), ctx.header().version); - vlog(klog.trace, "Handling request {}", request); + vlog(kgrouplog.trace, "Handling request {}", request); auto&& [error, groups] = co_await ctx.groups().list_groups(); diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc index bff48d3df4c1a..04ba5612f9728 
100644 --- a/src/v/kafka/server/handlers/list_offsets.cc +++ b/src/v/kafka/server/handlers/list_offsets.cc @@ -247,7 +247,7 @@ list_offsets_handler::handle(request_context ctx, ss::smp_service_group ssg) { list_offsets_request request; request.decode(ctx.reader(), ctx.header().version); request.compute_duplicate_topics(); - vlog(klog.trace, "Handling request {}", request); + vlog(kgrouplog.trace, "Handling request {}", request); auto unauthorized_it = std::partition( request.data.topics.begin(), diff --git a/src/v/kafka/server/logger.cc b/src/v/kafka/server/logger.cc index 1713e23d2ea69..3f9b8947a116f 100644 --- a/src/v/kafka/server/logger.cc +++ b/src/v/kafka/server/logger.cc @@ -11,4 +11,5 @@ namespace kafka { ss::logger klog("kafka"); +ss::logger kgrouplog("k/groups"); } // namespace kafka diff --git a/src/v/kafka/server/logger.h b/src/v/kafka/server/logger.h index 94126cff35c23..fbce4091fc944 100644 --- a/src/v/kafka/server/logger.h +++ b/src/v/kafka/server/logger.h @@ -16,5 +16,8 @@ #include namespace kafka { +// logger for kafka requests other than group membership extern ss::logger klog; +// the logger for group membership +extern ss::logger kgrouplog; } // namespace kafka diff --git a/src/v/kafka/server/rm_group_frontend.cc b/src/v/kafka/server/rm_group_frontend.cc index 9cd74b8a10a10..01a873f0995cd 100644 --- a/src/v/kafka/server/rm_group_frontend.cc +++ b/src/v/kafka/server/rm_group_frontend.cc @@ -58,7 +58,7 @@ ss::future try_create_consumer_group_topic( vassert(res.size() == 1, "expected exactly one result"); if (res[0].ec != cluster::errc::success) { vlog( - klog.warn, + kgrouplog.warn, "can not create {}/{} topic - error: {}", mapper.ns()(), mapper.topic()(), @@ -69,7 +69,7 @@ ss::future try_create_consumer_group_topic( }) .handle_exception([&mapper](const std::exception_ptr& e) { vlog( - klog.warn, + kgrouplog.warn, "can not create {}/{} topic - exception: {}", mapper.ns()(), mapper.topic()(),