Skip to content

Commit

Permalink
Merge pull request #4902 from VadimPlh/fix-logging-migration
Browse files Browse the repository at this point in the history
Fix logging in group_metadata_migration
  • Loading branch information
VadimPlh committed May 24, 2022
2 parents 006f3d6 + fb3c334 commit d5757a9
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
11 changes: 11 additions & 0 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2669,4 +2669,15 @@ void group::add_pending_member(
res.first->second.arm(timeout);
}

std::ostream& operator<<(std::ostream& o, const group::offset_metadata& md) {
fmt::print(
o,
"{log_offset:{}, offset:{}, metadata:{}, committed_leader_epoch:{}}",
md.log_offset,
md.offset,
md.metadata,
md.committed_leader_epoch);
return o;
}

} // namespace kafka
2 changes: 2 additions & 0 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ class group {
model::offset offset;
ss::sstring metadata;
kafka::leader_epoch committed_leader_epoch;

friend std::ostream& operator<<(std::ostream&, const offset_metadata&);
};

struct offset_metadata_with_probe {
Expand Down
57 changes: 49 additions & 8 deletions src/v/kafka/server/group_metadata_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,29 +191,35 @@ bool are_offsets_equal(
bool are_stms_equivalent(const group_stm& source, const group_stm& target) {
if (source.get_metadata() != target.get_metadata()) {
vlog(
mlog.debug,
mlog.info,
"source and target group metadata does not match, source: {}, "
"target: {}",
source.get_metadata(),
target.get_metadata());
return false;
}
if (source.offsets().size() != target.offsets().size()) {
vlog(
mlog.info,
"Offsets hash_maps for group_stm have a different size. Source: {}, "
"target: {}",
source.offsets().size(),
target.offsets().size());
return false;
}
for (auto& [tp, o] : source.offsets()) {
auto it = target.offsets().find(tp);
if (it == target.offsets().end()) {
vlog(
mlog.debug,
mlog.info,
"unable to find offset for {} in target group offsets",
tp);
return false;
}
if (it->second.metadata != o.metadata) {
vlog(
mlog.debug,
"{} offsets does not match. source: {}, target {}",
mlog.info,
"{} offsets does not match. source: {}, target: {}",
tp,
o.metadata,
it->second.metadata);
Expand All @@ -222,36 +228,65 @@ bool are_stms_equivalent(const group_stm& source, const group_stm& target) {
}

if (source.fences() != target.fences()) {
vlog(mlog.debug, "group fences does not match");
vlog(mlog.info, "group fences does not match");
return false;
}

if (source.prepared_txs().size() != target.prepared_txs().size()) {
vlog(
mlog.info,
"Prepared txs hash_maps have different size. Source: {}, target: {}",
source.prepared_txs().size(),
target.prepared_txs().size());
return false;
}

for (auto& [tp, tx] : source.prepared_txs()) {
auto it = target.prepared_txs().find(tp);
if (it == target.prepared_txs().end()) {
vlog(
mlog.debug,
mlog.info,
"unable to find preppared txs for {} in target group state",
tp);
return false;
}
if (it->second.tx_seq != tx.tx_seq || tx.pid != it->second.pid) {
vlog(
mlog.info,
"Inconsistent states for prepared transaction. Source:(pid: {}, "
"seq_num: {}), target(pid: {}, seq_num: {})",
tx.pid,
tx.tx_seq,
it->second.pid,
it->second.tx_seq);
return false;
}
if (tx.offsets.size() != it->second.offsets.size()) {
vlog(
mlog.info,
"Transaction has different offsets size. Source: {}, target: {}",
tx.offsets.size(),
it->second.offsets.size());
return false;
}

for (auto& [offset_tp, o] : tx.offsets) {
auto oit = it->second.offsets.find(offset_tp);
if (oit == it->second.offsets.end()) {
vlog(
mlog.info,
"Can not find offset for topic ({}) in target prepared "
"transaction",
offset_tp);
return false;
}
if (!are_offsets_equal(oit->second, o)) {
vlog(
mlog.info,
"Offsets are not equal for prepared transaction. Source: {}, "
"target: {}",
o,
oit->second);
return false;
}
}
Expand All @@ -264,16 +299,22 @@ bool are_states_equivalent(
const group_recovery_consumer_state& source,
const group_recovery_consumer_state& target) {
if (source.groups.size() != target.groups.size()) {
vlog(
mlog.info,
"Groups have a different size. Source: {}, Target: {}",
source.groups.size(),
target.groups.size());
return false;
}

for (auto& [g, stm] : source.groups) {
auto it = target.groups.find(g);
if (it == target.groups.end()) {
vlog(mlog.debug, "group {} not found in target group state", g);
vlog(mlog.info, "group {} not found in target group state", g);
return false;
}
if (!are_stms_equivalent(stm, it->second)) {
vlog(mlog.info, "State machines are not equivalent");
return false;
}
}
Expand Down Expand Up @@ -504,7 +545,7 @@ ss::future<> wait_for_stable_group_topic(
result.error());
} else {
vlog(
mlog.debug,
mlog.info,
"waiting for partition operations to finish, current state: {}",
result.value());
}
Expand Down

0 comments on commit d5757a9

Please sign in to comment.