*: port coreos/etcd#9985 #354

Merged: 1 commit, Mar 27, 2020
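In short, as reflected in the diff below: previously the leader sent at most one MsgAppend per MsgAppendResponse. With this change, once an ack frees flow-control slots, the leader keeps sending size-limited appends until the inflight window fills or the log is drained. A paraphrase of the new post-response logic in step_leader (not verbatim merged code):

    if ctx.send_append {
        self.send_append(from, pr); // may be empty, to carry the commit index
    }
    if ctx.loop_append {
        // Drain while flow control permits; empty appends are suppressed.
        while self.maybe_send_append(from, pr, false) {}
    }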
69 changes: 69 additions & 0 deletions harness/tests/integration_cases/test_raft.rs
@@ -347,6 +347,75 @@ fn test_progress_paused() {
assert_eq!(ms.len(), 1);
}

#[test]
fn test_progress_flow_control() {
let l = default_logger();
let mut cfg = new_test_config(1, 5, 1);
cfg.max_inflight_msgs = 3;
cfg.max_size_per_msg = 2048;
let s = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
let mut r = new_test_raft_with_config(&cfg, s, &l);
r.become_candidate();
r.become_leader();

// Throw away all the messages relating to the initial election.
r.read_messages();

// While node 2 is in probe state, propose a bunch of entries.
r.mut_prs().get_mut(2).unwrap().become_probe();
let data: String = std::iter::repeat('a').take(1000).collect();
for _ in 0..10 {
let msg = new_message_with_entries(
1,
1,
MessageType::MsgPropose,
vec![new_entry(0, 0, Some(&data))],
);
r.step(msg).unwrap();
}

let mut ms = r.read_messages();
// First append has two entries: the empty entry to confirm the
// election, and the first proposal (only one proposal gets sent
// because we're in probe state).
assert_eq!(ms.len(), 1);
assert_eq!(ms[0].msg_type, MessageType::MsgAppend);
assert_eq!(ms[0].entries.len(), 2);
assert_eq!(ms[0].entries[0].data.len(), 0);
assert_eq!(ms[0].entries[1].data.len(), 1000);

// When this append is acked, we change to replicate state and can
// send multiple messages at once.
let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0);
msg.index = ms[0].entries[1].index;
r.step(msg).unwrap();
ms = r.read_messages();
assert_eq!(ms.len(), 3);
for (i, m) in ms.iter().enumerate() {
if m.msg_type != MessageType::MsgAppend {
panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type);
}
if m.entries.len() != 2 {
panic!("{}: expected 2 entries, got {}", i, m.entries.len());
}
}

// Ack all three of those messages together and get the last two
// messages (containing three entries).
let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0);
msg.index = ms[2].entries[1].index;
r.step(msg).unwrap();
ms = r.read_messages();
assert_eq!(ms.len(), 2);
for (i, m) in ms.iter().enumerate() {
if m.msg_type != MessageType::MsgAppend {
panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type);
}
}
assert_eq!(ms[0].entries.len(), 2);
assert_eq!(ms[1].entries.len(), 1);
}

#[test]
fn test_leader_election() {
let l = default_logger();
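A quick check of the arithmetic behind the assertions above (standalone sketch with illustrative names; entry size is approximated by payload length, ignoring per-entry metadata):

    // With max_size_per_msg = 2048 and 1000-byte payloads, each MsgAppend
    // carries two entries: raft includes at least one entry, then stops
    // before the accumulated size would exceed the budget.
    fn entries_per_msg(max_size_per_msg: usize, entry_size: usize) -> usize {
        std::cmp::max(1, max_size_per_msg / entry_size)
    }

    fn main() {
        let per_msg = entries_per_msg(2048, 1000);
        assert_eq!(per_msg, 2);
        // 10 proposals plus the empty election entry make 11 entries. The
        // probe-state message takes 2 of them; the first ack then releases a
        // full window of max_inflight_msgs = 3 messages (6 entries), and the
        // final ack moves the remaining 3 entries as a 2-entry plus a
        // 1-entry append, exactly as the test asserts.
        let window = 3;
        assert_eq!(window * per_msg, 6);
    }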
173 changes: 97 additions & 76 deletions src/raft.rs
@@ -72,6 +72,16 @@ pub struct SoftState {
pub raft_state: StateRole,
}

#[derive(Default)]
pub struct HandleResponseContext {
maybe_commit: bool,
send_append: bool,
loop_append: bool,
transfer_leader: bool,
old_paused: bool,
more_to_send: Vec<Message>,
}
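For orientation (annotations inferred from how step_leader consumes the context later in this diff):

    // maybe_commit:    an ack advanced pr.matched; try to advance the commit index.
    // send_append:     (re)send a single append, e.g. after a rejected append.
    // loop_append:     flow control freed slots; drain via maybe_send_append(.., false).
    // transfer_leader: the transfer target has caught up; send it MsgTimeoutNow.
    // old_paused:      the peer was paused before this ack; resume delayed sends.
    // more_to_send:    deferred read-index responses collected from heartbeats.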

/// A struct that represents the raft consensus itself. Stores details concerning the current
/// and possible state the system can take.
#[derive(Getters)]
@@ -558,41 +568,57 @@ impl<T: Storage> Raft<T> {
is_batched
}

-    /// Sends RPC, with entries to the given peer.
+    /// Sends an append RPC with new entries (if any) and the current commit index to the given
+    /// peer.
    pub fn send_append(&mut self, to: u64, pr: &mut Progress) {
+        self.maybe_send_append(to, pr, true);
+    }
+
+    /// Sends an append RPC with new entries to the given peer,
+    /// if necessary. Returns true if a message was sent. The allow_empty
+    /// argument controls whether messages with no entries will be sent
+    /// ("empty" messages are useful to convey updated Commit indexes, but
+    /// are undesirable when we're sending multiple messages in a batch).
+    fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool {
if pr.is_paused() {
trace!(
self.logger,
"Skipping sending to {to}, it's paused",
to = to;
"progress" => ?pr,
);
-            return;
+            return false;
}
let mut m = Message::default();
m.to = to;
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
-                return;
+                return false;
}
} else {
-            let term = self.raft_log.term(pr.next_idx - 1);
            let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
-            if term.is_err() || ents.is_err() {
-                // send snapshot if we failed to get term or entries.
-                if !self.prepare_send_snapshot(&mut m, pr, to) {
-                    return;
-                }
-            } else {
-                let mut ents = ents.unwrap();
-                if self.batch_append && self.try_batching(to, pr, &mut ents) {
-                    return;
-                }
-                self.prepare_send_entries(&mut m, pr, term.unwrap(), ents);
+            if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) {
+                return false;
+            }
+            let term = self.raft_log.term(pr.next_idx - 1);
+            match (term, ents) {
+                (Ok(term), Ok(mut ents)) => {
+                    if self.batch_append && self.try_batching(to, pr, &mut ents) {
+                        return true;
+                    }
+                    self.prepare_send_entries(&mut m, pr, term, ents)
+                }
+                _ => {
+                    // send snapshot if we failed to get term or entries.
+                    if !self.prepare_send_snapshot(&mut m, pr, to) {
+                        return false;
+                    }
+                }
            }
        }
        self.send(m);
+        true
    }

// send_heartbeat sends an empty MsgAppend
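The allow_empty asymmetry is the point of the two entry points: send_append (allow_empty = true) may emit an entry-less append purely to advance the peer's commit index, while the drain loop passes false so a burst never trails off with commit-only messages. An illustrative call pattern inside the leader:

    self.send_append(to, pr);                      // == maybe_send_append(to, pr, true)
    while self.maybe_send_append(to, pr, false) {} // keep going; skip empties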
@@ -1224,9 +1250,7 @@ impl<T: Storage> Raft<T> {
&mut self,
m: &Message,
prs: &mut ProgressSet,
-        old_paused: &mut bool,
-        send_append: &mut bool,
-        maybe_commit: &mut bool,
+        ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.from).unwrap();
pr.recent_active = true;
@@ -1250,54 +1274,59 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
-                *send_append = true;
+                ctx.send_append = true;
}
return;
}

-        *old_paused = pr.is_paused();
+        ctx.old_paused = pr.is_paused();
if !pr.maybe_update(m.index) {
return;
}

-        // Transfer leadership is in progress.
-        if let Some(lead_transferee) = self.lead_transferee {
-            let last_index = self.raft_log.last_index();
-            if m.from == lead_transferee && pr.matched == last_index {
-                info!(
-                    self.logger,
-                    "sent MsgTimeoutNow to {from} after received MsgAppResp",
-                    from = m.from;
-                );
-                self.send_timeout_now(m.from);
-            }
-        }
-
        match pr.state {
            ProgressState::Probe => pr.become_replicate(),
            ProgressState::Snapshot => {
-                if !pr.maybe_snapshot_abort() {
-                    return;
+                if pr.maybe_snapshot_abort() {
+                    debug!(
+                        self.logger,
+                        "snapshot aborted, resumed sending replication messages to {from}",
+                        from = m.from;
+                        "progress" => ?pr,
+                    );
+                    pr.become_probe();
                }
-                debug!(
-                    self.logger,
-                    "snapshot aborted, resumed sending replication messages to {from}",
-                    from = m.from;
-                    "progress" => ?pr,
-                );
-                pr.become_probe();
            }
-            ProgressState::Replicate => pr.ins.free_to(m.index),
+            ProgressState::Replicate => pr.ins.free_to(m.get_index()),
        }
-        *maybe_commit = true;
+        ctx.maybe_commit = true;
+        // We've updated flow control information above, which may
+        // allow us to send multiple (size-limited) in-flight messages
+        // at once (such as when transitioning from probe to
+        // replicate, or when freeTo() covers multiple messages). If
+        // we have more entries to send, send as many messages as we
+        // can (without sending empty messages for the commit index)
+        ctx.loop_append = true;
+
+        // Transfer leadership is in progress.
+        if Some(m.from) == self.lead_transferee {
+            let last_index = self.raft_log.last_index();
+            if pr.matched == last_index {
+                info!(
+                    self.logger,
+                    "sent MsgTimeoutNow to {from} after received MsgAppResp",
+                    from = m.from;
+                );
+                ctx.transfer_leader = true;
+            }
+        }
    }
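The Replicate arm above frees the inflight window, which is what can open several send slots at once. A toy model of that window (illustrative only; raft-rs has its own ring-buffer Inflights type):

    struct Inflights {
        cap: usize,             // max_inflight_msgs
        last_indexes: Vec<u64>, // last entry index of each unacked MsgAppend
    }

    impl Inflights {
        // A full window pauses the peer in Replicate state.
        fn full(&self) -> bool {
            self.last_indexes.len() >= self.cap
        }
        // Record an append when it is sent.
        fn add(&mut self, last_index: u64) {
            self.last_indexes.push(last_index);
        }
        // An ack up to `acked` releases every message it covers, possibly
        // several at once; that is the case loop_append exploits.
        fn free_to(&mut self, acked: u64) {
            self.last_indexes.retain(|&i| i > acked);
        }
    }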

fn handle_heartbeat_response(
&mut self,
m: &Message,
prs: &mut ProgressSet,
-        send_append: &mut bool,
-        more_to_send: &mut Vec<Message>,
+        ctx: &mut HandleResponseContext,
) {
        // Update the node. Drop the value explicitly since we'll check the quorum after.
{
@@ -1313,7 +1342,7 @@ impl<T: Storage> Raft<T> {
if pr.matched < self.raft_log.last_index()
|| pr.pending_request_snapshot != INVALID_INDEX
{
-            *send_append = true;
+            ctx.send_append = true;
}

if self.read_only.option != ReadOnlyOption::Safe || m.context.is_empty() {
Expand Down Expand Up @@ -1341,7 +1370,7 @@ impl<T: Storage> Raft<T> {
to_send.to = req.from;
to_send.index = rs.index;
to_send.set_entries(req.take_entries());
-                    more_to_send.push(to_send);
+                    ctx.more_to_send.push(to_send);
}
}
}
@@ -1356,9 +1385,8 @@ impl<T: Storage> Raft<T> {
return;
}
let lead_transferee = from;
-        let last_lead_transferee = self.lead_transferee;
-        if last_lead_transferee.is_some() {
-            if last_lead_transferee.unwrap() == lead_transferee {
+        if let Some(last_lead_transferee) = self.lead_transferee {
+            if last_lead_transferee == lead_transferee {
info!(
self.logger,
"[term {term}] transfer leadership to {lead_transferee} is in progress, ignores request \
@@ -1373,7 +1401,7 @@ impl<T: Storage> Raft<T> {
self.logger,
"[term {term}] abort previous transferring leadership to {last_lead_transferee}",
term = self.term,
-                last_lead_transferee = last_lead_transferee.unwrap();
+                last_lead_transferee = last_lead_transferee;
);
}
if lead_transferee == self.id {
@@ -1434,14 +1462,7 @@ impl<T: Storage> Raft<T> {
}

/// Check message's progress to decide which action should be taken.
-    fn check_message_with_progress(
-        &mut self,
-        m: &mut Message,
-        send_append: &mut bool,
-        old_paused: &mut bool,
-        maybe_commit: &mut bool,
-        more_to_send: &mut Vec<Message>,
-    ) {
+    fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) {
if self.prs().get(m.from).is_none() {
debug!(
self.logger,
@@ -1454,10 +1475,10 @@ impl<T: Storage> Raft<T> {
let mut prs = self.take_prs();
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
-                self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit);
+                self.handle_append_response(m, &mut prs, ctx);
}
MessageType::MsgHeartbeatResponse => {
-                self.handle_heartbeat_response(m, &mut prs, send_append, more_to_send);
+                self.handle_heartbeat_response(m, &mut prs, ctx);
}
MessageType::MsgSnapStatus => {
let pr = prs.get_mut(m.from).unwrap();
@@ -1610,38 +1631,38 @@ impl<T: Storage> Raft<T> {
_ => {}
}

-        let mut send_append = false;
-        let mut maybe_commit = false;
-        let mut old_paused = false;
-        let mut more_to_send = vec![];
-        self.check_message_with_progress(
-            &mut m,
-            &mut send_append,
-            &mut old_paused,
-            &mut maybe_commit,
-            &mut more_to_send,
-        );
-        if maybe_commit {
+        let mut ctx = HandleResponseContext::default();
+        self.check_message_with_progress(&mut m, &mut ctx);
+        if ctx.maybe_commit {
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append();
}
-            } else if old_paused {
+            } else if ctx.old_paused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
-                send_append = true;
+                ctx.send_append = true;
}
}

-        if send_append {
+        if ctx.send_append || ctx.loop_append {
let from = m.from;
let mut prs = self.take_prs();
-            self.send_append(from, prs.get_mut(from).unwrap());
+            let pr = prs.get_mut(from).unwrap();
+            if ctx.send_append {
+                self.send_append(from, pr);
+            }
+            if ctx.loop_append {
+                while self.maybe_send_append(from, pr, false) {}
+            }
self.set_prs(prs);
}
-        if !more_to_send.is_empty() {
-            for to_send in more_to_send.drain(..) {
-                self.send(to_send);
+        if ctx.transfer_leader {
+            self.send_timeout_now(m.get_from());
+        }
+        if !ctx.more_to_send.is_empty() {
+            for m in ctx.more_to_send.drain(..) {
+                self.send(m);
}
}
