Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay committed Feb 27, 2020
1 parent fe10576 commit 1ecd4b8
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 88 deletions.
155 changes: 91 additions & 64 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ pub fn quorum(total: usize) -> usize {
total / 2 + 1
}

#[derive(Default)]
pub struct HandleResponseContext {
maybe_commit: bool,
has_reply: bool,
send_append: bool,
loop_append: bool,
transfer_leader: bool,
old_paused: bool,
more_to_send: Vec<Message>,
}

impl<T: Storage> Raft<T> {
/// Creates a new raft for use on the node.
pub fn new(c: &Config, store: T) -> Raft<T> {
Expand Down Expand Up @@ -521,31 +532,45 @@ impl<T: Storage> Raft<T> {
}
}

/// Sends RPC, with entries to the given peer.
/// Sends an append RPC with new entries (if any) and the
/// current commit index to the given peer.
pub fn send_append(&mut self, to: u64, pr: &mut Progress) {
self.maybe_send_append(to, pr, true);
}

/// Sends an append RPC with new entries to the given peer,
/// if necessary. Returns true if a message was sent. The allow_empty
/// argument controls whether messages with no entries will be sent
/// ("empty" messages are useful to convey updated Commit indexes, but
/// are undesirable when we're sending multiple messages in a batch).
fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool {
if pr.is_paused() {
return;
return false;
}
let mut m = Message::new();
m.set_to(to);
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
return false;
}
} else {
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) {
return false;
}
let term = self.raft_log.term(pr.next_idx - 1);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
return false;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
}
}
self.send(m);
true
}

// send_heartbeat sends an empty MsgAppend
Expand Down Expand Up @@ -1155,9 +1180,7 @@ impl<T: Storage> Raft<T> {
&mut self,
m: &Message,
prs: &mut ProgressSet,
old_paused: &mut bool,
send_append: &mut bool,
maybe_commit: &mut bool,
ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
Expand All @@ -1181,56 +1204,63 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
*send_append = true;
ctx.send_append = true;
ctx.has_reply = true;
}
return;
}

*old_paused = pr.is_paused();
ctx.old_paused = pr.is_paused();
if !pr.maybe_update(m.get_index()) {
return;
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if pr.maybe_snapshot_abort() {
debug!(
"{} snapshot aborted, resumed sending replication messages to {} \
[{:?}]",
self.tag,
m.get_from(),
pr
);
pr.become_probe();
}
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
ctx.maybe_commit = true;
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
ctx.loop_append = true;
ctx.has_reply = true;

// Transfer leadership is in progress.
if let Some(lead_transferee) = self.lead_transferee {
if Some(m.get_from()) == self.lead_transferee {
let last_index = self.raft_log.last_index();
if m.get_from() == lead_transferee && pr.matched == last_index {
if pr.matched == last_index {
info!(
"{} sent MsgTimeoutNow to {} after received MsgAppResp",
self.tag,
m.get_from()
);
self.send_timeout_now(m.get_from());
}
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if !pr.maybe_snapshot_abort() {
return;
}
debug!(
"{} snapshot aborted, resumed sending replication messages to {} \
[{:?}]",
self.tag,
m.get_from(),
pr
);
pr.become_probe();
ctx.transfer_leader = true;
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
*maybe_commit = true;
}

fn handle_heartbeat_response(
&mut self,
m: &Message,
prs: &mut ProgressSet,
quorum: usize,
send_append: &mut bool,
more_to_send: &mut Option<Message>,
ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
Expand All @@ -1242,7 +1272,8 @@ impl<T: Storage> Raft<T> {
}
// Does it request snapshot?
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
*send_append = true;
ctx.send_append = true;
ctx.has_reply = true;
}

if self.read_only.option != ReadOnlyOption::Safe || m.get_context().is_empty() {
Expand All @@ -1269,7 +1300,7 @@ impl<T: Storage> Raft<T> {
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.set_index(rs.index);
to_send.set_entries(req.take_entries());
*more_to_send = Some(to_send);
ctx.more_to_send.push(to_send);
}
}
}
Expand Down Expand Up @@ -1353,14 +1384,7 @@ impl<T: Storage> Raft<T> {
}

/// Check message's progress to decide which action should be taken.
fn check_message_with_progress(
&mut self,
m: &mut Message,
send_append: &mut bool,
old_paused: &mut bool,
maybe_commit: &mut bool,
more_to_send: &mut Option<Message>,
) {
fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) {
if self.prs().get(m.get_from()).is_none() {
debug!("{} no progress available for {}", self.tag, m.get_from());
return;
Expand All @@ -1369,11 +1393,11 @@ impl<T: Storage> Raft<T> {
let mut prs = self.take_prs();
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit);
self.handle_append_response(m, &mut prs, ctx);
}
MessageType::MsgHeartbeatResponse => {
let quorum = quorum(prs.voters().len());
self.handle_heartbeat_response(m, &mut prs, quorum, send_append, more_to_send);
self.handle_heartbeat_response(m, &mut prs, quorum, ctx);
}
MessageType::MsgSnapStatus => {
let pr = prs.get_mut(m.get_from()).unwrap();
Expand Down Expand Up @@ -1526,37 +1550,40 @@ impl<T: Storage> Raft<T> {
_ => {}
}

let mut send_append = false;
let mut maybe_commit = false;
let mut old_paused = false;
let mut more_to_send = None;
self.check_message_with_progress(
&mut m,
&mut send_append,
&mut old_paused,
&mut maybe_commit,
&mut more_to_send,
);
if maybe_commit {
let mut ctx = HandleResponseContext::default();
self.check_message_with_progress(&mut m, &mut ctx);
if ctx.maybe_commit {
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append();
}
} else if old_paused {
} else if ctx.old_paused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
send_append = true;
ctx.send_append = true;
ctx.has_reply = true;
}
}

if send_append {
if ctx.has_reply {
let from = m.get_from();
let mut prs = self.take_prs();
self.send_append(from, prs.get_mut(from).unwrap());
let pr = prs.get_mut(from).unwrap();
if ctx.send_append {
self.send_append(from, pr);
}
if ctx.loop_append {
while self.maybe_send_append(from, pr, false) {}
}
self.set_prs(prs);
}
if let Some(to_send) = more_to_send {
self.send(to_send)
if ctx.transfer_leader {
self.send_timeout_now(m.get_from());
}
if !ctx.more_to_send.is_empty() {
for m in ctx.more_to_send.drain(..) {
self.send(m);
}
}

Ok(())
Expand Down
Loading

0 comments on commit 1ecd4b8

Please sign in to comment.