Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check pending conf change before campaign #225

Merged
merged 4 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ fn send_propose(sender: mpsc::Sender<Msg>) {
cb: Box::new(move || {
s1.send(0).unwrap();
}),
}).unwrap();
})
.unwrap();

let n = r1.recv().unwrap();
assert_eq!(n, 0);
Expand Down
86 changes: 52 additions & 34 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,7 @@ impl<T: Storage> Raft<T> {
self.bcast_heartbeat_with_ctx(ctx)
}

#[cfg_attr(
feature = "cargo-clippy",
allow(clippy::needless_pass_by_value)
)]
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
fn bcast_heartbeat_with_ctx(&mut self, ctx: Option<Vec<u8>>) {
let self_id = self.id;
let mut prs = self.take_prs();
Expand Down Expand Up @@ -1002,35 +999,7 @@ impl<T: Storage> Raft<T> {
fail_point!("before_step");

match m.get_msg_type() {
MessageType::MsgHup => if self.state != StateRole::Leader {
let ents = self
.raft_log
.slice(
self.raft_log.applied + 1,
self.raft_log.committed + 1,
raft_log::NO_LIMIT,
).expect("unexpected error getting unapplied entries");
let n = self.num_pending_conf(&ents);
if n != 0 && self.raft_log.committed > self.raft_log.applied {
warn!(
"{} cannot campaign at term {} since there are still {} pending \
configuration changes to apply",
self.tag, self.term, n
);
return Ok(());
}
info!(
"{} is starting a new election at term {}",
self.tag, self.term
);
if self.pre_vote {
self.campaign(CAMPAIGN_PRE_ELECTION);
} else {
self.campaign(CAMPAIGN_ELECTION);
}
} else {
debug!("{} ignoring MsgHup because already leader", self.tag);
},
MessageType::MsgHup => self.hup(false),
MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => {
// We can vote if this is a repeat of a vote we've already cast...
let can_vote = (self.vote == m.get_from()) ||
Expand Down Expand Up @@ -1079,6 +1048,55 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Starts campaigning in response to `MsgHup`, or to a leadership transfer
/// when `transfer_leader` is `true`.
///
/// The request is dropped (with a log line) when this node is already the
/// leader, or when the entries scanned below still contain configuration
/// changes. Otherwise the election is logged and `campaign` is invoked with
/// the campaign type matching `transfer_leader` / `self.pre_vote`.
///
/// # Panics
///
/// Panics if the entries in the scanned range cannot be read from the log.
fn hup(&mut self, transfer_leader: bool) {
    if self.state == StateRole::Leader {
        debug!("{} ignoring MsgHup because already leader", self.tag);
        return;
    }

    // When a snapshot is pending, `maybe_first_index` reports its index.
    // The snapshot has already updated the configuration, so campaigning is
    // safe as long as the entries after it carry no conf change. Without a
    // pending snapshot the scan starts right after the applied index.
    let low = self
        .raft_log
        .unstable
        .maybe_first_index()
        .unwrap_or_else(|| self.raft_log.applied + 1);
    // Exclusive upper bound of the scan: everything up to `committed`.
    let high = self.raft_log.committed + 1;

    let unapplied = match self.raft_log.slice(low, high, raft_log::NO_LIMIT) {
        Ok(entries) => entries,
        Err(err) => panic!(
            "{} unexpected error getting unapplied entries [{}, {}): {:?}",
            self.tag, low, high, err
        ),
    };

    let pending = self.num_pending_conf(&unapplied);
    if pending != 0 {
        warn!(
            "{} cannot campaign at term {} since there are still {} pending \
             configuration changes to apply",
            self.tag, self.term, pending
        );
        return;
    }

    info!(
        "{} is starting a new election at term {}",
        self.tag, self.term
    );

    // A leadership transfer takes precedence over the pre-vote setting.
    match (transfer_leader, self.pre_vote) {
        (true, _) => self.campaign(CAMPAIGN_TRANSFER),
        (false, true) => self.campaign(CAMPAIGN_PRE_ELECTION),
        (false, false) => self.campaign(CAMPAIGN_ELECTION),
    }
}

fn log_vote_approve(&self, m: &Message) {
info!(
"{} [logterm: {}, index: {}, vote: {}] cast {:?} for {} [logterm: {}, index: {}] \
Expand Down Expand Up @@ -1632,7 +1650,7 @@ impl<T: Storage> Raft<T> {
// Leadership transfers never use pre-vote even if self.pre_vote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
self.campaign(CAMPAIGN_TRANSFER);
self.hup(true);
} else {
info!(
"{} received MsgTimeoutNow from {} but is not promotable",
Expand Down
8 changes: 3 additions & 5 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ impl Ready {
(match since_idx {
None => raft.raft_log.next_entries(),
Some(idx) => raft.raft_log.next_entries_since(idx),
}).unwrap_or_else(Vec::new),
})
.unwrap_or_else(Vec::new),
);
let ss = raft.soft_state();
if &ss != prev_ss {
Expand Down Expand Up @@ -286,10 +287,7 @@ impl<T: Storage> RawNode<T> {
}

/// ProposeConfChange proposes a config change.
#[cfg_attr(
feature = "cargo-clippy",
allow(clippy::needless_pass_by_value)
)]
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: ConfChange) -> Result<()> {
let data = protobuf::Message::write_to_bytes(&cc)?;
let mut m = Message::new();
Expand Down
3 changes: 2 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ pub fn limit_size<T: Message + Clone>(entries: &mut Vec<T>, max: u64) {
size += u64::from(Message::compute_size(e));
size <= max
}
}).count();
})
.count();

