Skip to content

Commit

Permalink
raft: Make flow control more aggressive
Browse files Browse the repository at this point in the history
We allow multiple in-flight append messages, but prior to this change
the only way we'd ever send them was if there was a steady stream of new
proposals. As a result, catching up a follower that was far behind was
unnecessarily slow (this is exacerbated by a quirk of CockroachDB's
use of raft which limits our ability to catch up via snapshot in some
cases).

See cockroachdb/cockroach#27983
  • Loading branch information
bdarnell committed Aug 7, 2018
1 parent 93be31d commit a5ea496
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 16 deletions.
40 changes: 28 additions & 12 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,30 +441,37 @@ func (r *raft) getProgress(id uint64) *Progress {
return r.learnerPrs[id]
}

// sendAppend sends RPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) {
// sendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) sendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.getProgress(to)
if pr.IsPaused() {
return
return false
}
m := pb.Message{}
m.To = to

term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if len(ents) == 0 && !sendIfEmpty {
return false
}

if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return
return false
}

m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return
return false
}
panic(err) // TODO(bdarnell)
}
Expand Down Expand Up @@ -498,6 +505,7 @@ func (r *raft) sendAppend(to uint64) {
}
}
r.send(m)
return true
}

// sendHeartbeat sends an empty MsgApp
Expand Down Expand Up @@ -537,7 +545,7 @@ func (r *raft) bcastAppend() {
return
}

r.sendAppend(id)
r.sendAppend(id, true)
})
}

Expand Down Expand Up @@ -1002,7 +1010,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
r.sendAppend(m.From, true)
}
} else {
oldPaused := pr.IsPaused()
Expand All @@ -1020,9 +1028,17 @@ func stepLeader(r *raft, m pb.Message) error {
if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
// If we were paused before, this node may be missing the
// latest commit index, so send it.
r.sendAppend(m.From, true)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r.sendAppend(m.From, false) {
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
Expand All @@ -1040,7 +1056,7 @@ func stepLeader(r *raft, m pb.Message) error {
pr.ins.freeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
r.sendAppend(m.From, true)
}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
Expand Down Expand Up @@ -1113,7 +1129,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
r.sendAppend(leadTransferee, true)
}
}
return nil
Expand Down
77 changes: 73 additions & 4 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"math/rand"
"reflect"
"strings"
"testing"

pb "github.com/coreos/etcd/raft/raftpb"
Expand Down Expand Up @@ -293,6 +294,74 @@ func TestProgressPaused(t *testing.T) {
}
}

// TestProgressFlowControl checks how many MsgApp messages a leader emits to a
// follower as that follower's acknowledgements arrive.
//
// The scenario uses cfg.MaxInflightMsgs = 3 and cfg.MaxSizePerMsg = 2048 with
// ten 1000-byte proposals, and expects the 11 resulting log entries (one empty
// entry plus ten proposals) to be delivered as 2, then 3x2, then 2+1 entries.
func TestProgressFlowControl(t *testing.T) {
	cfg := newTestConfig(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
	cfg.MaxInflightMsgs = 3
	cfg.MaxSizePerMsg = 2048
	r := newRaft(cfg)
	r.becomeCandidate()
	r.becomeLeader()

	// Throw away all the messages relating to the initial election.
	r.readMessages()

	// While node 2 is in probe state, propose a bunch of entries.
	r.prs[2].becomeProbe()
	blob := []byte(strings.Repeat("a", 1000))
	for i := 0; i < 10; i++ {
		// A dropped or rejected proposal would invalidate every count
		// asserted below, so stop immediately rather than ignore it.
		if err := r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}); err != nil {
			t.Fatalf("proposal %d: unexpected error: %v", i, err)
		}
	}

	ms := r.readMessages()
	// First append has two entries: the empty entry to confirm the
	// election, and the first proposal (only one proposal gets sent
	// because we're in probe state).
	if len(ms) != 1 || ms[0].Type != pb.MsgApp {
		t.Fatalf("expected 1 MsgApp, got %v", ms)
	}
	if len(ms[0].Entries) != 2 {
		t.Fatalf("expected 2 entries, got %d", len(ms[0].Entries))
	}
	if len(ms[0].Entries[0].Data) != 0 || len(ms[0].Entries[1].Data) != 1000 {
		t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
	}

	// When this append is acked, we change to replicate state and can
	// send multiple messages at once.
	if err := r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index}); err != nil {
		t.Fatalf("acking first append: unexpected error: %v", err)
	}
	ms = r.readMessages()
	if len(ms) != 3 {
		t.Fatalf("expected 3 messages, got %d", len(ms))
	}
	for i, m := range ms {
		if m.Type != pb.MsgApp {
			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
		}
		if len(m.Entries) != 2 {
			t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
		}
	}
	// The next step indexes ms[2].Entries[1]; if any message above was
	// malformed, stop here (after reporting every mismatch) instead of
	// risking an index-out-of-range panic.
	if t.Failed() {
		t.FailNow()
	}

	// Ack all three of those messages together and get the last two
	// messages (containing three entries).
	if err := r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index}); err != nil {
		t.Fatalf("acking second batch: unexpected error: %v", err)
	}
	ms = r.readMessages()
	if len(ms) != 2 {
		t.Fatalf("expected 2 messages, got %d", len(ms))
	}
	for i, m := range ms {
		if m.Type != pb.MsgApp {
			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
		}
	}
	if len(ms[0].Entries) != 2 {
		t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
	}
	if len(ms[1].Entries) != 1 {
		t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
	}
}

// TestLeaderElection runs the shared testLeaderElection helper with its
// boolean argument set to false.
// NOTE(review): the meaning of that flag is defined by testLeaderElection,
// which is not visible in this excerpt — confirm before relying on it.
func TestLeaderElection(t *testing.T) {
testLeaderElection(t, false)
}
Expand Down Expand Up @@ -2576,7 +2645,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// loop. After that, the follower is paused until a heartbeat response is
// received.
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, true)
msg := r.readMessages()
if len(msg) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
Expand All @@ -2591,7 +2660,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
for j := 0; j < 10; j++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, true)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
}
Expand Down Expand Up @@ -2638,7 +2707,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {

for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, true)
msgs := r.readMessages()
if len(msgs) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
Expand All @@ -2655,7 +2724,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {

for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, true)
msgs := r.readMessages()
if len(msgs) != 0 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
Expand Down

0 comments on commit a5ea496

Please sign in to comment.