diff --git a/src/v/raft/heartbeat_manager.cc b/src/v/raft/heartbeat_manager.cc index 266bab861278..d398c416fe8e 100644 --- a/src/v/raft/heartbeat_manager.cc +++ b/src/v/raft/heartbeat_manager.cc @@ -102,7 +102,8 @@ static heartbeat_requests requests_for_range( if (rni == ptr->self()) { auto hb_metadata = ptr->meta(); pending_beats[rni.id()].emplace_back( - heartbeat_metadata{hb_metadata, rni}, + heartbeat_metadata{ + .meta = hb_metadata, .node_id = rni, .target_node_id = rni}, heartbeat_manager::follower_request_meta( ptr, follower_req_seq(0), hb_metadata.prev_log_index, rni)); return; @@ -284,16 +285,17 @@ void heartbeat_manager::process_reply( vlog(hbeatlog.error, "cannot find consensus group:{}", g); continue; } + auto consensus = *it; - (*it)->update_heartbeat_status(req_meta.follower_vnode, false); + consensus->update_heartbeat_status(req_meta.follower_vnode, false); // propagate error - (*it)->process_append_entries_reply( + consensus->process_append_entries_reply( n, result(r.error()), req_meta.seq, req_meta.dirty_offset); - (*it)->get_probe().heartbeat_request_error(); + consensus->get_probe().heartbeat_request_error(); } return; } @@ -306,15 +308,40 @@ void heartbeat_manager::process_reply( m.group); continue; } + auto consensus = *it; vlog(hbeatlog.trace, "Heartbeat reply from node: {} - {}", n, m); - auto meta = std::move(groups.find(m.group)->second); - (*it)->update_heartbeat_status(meta.follower_vnode, true); - (*it)->process_append_entries_reply( + if (unlikely(m.target_node_id != consensus->self())) { + vlog( + hbeatlog.warn, + "Heartbeat response addressed to different node: {}, current " + "node: {}, response: {}", + m.target_node_id, + consensus->self(), + m); + continue; + } + + auto meta_it = groups.find(m.group); + if (unlikely(meta_it == groups.end())) { + vlog( + hbeatlog.warn, + "Unexpected heartbeat reply for group {} from node {}, skipping: " + "{}", + m.group, + n, + m); + continue; + } + + consensus->update_heartbeat_status( + meta_it->second.follower_vnode, true); + + consensus->process_append_entries_reply( n, - result(std::move(m)), - meta.seq, - meta.dirty_offset); + result(m), + meta_it->second.seq, + meta_it->second.dirty_offset); } }