diff --git a/examples/single_mem_node/main.rs b/examples/single_mem_node/main.rs index 2b62a7286..cae1ec80f 100644 --- a/examples/single_mem_node/main.rs +++ b/examples/single_mem_node/main.rs @@ -196,7 +196,8 @@ fn send_propose(sender: mpsc::Sender) { cb: Box::new(move || { s1.send(0).unwrap(); }), - }).unwrap(); + }) + .unwrap(); let n = r1.recv().unwrap(); assert_eq!(n, 0); diff --git a/src/raft.rs b/src/raft.rs index 6413455b9..307c058a6 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -572,10 +572,7 @@ impl Raft { self.bcast_heartbeat_with_ctx(ctx) } - #[cfg_attr( - feature = "cargo-clippy", - allow(clippy::needless_pass_by_value) - )] + #[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))] fn bcast_heartbeat_with_ctx(&mut self, ctx: Option>) { let self_id = self.id; let mut prs = self.take_prs(); @@ -1002,35 +999,7 @@ impl Raft { fail_point!("before_step"); match m.get_msg_type() { - MessageType::MsgHup => if self.state != StateRole::Leader { - let ents = self - .raft_log - .slice( - self.raft_log.applied + 1, - self.raft_log.committed + 1, - raft_log::NO_LIMIT, - ).expect("unexpected error getting unapplied entries"); - let n = self.num_pending_conf(&ents); - if n != 0 && self.raft_log.committed > self.raft_log.applied { - warn!( - "{} cannot campaign at term {} since there are still {} pending \ - configuration changes to apply", - self.tag, self.term, n - ); - return Ok(()); - } - info!( - "{} is starting a new election at term {}", - self.tag, self.term - ); - if self.pre_vote { - self.campaign(CAMPAIGN_PRE_ELECTION); - } else { - self.campaign(CAMPAIGN_ELECTION); - } - } else { - debug!("{} ignoring MsgHup because already leader", self.tag); - }, + MessageType::MsgHup => self.hup(false), MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => { // We can vote if this is a repeat of a vote we've already cast... let can_vote = (self.vote == m.get_from()) || @@ -1079,6 +1048,55 @@ impl Raft { Ok(()) } + fn hup(&mut self, transfer_leader: bool) { + if self.state == StateRole::Leader { + debug!("{} ignoring MsgHup because already leader", self.tag); + return; + } + + // If there is a pending snapshot, its index will be returned by + // `maybe_first_index`. Note that snapshot updates configuration + // already, so as long as pending entries don't contain conf change + // it's safe to start campaign. + let first_index = match self.raft_log.unstable.maybe_first_index() { + Some(idx) => idx, + None => self.raft_log.applied + 1, + }; + + let ents = self + .raft_log + .slice(first_index, self.raft_log.committed + 1, raft_log::NO_LIMIT) + .unwrap_or_else(|e| { + panic!( + "{} unexpected error getting unapplied entries [{}, {}): {:?}", + self.tag, + first_index, + self.raft_log.committed + 1, + e + ); + }); + let n = self.num_pending_conf(&ents); + if n != 0 { + warn!( + "{} cannot campaign at term {} since there are still {} pending \ + configuration changes to apply", + self.tag, self.term, n + ); + return; + } + info!( + "{} is starting a new election at term {}", + self.tag, self.term + ); + if transfer_leader { + self.campaign(CAMPAIGN_TRANSFER); + } else if self.pre_vote { + self.campaign(CAMPAIGN_PRE_ELECTION); + } else { + self.campaign(CAMPAIGN_ELECTION); + } + } + fn log_vote_approve(&self, m: &Message) { info!( "{} [logterm: {}, index: {}, vote: {}] cast {:?} for {} [logterm: {}, index: {}] \ @@ -1632,7 +1650,7 @@ impl Raft { // Leadership transfers never use pre-vote even if self.pre_vote is true; we // know we are not recovering from a partition so there is no need for the // extra round trip. - self.campaign(CAMPAIGN_TRANSFER); + self.hup(true); } else { info!( "{} received MsgTimeoutNow from {} but is not promotable", diff --git a/src/raw_node.rs b/src/raw_node.rs index 43524a430..09f4fb625 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -153,7 +153,8 @@ impl Ready { (match since_idx { None => raft.raft_log.next_entries(), Some(idx) => raft.raft_log.next_entries_since(idx), - }).unwrap_or_else(Vec::new), + }) + .unwrap_or_else(Vec::new), ); let ss = raft.soft_state(); if &ss != prev_ss { @@ -286,10 +287,7 @@ impl RawNode { } /// ProposeConfChange proposes a config change. - #[cfg_attr( - feature = "cargo-clippy", - allow(clippy::needless_pass_by_value) - )] + #[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))] pub fn propose_conf_change(&mut self, context: Vec, cc: ConfChange) -> Result<()> { let data = protobuf::Message::write_to_bytes(&cc)?; let mut m = Message::new(); diff --git a/src/util.rs b/src/util.rs index e5f1491ea..f74a76390 100644 --- a/src/util.rs +++ b/src/util.rs @@ -64,7 +64,8 @@ pub fn limit_size(entries: &mut Vec, max: u64) { size += u64::from(Message::compute_size(e)); size <= max } - }).count(); + }) + .count(); entries.truncate(limit); } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index 1365eeade..480347c72 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -4049,3 +4049,65 @@ fn test_prevote_with_check_quorum() { assert_eq!(network.peers[&2].state, StateRole::Leader, "peer 2 state",); assert_eq!(network.peers[&3].state, StateRole::Follower, "peer 3 state",); } + +/// Tests if unapplied conf change is checked before campaign. +#[test] +fn test_conf_change_check_before_campaign() { + setup_for_test(); + let mut nt = Network::new(vec![None, None, None]); + nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + assert_eq!(nt.peers[&1].state, StateRole::Leader); + + let mut m = new_message(1, 1, MessageType::MsgPropose, 0); + let mut e = Entry::new(); + e.set_entry_type(EntryType::EntryConfChange); + let mut cc = ConfChange::new(); + cc.set_change_type(ConfChangeType::RemoveNode); + cc.set_node_id(3); + e.set_data(protobuf::Message::write_to_bytes(&cc).unwrap()); + m.mut_entries().push(e); + nt.send(vec![m]); + + // trigger campaign in node 2 + nt.peers + .get_mut(&2) + .unwrap() + .reset_randomized_election_timeout(); + let timeout = nt.peers[&2].get_randomized_election_timeout(); + for _ in 0..timeout { + nt.peers.get_mut(&2).unwrap().tick(); + } + // It's still follower because committed conf change is not applied. + assert_eq!(nt.peers[&2].state, StateRole::Follower); + + // Transfer leadership to peer 2. + nt.send(vec![new_message(2, 1, MessageType::MsgTransferLeader, 0)]); + assert_eq!(nt.peers[&1].state, StateRole::Leader); + // It's still follower because committed conf change is not applied. + assert_eq!(nt.peers[&2].state, StateRole::Follower); + // Abort transfer leader. + nt.peers.get_mut(&1).unwrap().abort_leader_transfer(); + + let committed = nt.peers[&2].raft_log.committed; + nt.peers.get_mut(&2).unwrap().raft_log.applied_to(committed); + nt.peers.get_mut(&2).unwrap().remove_node(3); + + // transfer leadership to peer 2 again. + nt.send(vec![new_message(2, 1, MessageType::MsgTransferLeader, 0)]); + assert_eq!(nt.peers[&1].state, StateRole::Follower); + assert_eq!(nt.peers[&2].state, StateRole::Leader); + + nt.peers.get_mut(&1).unwrap().raft_log.applied_to(committed); + nt.peers.get_mut(&1).unwrap().remove_node(3); + + // trigger campaign in node 1 + nt.peers + .get_mut(&1) + .unwrap() + .reset_randomized_election_timeout(); + let timeout = nt.peers[&1].get_randomized_election_timeout(); + for _ in 0..timeout { + nt.peers.get_mut(&1).unwrap().tick(); + } + assert_eq!(nt.peers[&1].state, StateRole::Candidate); +} diff --git a/tests/integration_cases/test_raw_node.rs b/tests/integration_cases/test_raw_node.rs index f74b98507..ee620c36f 100644 --- a/tests/integration_cases/test_raw_node.rs +++ b/tests/integration_cases/test_raw_node.rs @@ -85,7 +85,8 @@ fn new_raw_node( &new_test_config(id, peers, election, heartbeat), storage, peer_nodes, - ).unwrap() + ) + .unwrap() } // test_raw_node_step ensures that RawNode.Step ignore local message. @@ -101,7 +102,8 @@ fn test_raw_node_step() { MessageType::MsgHup, MessageType::MsgUnreachable, MessageType::MsgSnapStatus, - ].contains(msg_t) + ] + .contains(msg_t) { assert_eq!(res, Err(Error::StepLocalMsg)); } diff --git a/tests/test_util/mod.rs b/tests/test_util/mod.rs index d9b433f74..2c1564d47 100644 --- a/tests/test_util/mod.rs +++ b/tests/test_util/mod.rs @@ -299,10 +299,12 @@ impl Network { .get(&Connem { from: m.get_from(), to: m.get_to(), - }).cloned() + }) + .cloned() .unwrap_or(0f64); rand::random::() >= perc - }).collect() + }) + .collect() } pub fn send(&mut self, msgs: Vec) {