Skip to content

Commit

Permalink
Merge pull request #10914 from tbg/api
Browse files: browse the repository at this point in the history
raft: allow use of joint quorums
  • Loading branch information
tbg committed Jul 23, 2019
2 parents fe86a78 + b9c051e commit d118342
Show file tree
Hide file tree
Showing 25 changed files with 1,484 additions and 297 deletions.
5 changes: 2 additions & 3 deletions clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"time"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/api/membership"
Expand All @@ -43,8 +44,6 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal"
"go.etcd.io/etcd/wal/walpb"

bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -482,7 +481,7 @@ func (s *v3Manager) saveWALAndSnap() error {
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
Voters: nodeIDs,
},
},
}
Expand Down
3 changes: 1 addition & 2 deletions etcdserver/api/snap/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ import (
"testing"

"go.etcd.io/etcd/raft/raftpb"

"go.uber.org/zap"
)

var testSnap = &raftpb.Snapshot{
Data: []byte("some snapshot"),
Metadata: raftpb.SnapshotMetadata{
ConfState: raftpb.ConfState{
Nodes: []uint64{1, 2, 3},
Voters: []uint64{1, 2, 3},
},
Index: 1,
Term: 1,
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Metadata.ConfState.Nodes {
for _, id := range snap.Metadata.ConfState.Voters {
ids[id] = true
}
}
Expand Down
16 changes: 8 additions & 8 deletions etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ func TestGetIDs(t *testing.T) {
widSet []uint64
}{
{nil, []raftpb.Entry{}, []uint64{}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{}, []uint64{1}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry}, []uint64{1, 2}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, removeEntry}, []uint64{1}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, normalEntry}, []uint64{1, 2}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, normalEntry, updateEntry}, []uint64{1, 2}},
{&raftpb.ConfState{Nodes: []uint64{1}},
{&raftpb.ConfState{Voters: []uint64{1}},
[]raftpb.Entry{addEntry, removeEntry, normalEntry}, []uint64{1}},
}

Expand Down Expand Up @@ -178,8 +178,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
}
}

// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfgChangeBlocksApply(t *testing.T) {
// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfigChangeBlocksApply(t *testing.T) {
n := newNopReadyNode()

r := newRaftNode(raftNodeConfig{
Expand Down
37 changes: 26 additions & 11 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"testing"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/etcdserver/api/snap"
Expand All @@ -49,6 +47,7 @@ import (
"go.etcd.io/etcd/pkg/wait"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)

// TestDoLocalAction tests requests which do not need to go through raft to be applied,
Expand Down Expand Up @@ -1017,7 +1016,7 @@ func TestSnapshot(t *testing.T) {
}
}()

srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch
<-ch
}
Expand Down Expand Up @@ -1632,7 +1631,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
return nil
}
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
n.Record(testutil.Action{Name: "ProposeConfChange"})
return nil
}
Expand All @@ -1645,7 +1644,7 @@ func (n *nodeRecorder) Ready() <-chan raft.Ready
// TransferLeadership is a no-op stub; the call is not recorded.
func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
// ReadIndex is a no-op stub that always returns nil; the call is not recorded.
func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
// Advance is a no-op stub; the call is not recorded.
func (n *nodeRecorder) Advance() {}
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
return &raftpb.ConfState{}
}
Expand Down Expand Up @@ -1706,21 +1705,37 @@ func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
}

func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
data, err := conf.Marshal()
// confChangeActionName renders the change type(s) carried by conf as a string.
// A change that converts to V1 yields the name of its single type. Otherwise
// the type names of all V2 changes are joined with "/", in order; a V2 change
// without any entries yields the empty string.
func confChangeActionName(conf raftpb.ConfChangeI) string {
	// V1 changes carry exactly one type, so there is nothing to join.
	if v1, isV1 := conf.AsV1(); isV1 {
		return v1.Type.String()
	}

	name := ""
	for idx, single := range conf.AsV2().Changes {
		if idx != 0 {
			name += "/"
		}
		name += single.Type.String()
	}
	return name
}

