Skip to content

Commit

Permalink
Merge pull request #5719 from mmaslankaprv/fix-4623
Browse files Browse the repository at this point in the history
r/heartbeat_manager: validate received heartbeat response group
  • Loading branch information
jcsp committed Aug 2, 2022
2 parents 157f418 + 21e8d13 commit a2ea18e
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions src/v/raft/heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ static heartbeat_requests requests_for_range(
if (rni == ptr->self()) {
auto hb_metadata = ptr->meta();
pending_beats[rni.id()].emplace_back(
heartbeat_metadata{hb_metadata, rni},
heartbeat_metadata{
.meta = hb_metadata, .node_id = rni, .target_node_id = rni},
heartbeat_manager::follower_request_meta(
ptr, follower_req_seq(0), hb_metadata.prev_log_index, rni));
return;
Expand Down Expand Up @@ -284,16 +285,17 @@ void heartbeat_manager::process_reply(
vlog(hbeatlog.error, "cannot find consensus group:{}", g);
continue;
}
auto consensus = *it;

(*it)->update_heartbeat_status(req_meta.follower_vnode, false);
consensus->update_heartbeat_status(req_meta.follower_vnode, false);

// propagate error
(*it)->process_append_entries_reply(
consensus->process_append_entries_reply(
n,
result<append_entries_reply>(r.error()),
req_meta.seq,
req_meta.dirty_offset);
(*it)->get_probe().heartbeat_request_error();
consensus->get_probe().heartbeat_request_error();
}
return;
}
Expand All @@ -306,15 +308,40 @@ void heartbeat_manager::process_reply(
m.group);
continue;
}
auto consensus = *it;
vlog(hbeatlog.trace, "Heartbeat reply from node: {} - {}", n, m);
auto meta = std::move(groups.find(m.group)->second);
(*it)->update_heartbeat_status(meta.follower_vnode, true);

(*it)->process_append_entries_reply(
if (unlikely(m.target_node_id != consensus->self())) {
vlog(
hbeatlog.warn,
"Heartbeat response addressed to different node: {}, current "
"node: {}, response: {}",
m.target_node_id,
consensus->self(),
m);
continue;
}

auto meta_it = groups.find(m.group);
if (unlikely(meta_it == groups.end())) {
vlog(
hbeatlog.warn,
"Unexpected heartbeat reply for group {} from node {}, skipping: "
"{}",
m.group,
n,
m);
continue;
}

consensus->update_heartbeat_status(
meta_it->second.follower_vnode, true);

consensus->process_append_entries_reply(
n,
result<append_entries_reply>(std::move(m)),
meta.seq,
meta.dirty_offset);
result<append_entries_reply>(m),
meta_it->second.seq,
meta_it->second.dirty_offset);
}
}

Expand Down

0 comments on commit a2ea18e

Please sign in to comment.