entries.truncate(limit);
}
62 changes: 62 additions & 0 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4049,3 +4049,65 @@ fn test_prevote_with_check_quorum() {
assert_eq!(network.peers[&2].state, StateRole::Leader, "peer 2 state",);
assert_eq!(network.peers[&3].state, StateRole::Follower, "peer 3 state",);
}

/// Tests if unapplied conf change is checked before campaign.
#[test]
fn test_conf_change_check_before_campaign() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't call `self.hup(true)`, does the test fail?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The case was written before the fix, and it failed.

setup_for_test();
let mut nt = Network::new(vec![None, None, None]);
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
assert_eq!(nt.peers[&1].state, StateRole::Leader);

let mut m = new_message(1, 1, MessageType::MsgPropose, 0);
let mut e = Entry::new();
e.set_entry_type(EntryType::EntryConfChange);
let mut cc = ConfChange::new();
cc.set_change_type(ConfChangeType::RemoveNode);
cc.set_node_id(3);
e.set_data(protobuf::Message::write_to_bytes(&cc).unwrap());
m.mut_entries().push(e);
nt.send(vec![m]);

// trigger campaign in node 2
nt.peers
.get_mut(&2)
.unwrap()
.reset_randomized_election_timeout();
let timeout = nt.peers[&2].get_randomized_election_timeout();
for _ in 0..timeout {
nt.peers.get_mut(&2).unwrap().tick();
}
// It's still follower because committed conf change is not applied.
assert_eq!(nt.peers[&2].state, StateRole::Follower);

// Transfer leadership to peer 2.
nt.send(vec![new_message(2, 1, MessageType::MsgTransferLeader, 0)]);
assert_eq!(nt.peers[&1].state, StateRole::Leader);
// It's still follower because committed conf change is not applied.
assert_eq!(nt.peers[&2].state, StateRole::Follower);
// Abort transfer leader.
nt.peers.get_mut(&1).unwrap().abort_leader_transfer();

let committed = nt.peers[&2].raft_log.committed;
nt.peers.get_mut(&2).unwrap().raft_log.applied_to(committed);
nt.peers.get_mut(&2).unwrap().remove_node(3);

// transfer leadership to peer 2 again.
nt.send(vec![new_message(2, 1, MessageType::MsgTransferLeader, 0)]);
assert_eq!(nt.peers[&1].state, StateRole::Follower);
assert_eq!(nt.peers[&2].state, StateRole::Leader);

nt.peers.get_mut(&1).unwrap().raft_log.applied_to(committed);
nt.peers.get_mut(&1).unwrap().remove_node(3);

// trigger campaign in node 1
nt.peers
.get_mut(&1)
.unwrap()
.reset_randomized_election_timeout();
let timeout = nt.peers[&1].get_randomized_election_timeout();
for _ in 0..timeout {
nt.peers.get_mut(&1).unwrap().tick();
}
assert_eq!(nt.peers[&1].state, StateRole::Candidate);
}
6 changes: 4 additions & 2 deletions tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ fn new_raw_node(
&new_test_config(id, peers, election, heartbeat),
storage,
peer_nodes,
).unwrap()
)
.unwrap()
}

// test_raw_node_step ensures that RawNode.Step ignore local message.
Expand All @@ -101,7 +102,8 @@ fn test_raw_node_step() {
MessageType::MsgHup,
MessageType::MsgUnreachable,
MessageType::MsgSnapStatus,
].contains(msg_t)
]
.contains(msg_t)
{
assert_eq!(res, Err(Error::StepLocalMsg));
}
Expand Down
6 changes: 4 additions & 2 deletions tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,12 @@ impl Network {
.get(&Connem {
from: m.get_from(),
to: m.get_to(),
}).cloned()
})
.cloned()
.unwrap_or(0f64);
rand::random::<f64>() >= perc
}).collect()
})
.collect()
}

pub fn send(&mut self, msgs: Vec<Message>) {
Expand Down