func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
typ, data, err := raftpb.MarshalConfChange(conf)
if err != nil {
return err
}

n.index++
n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}}
return nil
}
// Ready returns the recorder's readyc channel as a receive-only channel of
// raft.Ready values.
func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
return n.readyc
}
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)})
return &raftpb.ConfState{}
}

Expand Down
2 changes: 1 addition & 1 deletion raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
return nil
}
22 changes: 13 additions & 9 deletions raft/confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type Changer struct {
// to
// (1 2 3)&&(1 2 3).
//
// The supplied ConfChanges are then applied to the incoming majority config,
// The supplied changes are then applied to the incoming majority config,
// resulting in a joint configuration that in terms of the Raft thesis[1]
// (Section 4.3) corresponds to `C_{new,old}`.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.Progr
if err := c.apply(&cfg, prs, ccs...); err != nil {
return c.err(err)
}

cfg.AutoLeave = autoLeave
return checkAndReturn(cfg, prs)
}

Expand Down Expand Up @@ -120,6 +120,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
}
}
*outgoingPtr(&cfg.Voters) = nil
cfg.AutoLeave = false

return checkAndReturn(cfg, prs)
}
Expand All @@ -129,7 +130,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand All @@ -142,7 +143,7 @@ func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressM
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config")
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}
if err := checkInvariants(cfg, prs); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, nil
Expand All @@ -151,14 +152,14 @@ func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressM
return checkAndReturn(cfg, prs)
}

// apply a ConfChange to the configuration. By convention, changes to voters are
// apply a change to the configuration. By convention, changes to voters are
// always made to the incoming majority config Voters[0]. Voters[1] is either
// empty or preserves the outgoing majority configuration while in a joint state.
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChange) error {
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {
for _, cc := range ccs {
if cc.NodeID == 0 {
// etcd replaces the NodeID with zero if it decides (downstream of
// raft) to not apply a ConfChange, so we have to have explicit code
// raft) to not apply a change, so we have to have explicit code
// here to ignore these.
continue
}
Expand Down Expand Up @@ -327,6 +328,9 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
if cfg.LearnersNext != nil {
return fmt.Errorf("LearnersNext must be nil when not joint")
}
if cfg.AutoLeave {
return fmt.Errorf("AutoLeave must be false when not joint")
}
}

return nil
Expand Down Expand Up @@ -408,7 +412,7 @@ func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &vo

// Describe prints the type and NodeID of the configuration changes as a
// space-delimited string.
func Describe(ccs ...pb.ConfChange) string {
func Describe(ccs ...pb.ConfChangeSingle) string {
var buf strings.Builder
for _, cc := range ccs {
if buf.Len() > 0 {
Expand Down
10 changes: 7 additions & 3 deletions raft/confchange/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestConfChangeDataDriven(t *testing.T) {
defer func() {
c.LastIndex++
}()
var ccs []pb.ConfChange
var ccs []pb.ConfChangeSingle
toks := strings.Split(strings.TrimSpace(d.Input), " ")
if toks[0] == "" {
toks = nil
Expand All @@ -57,7 +57,7 @@ func TestConfChangeDataDriven(t *testing.T) {
if len(tok) < 2 {
return fmt.Sprintf("unknown token %s", tok)
}
var cc pb.ConfChange
var cc pb.ConfChangeSingle
switch tok[0] {
case 'v':
cc.Type = pb.ConfChangeAddNode
Expand Down Expand Up @@ -85,7 +85,11 @@ func TestConfChangeDataDriven(t *testing.T) {
case "simple":
cfg, prs, err = c.Simple(ccs...)
case "enter-joint":
cfg, prs, err = c.EnterJoint(ccs...)
var autoLeave bool
if len(d.CmdArgs) > 0 {
d.ScanArgs(t, "autoleave", &autoLeave)
}
cfg, prs, err = c.EnterJoint(autoLeave, ccs...)
case "leave-joint":
if len(ccs) > 0 {
err = errors.New("this command takes no input")
Expand Down
Loading

0 comments on commit d118342

Please sign in to comment.