Skip to content

Commit

Permalink
kafka/server Add more logging to group_metadata_migration
Browse files Browse the repository at this point in the history
  • Loading branch information
VadimPlh committed May 24, 2022
1 parent f0cc9e4 commit fb3c334
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion src/v/kafka/server/group_metadata_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) {
return false;
}
if (source.offsets().size() != target.offsets().size()) {
vlog(
mlog.info,
"Offsets hash_maps for group_stm have a different size. Source: {}, "
"target: {}",
source.offsets().size(),
target.offsets().size());
return false;
}
for (auto& [tp, o] : source.offsets()) {
Expand All @@ -213,7 +219,7 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) {
if (it->second.metadata != o.metadata) {
vlog(
mlog.info,
"{} offsets does not match. source: {}, target {}",
"{} offsets does not match. source: {}, target: {}",
tp,
o.metadata,
it->second.metadata);
Expand All @@ -227,6 +233,11 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) {
}

if (source.prepared_txs().size() != target.prepared_txs().size()) {
vlog(
mlog.info,
"Prepared txs hash_maps have different size. Source: {}, target: {}",
source.prepared_txs().size(),
target.prepared_txs().size());
return false;
}

Expand All @@ -240,18 +251,42 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) {
return false;
}
if (it->second.tx_seq != tx.tx_seq || tx.pid != it->second.pid) {
vlog(
mlog.info,
"Inconsistent states for prepared transaction. Source:(pid: {}, "
"seq_num: {}), target(pid: {}, seq_num: {})",
tx.pid,
tx.tx_seq,
it->second.pid,
it->second.tx_seq);
return false;
}
if (tx.offsets.size() != it->second.offsets.size()) {
vlog(
mlog.info,
"Transaction has different offsets size. Source: {}, target: {}",
tx.offsets.size(),
it->second.offsets.size());
return false;
}

for (auto& [offset_tp, o] : tx.offsets) {
auto oit = it->second.offsets.find(offset_tp);
if (oit == it->second.offsets.end()) {
vlog(
mlog.info,
"Can not find offset for topic ({}) in target prepared "
"transaction",
offset_tp);
return false;
}
if (!are_offsets_equal(oit->second, o)) {
vlog(
mlog.info,
"Offsets are not equal for prepared transaction. Source: {}, "
"target: {}",
o,
oit->second);
return false;
}
}
Expand All @@ -264,6 +299,11 @@ bool are_states_equivalent(
const group_recovery_consumer_state& source,
const group_recovery_consumer_state& target) {
if (source.groups.size() != target.groups.size()) {
vlog(
mlog.info,
"Groups have a different size. Source: {}, Target: {}",
source.groups.size(),
target.groups.size());
return false;
}

Expand All @@ -274,6 +314,7 @@ bool are_states_equivalent(
return false;
}
if (!are_stms_equivalent(stm, it->second)) {
vlog(mlog.info, "State machines are not equivalent");
return false;
}
}
Expand Down

0 comments on commit fb3c334

Please sign in to comment.