Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
Bump etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Oct 22, 2018
1 parent e82d613 commit 50f622b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
36 changes: 27 additions & 9 deletions go.etcd.io/etcd/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,17 +635,27 @@ func (r *raft) reset(term uint64) {
r.readOnly = newReadOnly(r.readOnly.option)
}

func (r *raft) appendEntry(es ...pb.Entry) {
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
li := r.raftLog.lastIndex()
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
// Track the size of this uncommitted proposal.
if !r.increaseUncommittedSize(es) {
r.logger.Debugf(
"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
r.id,
)
// Drop the proposal.
return false
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.getProgress(r.id).maybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
}

// tickElection is run by followers and candidates after r.electionTimeout.
Expand Down Expand Up @@ -739,7 +749,16 @@ func (r *raft) becomeLeader() {
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()

r.appendEntry(pb.Entry{Data: nil})
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

Expand Down Expand Up @@ -970,10 +989,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand All @@ -986,7 +1001,10 @@ func stepLeader(r *raft, m pb.Message) error {
}
}
}
r.appendEntry(m.Entries...)

if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
case pb.MsgReadIndex:
Expand Down Expand Up @@ -1490,7 +1508,7 @@ func (r *raft) abortLeaderTransfer() {
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
Expand All @@ -1513,7 +1531,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {

var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}
if s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
Expand Down
6 changes: 6 additions & 0 deletions go.etcd.io/etcd/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
return buf.String()
}

// PayloadSize is the size of the payload of this Entry. Notably, it does not
// depend on its Index or Term.
func PayloadSize(e pb.Entry) int {
return len(e.Data)
}

// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
Expand Down

0 comments on commit 50f622b

Please sign in to comment.