diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index d0fcd5cee0ac..d0554f417978 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -2669,4 +2669,15 @@ void group::add_pending_member( res.first->second.arm(timeout); } +std::ostream& operator<<(std::ostream& o, const group::offset_metadata& md) { + fmt::print( + o, + "{log_offset:{}, offset:{}, metadata:{}, committed_leader_epoch:{}}", + md.log_offset, + md.offset, + md.metadata, + md.committed_leader_epoch); + return o; +} + } // namespace kafka diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index e31e69dcc3fc..7f8d75ef1360 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -150,6 +150,8 @@ class group { model::offset offset; ss::sstring metadata; kafka::leader_epoch committed_leader_epoch; + + friend std::ostream& operator<<(std::ostream&, const offset_metadata&); }; struct offset_metadata_with_probe { diff --git a/src/v/kafka/server/group_metadata_migration.cc b/src/v/kafka/server/group_metadata_migration.cc index a44ef5456667..0b240d395523 100644 --- a/src/v/kafka/server/group_metadata_migration.cc +++ b/src/v/kafka/server/group_metadata_migration.cc @@ -191,7 +191,7 @@ bool are_offsets_equal( bool are_stms_equivalent(const group_stm& source, const group_stm& target) { if (source.get_metadata() != target.get_metadata()) { vlog( - mlog.debug, + mlog.info, "source and target group metadata does not match, source: {}, " "target: {}", source.get_metadata(), @@ -199,21 +199,27 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) { return false; } if (source.offsets().size() != target.offsets().size()) { + vlog( + mlog.info, + "Offsets hash_maps for group_stm have a different size. Source: {}, " + "target: {}", + source.offsets().size(), + target.offsets().size()); return false; } for (auto& [tp, o] : source.offsets()) { auto it = target.offsets().find(tp); if (it == target.offsets().end()) { vlog( - mlog.debug, + mlog.info, "unable to find offset for {} in target group offsets", tp); return false; } if (it->second.metadata != o.metadata) { vlog( - mlog.debug, - "{} offsets does not match. source: {}, target {}", + mlog.info, + "{} offsets does not match. source: {}, target: {}", tp, o.metadata, it->second.metadata); @@ -222,11 +228,16 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) { } if (source.fences() != target.fences()) { - vlog(mlog.debug, "group fences does not match"); + vlog(mlog.info, "group fences does not match"); return false; } if (source.prepared_txs().size() != target.prepared_txs().size()) { + vlog( + mlog.info, + "Prepared txs hash_maps have different size. Source: {}, target: {}", + source.prepared_txs().size(), + target.prepared_txs().size()); return false; } @@ -234,24 +245,48 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) { auto it = target.prepared_txs().find(tp); if (it == target.prepared_txs().end()) { vlog( - mlog.debug, + mlog.info, "unable to find preppared txs for {} in target group state", tp); return false; } if (it->second.tx_seq != tx.tx_seq || tx.pid != it->second.pid) { + vlog( + mlog.info, + "Inconsistent states for prepared transaction. Source:(pid: {}, " + "seq_num: {}), target(pid: {}, seq_num: {})", + tx.pid, + tx.tx_seq, + it->second.pid, + it->second.tx_seq); return false; } if (tx.offsets.size() != it->second.offsets.size()) { + vlog( + mlog.info, + "Transaction has different offsets size. Source: {}, target: {}", + tx.offsets.size(), + it->second.offsets.size()); return false; } for (auto& [offset_tp, o] : tx.offsets) { auto oit = it->second.offsets.find(offset_tp); if (oit == it->second.offsets.end()) { + vlog( + mlog.info, + "Can not find offset for topic ({}) in target prepared " + "transaction", + offset_tp); return false; } if (!are_offsets_equal(oit->second, o)) { + vlog( + mlog.info, + "Offsets are not equal for prepared transaction. Source: {}, " + "target: {}", + o, + oit->second); return false; } } @@ -264,16 +299,22 @@ bool are_states_equivalent( const group_recovery_consumer_state& source, const group_recovery_consumer_state& target) { if (source.groups.size() != target.groups.size()) { + vlog( + mlog.info, + "Groups have a different size. Source: {}, Target: {}", + source.groups.size(), + target.groups.size()); return false; } for (auto& [g, stm] : source.groups) { auto it = target.groups.find(g); if (it == target.groups.end()) { - vlog(mlog.debug, "group {} not found in target group state", g); + vlog(mlog.info, "group {} not found in target group state", g); return false; } if (!are_stms_equivalent(stm, it->second)) { + vlog(mlog.info, "State machines are not equivalent"); return false; } } @@ -504,7 +545,7 @@ ss::future<> wait_for_stable_group_topic( result.error()); } else { vlog( - mlog.debug, + mlog.info, "waiting for partition operations to finish, current state: {}", result.value()); }