Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: fix bug in unbounded log growth prevention mechanism #10199

Merged
merged 1 commit into from
Oct 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
const maxEntries = 16
data := []byte("testdata")
testEntry := raftpb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * testEntry.Size())
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))

s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
Expand Down
36 changes: 27 additions & 9 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,17 +635,27 @@ func (r *raft) reset(term uint64) {
r.readOnly = newReadOnly(r.readOnly.option)
}

func (r *raft) appendEntry(es ...pb.Entry) {
// appendEntry stamps each of the given entries with the current term and with
// consecutive indexes following the log's last index, and then tries to append
// them to the raft log.
//
// The entries are first charged against the uncommitted-size quota via
// increaseUncommittedSize. If that call refuses them, the proposal is dropped:
// nothing is appended and false is returned. Otherwise the entries are
// appended, the leader's own progress is advanced to the new last index, a
// commit is attempted, and true is returned.
//
// Note that the Term and Index fields of the caller's entries are overwritten
// in place even when the proposal ends up being dropped.
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
	next := r.raftLog.lastIndex() + 1
	for i := range es {
		es[i].Term = r.Term
		es[i].Index = next + uint64(i)
	}

	// Account for the size of this not-yet-committed proposal. A refusal
	// means the quota would be exceeded, so the proposal is dropped.
	if ok := r.increaseUncommittedSize(es); !ok {
		r.logger.Debugf(
			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
			r.id,
		)
		return false
	}

	// The log reports its up-to-date "last" index after the truncate/append.
	last := r.raftLog.append(es...)
	r.getProgress(r.id).maybeUpdate(last)

	// The result of maybeCommit is deliberately ignored: the caller is
	// going to call bcastAppend either way.
	r.maybeCommit()
	return true
}

// tickElection is run by followers and candidates after r.electionTimeout.
Expand Down Expand Up @@ -739,7 +749,16 @@ func (r *raft) becomeLeader() {
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()

r.appendEntry(pb.Entry{Data: nil})
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

Expand Down Expand Up @@ -970,10 +989,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand All @@ -986,7 +1001,10 @@ func stepLeader(r *raft, m pb.Message) error {
}
}
}
r.appendEntry(m.Entries...)

if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
case pb.MsgReadIndex:
Expand Down Expand Up @@ -1490,7 +1508,7 @@ func (r *raft) abortLeaderTransfer() {
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
Expand All @@ -1513,7 +1531,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {

var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}
if s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
Expand Down
2 changes: 1 addition & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestLeaderBcastBeat(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Index: uint64(i) + 1})
mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
}

for i := 0; i < hi; i++ {
Expand Down
37 changes: 29 additions & 8 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
return ents
}

// mustAppendEntry is a test helper that appends ents to r through
// appendEntry and panics if the entries are reported as not accepted.
func mustAppendEntry(r *raft, ents ...pb.Entry) {
	if accepted := r.appendEntry(ents...); accepted {
		return
	}
	panic("entry unexpectedly dropped")
}

type stateMachine interface {
Step(m pb.Message) error
readMessages() []pb.Message
Expand Down Expand Up @@ -363,15 +369,24 @@ func TestProgressFlowControl(t *testing.T) {
}

func TestUncommittedEntryLimit(t *testing.T) {
const maxEntries = 16
// Use a relatively large number of entries here to prevent regression of a
// bug in which an entry's size was computed before its Index and Term had
// been assigned (i.e. before the entry was in its final form). This test
// would fail with the bug, either because we'd get dropped proposals earlier
// than we expect them, or because the final tally ends up nonzero. (At the
// time of writing, the former).
const maxEntries = 1024
testEntry := pb.Entry{Data: []byte("testdata")}
maxEntrySize := maxEntries * testEntry.Size()
maxEntrySize := maxEntries * PayloadSize(testEntry)

cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
cfg.MaxInflightMsgs = 2 * 1024 // avoid interference
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()
if n := r.uncommittedSize; n != 0 {
t.Fatalf("expected zero uncommitted size, got %d bytes", n)
}

// Set the two followers to the replicate state. Commit to tail of log.
const numFollowers = 2
Expand Down Expand Up @@ -401,6 +416,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)
if r.uncommittedSize != 0 {
t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize)
}

// Send a single large proposal to r1. Should be accepted even though it
// pushes us above the limit because we were beneath it before the proposal.
Expand All @@ -425,6 +443,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)
if n := r.uncommittedSize; n != 0 {
t.Fatalf("expected zero uncommitted size, got %d", n)
}
}

func TestLeaderElection(t *testing.T) {
Expand Down Expand Up @@ -2585,7 +2606,7 @@ func TestBcastBeat(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()
for i := 0; i < 10; i++ {
sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
}
// slow follower
sm.prs[2].Match, sm.prs[2].Next = 5, 6
Expand Down Expand Up @@ -2709,7 +2730,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// we expect that raft will only send out one msgAPP on the first
// loop. After that, the follower is paused until a heartbeat response is
// received.
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msg := r.readMessages()
if len(msg) != 1 {
Expand All @@ -2724,7 +2745,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
t.Errorf("paused = %v, want true", r.prs[2].Paused)
}
for j := 0; j < 10; j++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
Expand Down Expand Up @@ -2771,7 +2792,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
r.prs[2].becomeReplicate()

for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msgs := r.readMessages()
if len(msgs) != 1 {
Expand All @@ -2788,7 +2809,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
r.prs[2].becomeSnapshot(10)

for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msgs := r.readMessages()
if len(msgs) != 0 {
Expand Down Expand Up @@ -3182,7 +3203,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
for i, tt := range tests {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
if tt.addEntry {
r.appendEntry(pb.Entry{Type: pb.EntryNormal})
mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal})
}
r.becomeCandidate()
r.becomeLeader()
Expand Down
2 changes: 1 addition & 1 deletion raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
const maxEntries = 16
data := []byte("testdata")
testEntry := raftpb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * testEntry.Size())
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))

s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
Expand Down
6 changes: 6 additions & 0 deletions raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
return buf.String()
}

// PayloadSize reports the size of the given Entry's payload, measured as the
// length of its Data field. The result is notably independent of the entry's
// Index and Term, so it stays the same when those fields are assigned or
// changed.
func PayloadSize(e pb.Entry) int {
	payload := e.Data
	return len(payload)
}

// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
Expand Down