diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index 70f8c3a8d4a..d68a5f65740 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -28,6 +28,7 @@ import ( "strings" "time" + bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api/membership" @@ -43,8 +44,6 @@ import ( "go.etcd.io/etcd/raft/raftpb" "go.etcd.io/etcd/wal" "go.etcd.io/etcd/wal/walpb" - - bolt "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -482,7 +481,7 @@ func (s *v3Manager) saveWALAndSnap() error { Index: commit, Term: term, ConfState: raftpb.ConfState{ - Nodes: nodeIDs, + Voters: nodeIDs, }, }, } diff --git a/etcdserver/api/snap/snapshotter_test.go b/etcdserver/api/snap/snapshotter_test.go index 00e3e21dc3c..2f45bb48f66 100644 --- a/etcdserver/api/snap/snapshotter_test.go +++ b/etcdserver/api/snap/snapshotter_test.go @@ -24,7 +24,6 @@ import ( "testing" "go.etcd.io/etcd/raft/raftpb" - "go.uber.org/zap" ) @@ -32,7 +31,7 @@ var testSnap = &raftpb.Snapshot{ Data: []byte("some snapshot"), Metadata: raftpb.SnapshotMetadata{ ConfState: raftpb.ConfState{ - Nodes: []uint64{1, 2, 3}, + Voters: []uint64{1, 2, 3}, }, Index: 1, Term: 1, diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 5498630a595..8095288cefa 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -647,7 +647,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { ids := make(map[uint64]bool) if snap != nil { - for _, id := range snap.Metadata.ConfState.Nodes { + for _, id := range snap.Metadata.ConfState.Voters { ids[id] = true } } diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index fa5830ef4bd..6c5164d48ff 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -46,17 +46,17 @@ func TestGetIDs(t *testing.T) { widSet []uint64 }{ {nil, []raftpb.Entry{}, []uint64{}}, - 
{&raftpb.ConfState{Nodes: []uint64{1}}, + {&raftpb.ConfState{Voters: []uint64{1}}, []raftpb.Entry{}, []uint64{1}}, - {&raftpb.ConfState{Nodes: []uint64{1}}, + {&raftpb.ConfState{Voters: []uint64{1}}, []raftpb.Entry{addEntry}, []uint64{1, 2}}, - {&raftpb.ConfState{Nodes: []uint64{1}}, + {&raftpb.ConfState{Voters: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry}, []uint64{1}}, - {&raftpb.ConfState{Nodes: []uint64{1}}, + {&raftpb.ConfState{Voters: []uint64{1}}, []raftpb.Entry{addEntry, normalEntry}, []uint64{1, 2}}, - {&raftpb.ConfState{Nodes: []uint64{1}}, + {&raftpb.ConfState{Voters: []uint64{1}}, []raftpb.Entry{addEntry, normalEntry, updateEntry}, []uint64{1, 2}}, - {&raftpb.ConfState{Nodes: []uint64{1}}, + {&raftpb.ConfState{Voters: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry, normalEntry}, []uint64{1}}, } @@ -178,8 +178,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { } } -// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change. -func TestConfgChangeBlocksApply(t *testing.T) { +// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change. 
+func TestConfigChangeBlocksApply(t *testing.T) { n := newNopReadyNode() r := newRaftNode(raftNodeConfig{ diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e7cde53b4b8..13de2673808 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -28,8 +28,6 @@ import ( "testing" "time" - "go.uber.org/zap" - "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/rafthttp" "go.etcd.io/etcd/etcdserver/api/snap" @@ -49,6 +47,7 @@ import ( "go.etcd.io/etcd/pkg/wait" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.uber.org/zap" ) // TestDoLocalAction tests requests which do not need to go through raft to be applied, @@ -1017,7 +1016,7 @@ func TestSnapshot(t *testing.T) { } }() - srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) + srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}}) <-ch <-ch } @@ -1632,7 +1631,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}}) return nil } -func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { +func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error { n.Record(testutil.Action{Name: "ProposeConfChange"}) return nil } @@ -1645,7 +1644,7 @@ func (n *nodeRecorder) Ready() <-chan raft.Ready func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {} func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil } func (n *nodeRecorder) Advance() {} -func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { +func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState { n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}}) return &raftpb.ConfState{} } @@ -1706,21 +1705,37 @@ func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder { return 
&nodeConfChangeCommitterRecorder{*newReadyNode(), 0} } -func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { - data, err := conf.Marshal() +func confChangeActionName(conf raftpb.ConfChangeI) string { + var s string + if confV1, ok := conf.AsV1(); ok { + s = confV1.Type.String() + } else { + for i, chg := range conf.AsV2().Changes { + if i > 0 { + s += "/" + } + s += chg.Type.String() + } + } + return s +} + +func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error { + typ, data, err := raftpb.MarshalConfChange(conf) if err != nil { return err } + n.index++ - n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()}) - n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}} + n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)}) + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}} return nil } func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready { return n.readyc } -func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { - n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()}) +func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState { + n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)}) return &raftpb.ConfState{} } diff --git a/raft/bootstrap.go b/raft/bootstrap.go index fdd0987561a..bd82b2041af 100644 --- a/raft/bootstrap.go +++ b/raft/bootstrap.go @@ -74,7 +74,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { // the invariant that committed < unstable? 
rn.raft.raftLog.committed = uint64(len(ents)) for _, peer := range peers { - rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2()) } return nil } diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index fd75aedc801..bfb2033c941 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -41,12 +41,12 @@ type Changer struct { // to // (1 2 3)&&(1 2 3). // -// The supplied ConfChanges are then applied to the incoming majority config, +// The supplied changes are then applied to the incoming majority config, // resulting in a joint configuration that in terms of the Raft thesis[1] // (Section 4.3) corresponds to `C_{new,old}`. // // [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf -func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) { +func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) { cfg, prs, err := c.checkAndCopy() if err != nil { return c.err(err) @@ -74,7 +74,7 @@ func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.Progr if err := c.apply(&cfg, prs, ccs...); err != nil { return c.err(err) } - + cfg.AutoLeave = autoLeave return checkAndReturn(cfg, prs) } @@ -120,6 +120,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) { } } *outgoingPtr(&cfg.Voters) = nil + cfg.AutoLeave = false return checkAndReturn(cfg, prs) } @@ -129,7 +130,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) { // will return an error if that is not the case, if the resulting quorum is // zero, or if the configuration is in a joint state (i.e. if there is an // outgoing configuration). 
-func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) { +func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) { cfg, prs, err := c.checkAndCopy() if err != nil { return c.err(err) @@ -142,7 +143,7 @@ func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressM return c.err(err) } if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 { - return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config") + return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config") } if err := checkInvariants(cfg, prs); err != nil { return tracker.Config{}, tracker.ProgressMap{}, nil @@ -151,14 +152,14 @@ func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressM return checkAndReturn(cfg, prs) } -// apply a ConfChange to the configuration. By convention, changes to voters are +// apply a change to the configuration. By convention, changes to voters are // always made to the incoming majority config Voters[0]. Voters[1] is either // empty or preserves the outgoing majority configuration while in a joint state. -func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChange) error { +func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error { for _, cc := range ccs { if cc.NodeID == 0 { // etcd replaces the NodeID with zero if it decides (downstream of - // raft) to not apply a ConfChange, so we have to have explicit code + // raft) to not apply a change, so we have to have explicit code // here to ignore these. 
continue } @@ -327,6 +328,9 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error { if cfg.LearnersNext != nil { return fmt.Errorf("LearnersNext must be nil when not joint") } + if cfg.AutoLeave { + return fmt.Errorf("AutoLeave must be false when not joint") + } } return nil @@ -408,7 +412,7 @@ func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &vo // Describe prints the type and NodeID of the configuration changes as a // space-delimited string. -func Describe(ccs ...pb.ConfChange) string { +func Describe(ccs ...pb.ConfChangeSingle) string { var buf strings.Builder for _, cc := range ccs { if buf.Len() > 0 { diff --git a/raft/confchange/datadriven_test.go b/raft/confchange/datadriven_test.go index 7d5428f1790..063d927dac8 100644 --- a/raft/confchange/datadriven_test.go +++ b/raft/confchange/datadriven_test.go @@ -48,7 +48,7 @@ func TestConfChangeDataDriven(t *testing.T) { defer func() { c.LastIndex++ }() - var ccs []pb.ConfChange + var ccs []pb.ConfChangeSingle toks := strings.Split(strings.TrimSpace(d.Input), " ") if toks[0] == "" { toks = nil @@ -57,7 +57,7 @@ func TestConfChangeDataDriven(t *testing.T) { if len(tok) < 2 { return fmt.Sprintf("unknown token %s", tok) } - var cc pb.ConfChange + var cc pb.ConfChangeSingle switch tok[0] { case 'v': cc.Type = pb.ConfChangeAddNode @@ -85,7 +85,11 @@ func TestConfChangeDataDriven(t *testing.T) { case "simple": cfg, prs, err = c.Simple(ccs...) case "enter-joint": - cfg, prs, err = c.EnterJoint(ccs...) + var autoLeave bool + if len(d.CmdArgs) > 0 { + d.ScanArgs(t, "autoleave", &autoLeave) + } + cfg, prs, err = c.EnterJoint(autoLeave, ccs...) 
case "leave-joint": if len(ccs) > 0 { err = errors.New("this command takes no input") diff --git a/raft/confchange/quick_test.go b/raft/confchange/quick_test.go index 7a4c112ca70..04a77b3ceb9 100644 --- a/raft/confchange/quick_test.go +++ b/raft/confchange/quick_test.go @@ -15,6 +15,7 @@ package confchange import ( + "fmt" "math/rand" "reflect" "testing" @@ -35,23 +36,45 @@ func TestConfChangeQuick(t *testing.T) { // as intended. const infoCount = 5 - runWithJoint := func(c *Changer, ccs []pb.ConfChange) error { - cfg, prs, err := c.EnterJoint(ccs...) + runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error { + cfg, prs, err := c.EnterJoint(false /* autoLeave */, ccs...) if err != nil { return err } + // Also do this with autoLeave on, just to check that we'd get the same + // result. + cfg2a, prs2a, err := c.EnterJoint(true /* autoLeave */, ccs...) + if err != nil { + return err + } + cfg2a.AutoLeave = false + if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(prs, prs2a) { + return fmt.Errorf("cfg: %+v\ncfg2a: %+v\nprs: %+v\nprs2a: %+v", + cfg, cfg2a, prs, prs2a) + } + c.Tracker.Config = cfg + c.Tracker.Progress = prs + cfg2b, prs2b, err := c.LeaveJoint() + if err != nil { + return err + } + // Reset back to the main branch with autoLeave=false. 
c.Tracker.Config = cfg c.Tracker.Progress = prs cfg, prs, err = c.LeaveJoint() if err != nil { return err } + if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(prs, prs2b) { + return fmt.Errorf("cfg: %+v\ncfg2b: %+v\nprs: %+v\nprs2b: %+v", + cfg, cfg2b, prs, prs2b) + } c.Tracker.Config = cfg c.Tracker.Progress = prs return nil } - runWithSimple := func(c *Changer, ccs []pb.ConfChange) error { + runWithSimple := func(c *Changer, ccs []pb.ConfChangeSingle) error { for _, cc := range ccs { cfg, prs, err := c.Simple(cc) if err != nil { @@ -62,7 +85,7 @@ func TestConfChangeQuick(t *testing.T) { return nil } - type testFunc func(*Changer, []pb.ConfChange) error + type testFunc func(*Changer, []pb.ConfChangeSingle) error wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) { return func(setup initialChanges, ccs confChanges) (*Changer, error) { @@ -112,8 +135,8 @@ func TestConfChangeQuick(t *testing.T) { t.Fatal(err) } - t.Error("setup:", Describe(cErr.In[0].([]pb.ConfChange)...)) - t.Error("ccs:", Describe(cErr.In[1].([]pb.ConfChange)...)) + t.Error("setup:", Describe(cErr.In[0].([]pb.ConfChangeSingle)...)) + t.Error("ccs:", Describe(cErr.In[1].([]pb.ConfChangeSingle)...)) t.Errorf("out1: %+v\nout2: %+v", cErr.Out1, cErr.Out2) } @@ -123,13 +146,13 @@ func (confChangeTyp) Generate(rand *rand.Rand, _ int) reflect.Value { return reflect.ValueOf(confChangeTyp(rand.Intn(4))) } -type confChanges []pb.ConfChange +type confChanges []pb.ConfChangeSingle -func genCC(num func() int, id func() uint64, typ func() pb.ConfChangeType) []pb.ConfChange { - var ccs []pb.ConfChange +func genCC(num func() int, id func() uint64, typ func() pb.ConfChangeType) []pb.ConfChangeSingle { + var ccs []pb.ConfChangeSingle n := num() for i := 0; i < n; i++ { - ccs = append(ccs, pb.ConfChange{Type: typ(), NodeID: id()}) + ccs = append(ccs, pb.ConfChangeSingle{Type: typ(), NodeID: id()}) } return ccs } @@ -150,7 +173,7 @@ func (confChanges) Generate(rand 
*rand.Rand, _ int) reflect.Value { return reflect.ValueOf(genCC(num, id, typ)) } -type initialChanges []pb.ConfChange +type initialChanges []pb.ConfChangeSingle func (initialChanges) Generate(rand *rand.Rand, _ int) reflect.Value { num := func() int { @@ -163,6 +186,6 @@ func (initialChanges) Generate(rand *rand.Rand, _ int) reflect.Value { // NodeID one is special - it's in the initial config and will be a voter // always (this is to avoid uninteresting edge cases where the simple conf // changes can't easily make progress). - ccs := append([]pb.ConfChange{{Type: pb.ConfChangeAddNode, NodeID: 1}}, genCC(num, id, typ)...) + ccs := append([]pb.ConfChangeSingle{{Type: pb.ConfChangeAddNode, NodeID: 1}}, genCC(num, id, typ)...) return reflect.ValueOf(ccs) } diff --git a/raft/confchange/testdata/joint_autoleave.txt b/raft/confchange/testdata/joint_autoleave.txt new file mode 100644 index 00000000000..be855f78185 --- /dev/null +++ b/raft/confchange/testdata/joint_autoleave.txt @@ -0,0 +1,29 @@ +# Test the autoleave argument to EnterJoint. It defaults to false in the +# datadriven tests. The flag has no associated semantics in this package, +# it is simply passed through. +simple +v1 +---- +voters=(1) +1: StateProbe match=0 next=1 + +# Autoleave is reflected in the config. +enter-joint autoleave=true +v2 v3 +---- +voters=(1 2 3)&&(1) autoleave +1: StateProbe match=0 next=1 +2: StateProbe match=0 next=2 +3: StateProbe match=0 next=2 + +# Can't enter-joint twice, even if autoleave changes. 
+enter-joint autoleave=false +---- +config is already joint + +leave-joint +---- +voters=(1 2 3) +1: StateProbe match=0 next=1 +2: StateProbe match=0 next=2 +3: StateProbe match=0 next=2 diff --git a/raft/confchange/testdata/simple_safety.txt b/raft/confchange/testdata/simple_safety.txt index 4bf420fc14b..b360737fb15 100644 --- a/raft/confchange/testdata/simple_safety.txt +++ b/raft/confchange/testdata/simple_safety.txt @@ -20,7 +20,7 @@ voters=(1 2) learners=(3) simple r1 v5 ---- -more than voter changed without entering joint config +more than one voter changed without entering joint config simple r1 r2 @@ -30,12 +30,12 @@ removed all voters simple v3 v4 ---- -more than voter changed without entering joint config +more than one voter changed without entering joint config simple l1 v5 ---- -more than voter changed without entering joint config +more than one voter changed without entering joint config simple l1 l2 diff --git a/raft/node.go b/raft/node.go index 6b730c0d4c1..daf068055e6 100644 --- a/raft/node.go +++ b/raft/node.go @@ -132,10 +132,20 @@ type Node interface { // Propose proposes that data be appended to the log. Note that proposals can be lost without // notice, therefore it is user's job to ensure proposal retries. Propose(ctx context.Context, data []byte) error - // ProposeConfChange proposes config change. - // At most one ConfChange can be in the process of going through consensus. - // Application needs to call ApplyConfChange when applying EntryConfChange type entry. - ProposeConfChange(ctx context.Context, cc pb.ConfChange) error + // ProposeConfChange proposes a configuration change. Like any proposal, the + // configuration change may be dropped with or without an error being + // returned. In particular, configuration changes are dropped unless the + // leader has certainty that there is no prior unapplied configuration + // change in its log. + // + // The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2 + // message. 
The latter allows arbitrary configuration changes via joint + // consensus, notably including replacing a voter. Passing a ConfChangeV2 + // message is only allowed if all Nodes participating in the cluster run a + // version of this library aware of the V2 API. See pb.ConfChangeV2 for + // usage details and semantics. + ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error + // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error @@ -156,11 +166,13 @@ type Node interface { // a long time to apply the snapshot data. To continue receiving Ready without blocking raft // progress, it can call Advance before finishing applying the last ready. Advance() - // ApplyConfChange applies config change to the local node. - // Returns an opaque ConfState protobuf which must be recorded - // in snapshots. Will never return nil; it returns a pointer only - // to match MemoryStorage.Compact. - ApplyConfChange(cc pb.ConfChange) *pb.ConfState + // ApplyConfChange applies a config change (previously passed to + // ProposeConfChange) to the node. This must be called whenever a config + // change is observed in Ready.CommittedEntries. + // + // Returns an opaque non-nil ConfState protobuf which must be recorded in + // snapshots. + ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState // TransferLeadership attempts to transfer leadership to the given transferee. 
TransferLeadership(ctx context.Context, lead, transferee uint64) @@ -240,7 +252,7 @@ type msgWithResult struct { type node struct { propc chan msgWithResult recvc chan pb.Message - confc chan pb.ConfChange + confc chan pb.ConfChangeV2 confstatec chan pb.ConfState readyc chan Ready advancec chan struct{} @@ -256,7 +268,7 @@ func newNode() node { return node{ propc: make(chan msgWithResult), recvc: make(chan pb.Message), - confc: make(chan pb.ConfChange), + confc: make(chan pb.ConfChangeV2), confstatec: make(chan pb.ConfState), readyc: make(chan Ready), advancec: make(chan struct{}), @@ -341,11 +353,27 @@ func (n *node) run(rn *RawNode) { r.Step(m) } case cc := <-n.confc: + _, okBefore := r.prs.Progress[r.id] cs := r.applyConfChange(cc) - if _, ok := r.prs.Progress[r.id]; !ok { - // block incoming proposal when local node is - // removed - if cc.NodeID == r.id { + // If the node was removed, block incoming proposals. Note that we + // only do this if the node was in the config before. Nodes may be + // a member of the group without knowing this (when they're catching + // up on the log and don't have the latest config) and we don't want + // to block the proposal channel in that case. + // + // NB: propc is reset when the leader changes, which, if we learn + // about it, sort of implies that we got readded, maybe? This isn't + // very sound and likely has bugs. 
+ if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter { + var found bool + for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} { + for _, id := range sl { + if id == r.id { + found = true + } + } + } + if !found { propc = nil } } @@ -397,12 +425,20 @@ func (n *node) Step(ctx context.Context, m pb.Message) error { return n.step(ctx, m) } -func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { - data, err := cc.Marshal() +func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) { + typ, data, err := pb.MarshalConfChange(c) + if err != nil { + return pb.Message{}, err + } + return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil +} + +func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error { + msg, err := confChangeToMsg(cc) if err != nil { return err } - return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) + return n.Step(ctx, msg) } func (n *node) step(ctx context.Context, m pb.Message) error { @@ -463,10 +499,10 @@ func (n *node) Advance() { } } -func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { +func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState { var cs pb.ConfState select { - case n.confc <- cc: + case n.confc <- cc.AsV2(): case <-n.done: } select { diff --git a/raft/node_test.go b/raft/node_test.go index af072852282..6127bf39774 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -691,7 +691,7 @@ func TestNodeRestart(t *testing.T) { func TestNodeRestartFromSnapshot(t *testing.T) { snap := raftpb.Snapshot{ Metadata: raftpb.SnapshotMetadata{ - ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: raftpb.ConfState{Voters: []uint64{1, 2}}, Index: 2, Term: 1, }, @@ -845,7 +845,7 @@ func TestNodeProposeAddLearnerNode(t *testing.T) { t.Errorf("apply conf change should return new added learner: %v", state.String()) } - if len(state.Nodes) != 1 { + if len(state.Voters) != 1 
{ t.Errorf("add learner should not change the nodes: %v", state.String()) } t.Logf("apply raft conf %v changed to: %v", cc, state.String()) diff --git a/raft/quorum/majority.go b/raft/quorum/majority.go index 5eba5034448..9b10e95838e 100644 --- a/raft/quorum/majority.go +++ b/raft/quorum/majority.go @@ -102,6 +102,16 @@ func (c MajorityConfig) Describe(l AckedIndexer) string { return buf.String() } +// Slice returns the MajorityConfig as a sorted slice. +func (c MajorityConfig) Slice() []uint64 { + var sl []uint64 + for id := range c { + sl = append(sl, id) + } + sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] }) + return sl +} + type uint64Slice []uint64 func insertionSort(sl uint64Slice) { diff --git a/raft/raft.go b/raft/raft.go index 0b2c42c5937..bc6b4a12656 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -329,14 +329,14 @@ func newRaft(c *Config) *raft { } peers := c.peers learners := c.learners - if len(cs.Nodes) > 0 || len(cs.Learners) > 0 { + if len(cs.Voters) > 0 || len(cs.Learners) > 0 { if len(peers) > 0 || len(learners) > 0 { // TODO(bdarnell): the peers argument is always nil except in // tests; the argument should be removed and these tests should be // updated to specify their nodes through a snapshot. - panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)") + panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)") } - peers = cs.Nodes + peers = cs.Voters learners = cs.Learners } r := &raft{ @@ -357,11 +357,11 @@ func newRaft(c *Config) *raft { } for _, p := range peers { // Add node to active config. - r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p}) + r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p}.AsV2()) } for _, p := range learners { // Add learner to active config. 
- r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p}) + r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p}.AsV2()) } if !isHardStateEqual(hs, emptyState) { @@ -551,6 +551,46 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { }) } +func (r *raft) advance(rd Ready) { + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + r.raftLog.appliedTo(index) + if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader { + // If the current (and most recent, at least for this leader's term) + // configuration should be auto-left, initiate that now. + ccdata, err := (&pb.ConfChangeV2{}).Marshal() + if err != nil { + panic(err) + } + ent := pb.Entry{ + Type: pb.EntryConfChangeV2, + Data: ccdata, + } + if !r.appendEntry(ent) { + // If we could not append the entry, bump the pending conf index + // so that we'll try again later. + // + // TODO(tbg): test this case. + r.pendingConfIndex = r.raftLog.lastIndex() + } else { + r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config) + } + } + } + r.reduceUncommittedSize(rd.CommittedEntries) + + if len(rd.Entries) > 0 { + e := rd.Entries[len(rd.Entries)-1] + r.raftLog.stableTo(e.Index, e.Term) + } + if !IsEmptySnap(rd.Snapshot) { + r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) + } +} + // maybeCommit attempts to advance the commit index. Returns true if // the commit index changed (in which case the caller should call // r.bcastAppend). 
@@ -973,10 +1013,10 @@ func stepLeader(r *raft, m pb.Message) error { for i := range m.Entries { e := &m.Entries[i] - if e.Type == pb.EntryConfChange { + if e.Type == pb.EntryConfChange || e.Type == pb.EntryConfChangeV2 { if r.pendingConfIndex > r.raftLog.applied { - r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", - e, r.pendingConfIndex, r.raftLog.applied) + r.logger.Infof("%x propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", + r.id, e, r.pendingConfIndex, r.raftLog.applied) m.Entries[i] = pb.Entry{Type: pb.EntryNormal} } else { r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 @@ -1344,7 +1384,7 @@ func (r *raft) restore(s pb.Snapshot) bool { found := false cs := s.Metadata.ConfState for _, set := range [][]uint64{ - cs.Nodes, + cs.Voters, cs.Learners, } { for _, id := range set { @@ -1375,11 +1415,11 @@ func (r *raft) restore(s pb.Snapshot) bool { // Reset the configuration and add the (potentially updated) peers in anew. 
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) - for _, id := range s.Metadata.ConfState.Nodes { - r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}) + for _, id := range s.Metadata.ConfState.Voters { + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}.AsV2()) } for _, id := range s.Metadata.ConfState.Learners { - r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}) + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}.AsV2()) } pr := r.prs.Progress[r.id] @@ -1397,21 +1437,38 @@ func (r *raft) promotable() bool { return pr != nil && !pr.IsLearner } -func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState { - cfg, prs, err := confchange.Changer{ - Tracker: r.prs, - LastIndex: r.raftLog.lastIndex(), - }.Simple(cc) +func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { + cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) { + changer := confchange.Changer{ + Tracker: r.prs, + LastIndex: r.raftLog.lastIndex(), + } + if cc.LeaveJoint() { + return changer.LeaveJoint() + } else if autoLeave, ok := cc.EnterJoint(); ok { + return changer.EnterJoint(autoLeave, cc.Changes...) + } + return changer.Simple(cc.Changes...) + }() + if err != nil { + // TODO(tbg): return the error to the caller. panic(err) } + r.prs.Config = cfg r.prs.Progress = prs r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config) // Now that the configuration is updated, handle any side effects. 
- cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()} + cs := pb.ConfState{ + Voters: r.prs.Voters[0].Slice(), + VotersOutgoing: r.prs.Voters[1].Slice(), + Learners: quorum.MajorityConfig(r.prs.Learners).Slice(), + LearnersNext: quorum.MajorityConfig(r.prs.LearnersNext).Slice(), + AutoLeave: r.prs.AutoLeave, + } pr, ok := r.prs.Progress[r.id] // Update whether the node itself is a learner, resetting to false when the @@ -1433,7 +1490,7 @@ func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState { // The remaining steps only make sense if this node is the leader and there // are other nodes. - if r.state != StateLeader || len(cs.Nodes) == 0 { + if r.state != StateLeader || len(cs.Voters) == 0 { return cs } if r.maybeCommit() { diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 187aef697fa..d49c8837e9b 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -26,7 +26,7 @@ var ( Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}}, }, } ) @@ -142,10 +142,10 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { // Add a follower to the group. Do this in a clandestine way for simplicity. // Also set up a snapshot that will be sent to the follower. 
- n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) s1.snapshot = pb.Snapshot{ Metadata: pb.SnapshotMetadata{ - ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}}, Index: s1.lastIndex(), Term: s1.ents[len(s1.ents)-1].Term, }, diff --git a/raft/raft_test.go b/raft/raft_test.go index 1e2d0e2af52..6de385e672b 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -356,8 +356,8 @@ func TestLearnerPromotion(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) - n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) if n2.isLearner { t.Error("peer 2 is learner, want not") } @@ -1143,7 +1143,7 @@ func TestCommit(t *testing.T) { for j := 0; j < len(tt.matches); j++ { id := uint64(j) + 1 if id > 1 { - sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}) + sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2()) } pr := sm.prs.Progress[id] pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1 @@ -1931,7 +1931,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { nt := newNetwork(a, b) setRandomizedElectionTimeout(b, b.electionTimeout+1) // Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states - b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}) + b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}.AsV2()) if b.promotable() { t.Fatalf("promotable = %v, want false", b.promotable()) @@ -2458,7 +2458,7 @@ func TestBcastBeat(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: offset, Term: 1, - ConfState: 
pb.ConfState{Nodes: []uint64{1, 2, 3}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}}, }, } storage := NewMemoryStorage() @@ -2709,7 +2709,7 @@ func TestRestore(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}}, }, } @@ -2726,8 +2726,8 @@ func TestRestore(t *testing.T) { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } sg := sm.prs.VoterNodes() - if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) { - t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes) + if !reflect.DeepEqual(sg, s.Metadata.ConfState.Voters) { + t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters) } if ok := sm.restore(s); ok { @@ -2741,7 +2741,7 @@ func TestRestoreWithLearner(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2}, Learners: []uint64{3}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}}, }, } @@ -2758,14 +2758,14 @@ func TestRestoreWithLearner(t *testing.T) { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } sg := sm.prs.VoterNodes() - if len(sg) != len(s.Metadata.ConfState.Nodes) { - t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes) + if len(sg) != len(s.Metadata.ConfState.Voters) { + t.Errorf("sm.Voters = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Voters) } lns := sm.prs.LearnerNodes() if len(lns) != len(s.Metadata.ConfState.Learners) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } - for _, n := range s.Metadata.ConfState.Nodes { + for _, n := range s.Metadata.ConfState.Voters { if sm.prs.Progress[n].IsLearner { t.Errorf("sm.Node %x isLearner = %s, want %t", n, 
sm.prs.Progress[n], false) } @@ -2794,7 +2794,7 @@ func TestRestoreVoterToLearner(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2}, Learners: []uint64{3}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}}, }, } @@ -2816,7 +2816,7 @@ func TestRestoreLearnerPromotion(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}}, }, } @@ -2843,7 +2843,7 @@ func TestLearnerReceiveSnapshot(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}}, + ConfState: pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}, }, } @@ -2881,7 +2881,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: commit, Term: 1, - ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}}, }, } @@ -2909,7 +2909,7 @@ func TestProvideSnap(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}}, }, } storage := NewMemoryStorage() @@ -2939,7 +2939,7 @@ func TestIgnoreProvidingSnap(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}}, }, } storage := NewMemoryStorage() @@ -2967,7 +2967,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { Metadata: pb.SnapshotMetadata{ Index: 11, // magic number Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}}, }, } m := pb.Message{Type: pb.MsgSnap, From: 1, 
Term: 2, Snapshot: s} @@ -2992,7 +2992,7 @@ func TestSlowNodeRestore(t *testing.T) { } lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() @@ -3086,7 +3086,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { // TestAddNode tests that addNode could update nodes correctly. func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) nodes := r.prs.VoterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3098,7 +3098,7 @@ func TestAddNode(t *testing.T) { func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) // Add new learner peer. - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) if r.isLearner { t.Fatal("expected 1 to be voter") } @@ -3112,13 +3112,13 @@ func TestAddLearner(t *testing.T) { } // Promote peer to voter. - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) if r.prs.Progress[2].IsLearner { t.Fatal("expected 2 to be voter") } // Demote r. - r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2()) if !r.prs.Progress[1].IsLearner { t.Fatal("expected 1 to be learner") } @@ -3127,7 +3127,7 @@ func TestAddLearner(t *testing.T) { } // Promote r again. 
- r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2()) if r.prs.Progress[1].IsLearner { t.Fatal("expected 1 to be voter") } @@ -3149,7 +3149,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { r.tick() } - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) // This tick will reach electionTimeout, which triggers a quorum check. r.tick() @@ -3174,7 +3174,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { // and removed list correctly. func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2()) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3186,20 +3186,20 @@ func TestRemoveNode(t *testing.T) { t.Error("did not panic") } }() - r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2()) } // TestRemoveLearner tests that removeNode could update nodes and // and removed list correctly. 
func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2()) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } - w = []uint64{} + w = nil if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } @@ -3210,7 +3210,7 @@ func TestRemoveLearner(t *testing.T) { t.Error("did not panic") } }() - r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2()) } func TestPromotable(t *testing.T) { @@ -3342,7 +3342,7 @@ func TestCommitAfterRemoveNode(t *testing.T) { // Apply the config change. This reduces quorum requirements so the // pending command can now commit. - r.applyConfChange(cc) + r.applyConfChange(cc.AsV2()) ents = nextEnts(r, s) if len(ents) != 1 || ents[0].Type != pb.EntryNormal || string(ents[0].Data) != "hello" { @@ -3469,7 +3469,7 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() @@ -3591,7 +3591,7 @@ func TestLeaderTransferRemoveNode(t *testing.T) { t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) } - lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}) + lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}.AsV2()) checkLeaderTransferState(t, lead, StateLeader, 1) } 
@@ -3917,9 +3917,9 @@ func TestPreVoteWithCheckQuorum(t *testing.T) { // a MsgHup or MsgTimeoutNow. func TestLearnerCampaign(t *testing.T) { n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) - n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) nt := newNetwork(n1, n2) nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) diff --git a/raft/raftpb/confchange.go b/raft/raftpb/confchange.go new file mode 100644 index 00000000000..a91c18dc12c --- /dev/null +++ b/raft/raftpb/confchange.go @@ -0,0 +1,105 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raftpb + +import ( + "fmt" + + "github.com/gogo/protobuf/proto" +) + +// ConfChangeI abstracts over ConfChangeV2 and (legacy) ConfChange to allow +// treating them in a unified manner. +type ConfChangeI interface { + AsV2() ConfChangeV2 + AsV1() (ConfChange, bool) +} + +// MarshalConfChange calls Marshal on the underlying ConfChange or ConfChangeV2 +// and returns the result along with the corresponding EntryType. 
+func MarshalConfChange(c ConfChangeI) (EntryType, []byte, error) { + var typ EntryType + var ccdata []byte + var err error + if ccv1, ok := c.AsV1(); ok { + typ = EntryConfChange + ccdata, err = ccv1.Marshal() + } else { + ccv2 := c.AsV2() + typ = EntryConfChangeV2 + ccdata, err = ccv2.Marshal() + } + return typ, ccdata, err +} + +// AsV2 returns a V2 configuration change carrying out the same operation. +func (c ConfChange) AsV2() ConfChangeV2 { + return ConfChangeV2{ + Changes: []ConfChangeSingle{{ + Type: c.Type, + NodeID: c.NodeID, + }}, + Context: c.Context, + } +} + +// AsV1 returns the ConfChange and true. +func (c ConfChange) AsV1() (ConfChange, bool) { + return c, true +} + +// AsV2 is the identity. +func (c ConfChangeV2) AsV2() ConfChangeV2 { return c } + +// AsV1 returns ConfChange{} and false. +func (c ConfChangeV2) AsV1() (ConfChange, bool) { return ConfChange{}, false } + +// EnterJoint returns two bools. The second bool is true if and only if this +// config change will use Joint Consensus, which is the case if it contains more +// than one change or if the use of Joint Consensus was requested explicitly. +// The first bool can only be true if second one is, and indicates whether the +// Joint State will be left automatically. +func (c *ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) { + // NB: in theory, more config changes could qualify for the "simple" + // protocol but it depends on the config on top of which the changes apply. + // For example, adding two learners is not OK if both nodes are part of the + // base config (i.e. two voters are turned into learners in the process of + // applying the conf change). In practice, these distinctions should not + // matter, so we keep it simple and use Joint Consensus liberally. + if c.Transition != ConfChangeTransitionAuto || len(c.Changes) > 1 { + // Use Joint Consensus. 
+ var autoLeave bool + switch c.Transition { + case ConfChangeTransitionAuto: + autoLeave = true + case ConfChangeTransitionJointImplicit: + autoLeave = true + case ConfChangeTransitionJointExplicit: + default: + panic(fmt.Sprintf("unknown transition: %+v", c)) + } + return autoLeave, true + } + return false, false +} + +// LeaveJoint is true if the configuration change leaves a joint configuration. +// This is the case if the ConfChangeV2 is zero, with the possible exception of +// the Context field. +func (c *ConfChangeV2) LeaveJoint() bool { + cpy := *c + cpy.Context = nil + return proto.Equal(&cpy, &ConfChangeV2{}) +} diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index fd9ee3729ec..fcf259c89be 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -15,6 +15,8 @@ HardState ConfState ConfChange + ConfChangeSingle + ConfChangeV2 */ package raftpb @@ -44,17 +46,20 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type EntryType int32 const ( - EntryNormal EntryType = 0 - EntryConfChange EntryType = 1 + EntryNormal EntryType = 0 + EntryConfChange EntryType = 1 + EntryConfChangeV2 EntryType = 2 ) var EntryType_name = map[int32]string{ 0: "EntryNormal", 1: "EntryConfChange", + 2: "EntryConfChangeV2", } var EntryType_value = map[string]int32{ - "EntryNormal": 0, - "EntryConfChange": 1, + "EntryNormal": 0, + "EntryConfChange": 1, + "EntryConfChangeV2": 2, } func (x EntryType) Enum() *EntryType { @@ -160,6 +165,57 @@ func (x *MessageType) UnmarshalJSON(data []byte) error { } func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{1} } +// ConfChangeTransition specifies the behavior of a configuration change with +// respect to joint consensus. +type ConfChangeTransition int32 + +const ( + // Automatically use the simple protocol if possible, otherwise fall back + // to ConfChangeJointImplicit. Most applications will want to use this. 
+ ConfChangeTransitionAuto ConfChangeTransition = 0 + // Use joint consensus unconditionally, and transition out of them + // automatically (by proposing a zero configuration change). + // + // This option is suitable for applications that want to minimize the time + // spent in the joint configuration and do not store the joint configuration + // in the state machine (outside of InitialState). + ConfChangeTransitionJointImplicit ConfChangeTransition = 1 + // Use joint consensus and remain in the joint configuration until the + // application proposes a no-op configuration change. This is suitable for + // applications that want to explicitly control the transitions, for example + // to use a custom payload (via the Context field). + ConfChangeTransitionJointExplicit ConfChangeTransition = 2 +) + +var ConfChangeTransition_name = map[int32]string{ + 0: "ConfChangeTransitionAuto", + 1: "ConfChangeTransitionJointImplicit", + 2: "ConfChangeTransitionJointExplicit", +} +var ConfChangeTransition_value = map[string]int32{ + "ConfChangeTransitionAuto": 0, + "ConfChangeTransitionJointImplicit": 1, + "ConfChangeTransitionJointExplicit": 2, +} + +func (x ConfChangeTransition) Enum() *ConfChangeTransition { + p := new(ConfChangeTransition) + *p = x + return p +} +func (x ConfChangeTransition) String() string { + return proto.EnumName(ConfChangeTransition_name, int32(x)) +} +func (x *ConfChangeTransition) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ConfChangeTransition_value, data, "ConfChangeTransition") + if err != nil { + return err + } + *x = ConfChangeTransition(value) + return nil +} +func (ConfChangeTransition) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} } + type ConfChangeType int32 const ( @@ -198,7 +254,7 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error { *x = ConfChangeType(value) return nil } -func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} } +func 
(ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{3} } type Entry struct { Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"` @@ -270,9 +326,21 @@ func (*HardState) ProtoMessage() {} func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{4} } type ConfState struct { - Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"` - Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"` - XXX_unrecognized []byte `json:"-"` + // The voters in the incoming config. (If the configuration is not joint, + // then the outgoing config is empty). + Voters []uint64 `protobuf:"varint,1,rep,name=voters" json:"voters,omitempty"` + // The learners in the incoming config. + Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"` + // The voters in the outgoing config. + VotersOutgoing []uint64 `protobuf:"varint,3,rep,name=voters_outgoing,json=votersOutgoing" json:"voters_outgoing,omitempty"` + // The nodes that will become learners when the outgoing config is removed. + // These nodes are necessarily currently in nodes_joint (or they would have + // been added to the incoming config right away). + LearnersNext []uint64 `protobuf:"varint,4,rep,name=learners_next,json=learnersNext" json:"learners_next,omitempty"` + // If set, the config is joint and Raft will automatically transition into + // the final config (i.e. remove the outgoing config) when this is safe. 
+ AutoLeave bool `protobuf:"varint,5,opt,name=auto_leave,json=autoLeave" json:"auto_leave"` + XXX_unrecognized []byte `json:"-"` } func (m *ConfState) Reset() { *m = ConfState{} } @@ -281,11 +349,14 @@ func (*ConfState) ProtoMessage() {} func (*ConfState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{5} } type ConfChange struct { - ID uint64 `protobuf:"varint,1,opt,name=ID" json:"ID"` - Type ConfChangeType `protobuf:"varint,2,opt,name=Type,enum=raftpb.ConfChangeType" json:"Type"` - NodeID uint64 `protobuf:"varint,3,opt,name=NodeID" json:"NodeID"` - Context []byte `protobuf:"bytes,4,opt,name=Context" json:"Context,omitempty"` - XXX_unrecognized []byte `json:"-"` + Type ConfChangeType `protobuf:"varint,2,opt,name=type,enum=raftpb.ConfChangeType" json:"type"` + NodeID uint64 `protobuf:"varint,3,opt,name=node_id,json=nodeId" json:"node_id"` + Context []byte `protobuf:"bytes,4,opt,name=context" json:"context,omitempty"` + // NB: this is used only by etcd to thread through a unique identifier. + // Ideally it should really use the Context instead. No counterpart to + // this field exists in ConfChangeV2. + ID uint64 `protobuf:"varint,1,opt,name=id" json:"id"` + XXX_unrecognized []byte `json:"-"` } func (m *ConfChange) Reset() { *m = ConfChange{} } @@ -293,6 +364,63 @@ func (m *ConfChange) String() string { return proto.CompactTextString func (*ConfChange) ProtoMessage() {} func (*ConfChange) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{6} } +// ConfChangeSingle is an individual configuration change operation. Multiple +// such operations can be carried out atomically via a ConfChangeV2. 
+type ConfChangeSingle struct { + Type ConfChangeType `protobuf:"varint,1,opt,name=type,enum=raftpb.ConfChangeType" json:"type"` + NodeID uint64 `protobuf:"varint,2,opt,name=node_id,json=nodeId" json:"node_id"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ConfChangeSingle) Reset() { *m = ConfChangeSingle{} } +func (m *ConfChangeSingle) String() string { return proto.CompactTextString(m) } +func (*ConfChangeSingle) ProtoMessage() {} +func (*ConfChangeSingle) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{7} } + +// ConfChangeV2 messages initiate configuration changes. They support both the +// simple "one at a time" membership change protocol and full Joint Consensus +// allowing for arbitrary changes in membership. +// +// The supplied context is treated as an opaque payload and can be used to +// attach an action on the state machine to the application of the config change +// proposal. Note that contrary to Joint Consensus as outlined in the Raft +// paper[1], configuration changes become active when they are *applied* to the +// state machine (not when they are appended to the log). +// +// The simple protocol can be used whenever only a single change is made. +// +// Non-simple changes require the use of Joint Consensus, for which two +// configuration changes are run. The first configuration change specifies the +// desired changes and transitions the Raft group into the joint configuration, +// in which quorum requires a majority of both the pre-changes and post-changes +// configuration. Joint Consensus avoids entering fragile intermediate +// configurations that could compromise survivability. For example, without the +// use of Joint Consensus and running across three availability zones with a +// replication factor of three, it is not possible to replace a voter without +// entering an intermediate configuration that does not survive the outage of +// one availability zone. 
+// +// The provided ConfChangeTransition specifies how (and whether) Joint Consensus +// is used, and assigns the task of leaving the joint configuration either to +// Raft or the application. Leaving the joint configuration is accomplished by +// proposing a ConfChangeV2 with only and optionally the Context field +// populated. +// +// For details on Raft membership changes, see: +// +// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf +type ConfChangeV2 struct { + Transition ConfChangeTransition `protobuf:"varint,1,opt,name=transition,enum=raftpb.ConfChangeTransition" json:"transition"` + Changes []ConfChangeSingle `protobuf:"bytes,2,rep,name=changes" json:"changes"` + Context []byte `protobuf:"bytes,3,opt,name=context" json:"context,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ConfChangeV2) Reset() { *m = ConfChangeV2{} } +func (m *ConfChangeV2) String() string { return proto.CompactTextString(m) } +func (*ConfChangeV2) ProtoMessage() {} +func (*ConfChangeV2) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{8} } + func init() { proto.RegisterType((*Entry)(nil), "raftpb.Entry") proto.RegisterType((*SnapshotMetadata)(nil), "raftpb.SnapshotMetadata") @@ -301,8 +429,11 @@ func init() { proto.RegisterType((*HardState)(nil), "raftpb.HardState") proto.RegisterType((*ConfState)(nil), "raftpb.ConfState") proto.RegisterType((*ConfChange)(nil), "raftpb.ConfChange") + proto.RegisterType((*ConfChangeSingle)(nil), "raftpb.ConfChangeSingle") + proto.RegisterType((*ConfChangeV2)(nil), "raftpb.ConfChangeV2") proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) proto.RegisterEnum("raftpb.MessageType", MessageType_name, MessageType_value) + proto.RegisterEnum("raftpb.ConfChangeTransition", ConfChangeTransition_name, ConfChangeTransition_value) proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value) } func (m *Entry) Marshal() (dAtA []byte, err error) { @@ -535,8 +666,8 @@ 
func (m *ConfState) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Nodes) > 0 { - for _, num := range m.Nodes { + if len(m.Voters) > 0 { + for _, num := range m.Voters { dAtA[i] = 0x8 i++ i = encodeVarintRaft(dAtA, i, uint64(num)) @@ -549,6 +680,28 @@ func (m *ConfState) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintRaft(dAtA, i, uint64(num)) } } + if len(m.VotersOutgoing) > 0 { + for _, num := range m.VotersOutgoing { + dAtA[i] = 0x18 + i++ + i = encodeVarintRaft(dAtA, i, uint64(num)) + } + } + if len(m.LearnersNext) > 0 { + for _, num := range m.LearnersNext { + dAtA[i] = 0x20 + i++ + i = encodeVarintRaft(dAtA, i, uint64(num)) + } + } + dAtA[i] = 0x28 + i++ + if m.AutoLeave { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -591,6 +744,75 @@ func (m *ConfChange) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *ConfChangeSingle) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ConfChangeSingle) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0x8 + i++ + i = encodeVarintRaft(dAtA, i, uint64(m.Type)) + dAtA[i] = 0x10 + i++ + i = encodeVarintRaft(dAtA, i, uint64(m.NodeID)) + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *ConfChangeV2) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ConfChangeV2) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0x8 + i++ + i = encodeVarintRaft(dAtA, i, uint64(m.Transition)) + if len(m.Changes) > 0 { + for _, msg := range m.Changes { + dAtA[i] = 0x12 + i++ + i = encodeVarintRaft(dAtA, i, 
uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.Context != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintRaft(dAtA, i, uint64(len(m.Context))) + i += copy(dAtA[i:], m.Context) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + func encodeVarintRaft(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -689,8 +911,8 @@ func (m *HardState) Size() (n int) { func (m *ConfState) Size() (n int) { var l int _ = l - if len(m.Nodes) > 0 { - for _, e := range m.Nodes { + if len(m.Voters) > 0 { + for _, e := range m.Voters { n += 1 + sovRaft(uint64(e)) } } @@ -699,6 +921,17 @@ func (m *ConfState) Size() (n int) { n += 1 + sovRaft(uint64(e)) } } + if len(m.VotersOutgoing) > 0 { + for _, e := range m.VotersOutgoing { + n += 1 + sovRaft(uint64(e)) + } + } + if len(m.LearnersNext) > 0 { + for _, e := range m.LearnersNext { + n += 1 + sovRaft(uint64(e)) + } + } + n += 2 if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -721,6 +954,37 @@ func (m *ConfChange) Size() (n int) { return n } +func (m *ConfChangeSingle) Size() (n int) { + var l int + _ = l + n += 1 + sovRaft(uint64(m.Type)) + n += 1 + sovRaft(uint64(m.NodeID)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *ConfChangeV2) Size() (n int) { + var l int + _ = l + n += 1 + sovRaft(uint64(m.Transition)) + if len(m.Changes) > 0 { + for _, e := range m.Changes { + l = e.Size() + n += 1 + l + sovRaft(uint64(l)) + } + } + if m.Context != nil { + l = len(m.Context) + n += 1 + l + sovRaft(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovRaft(x uint64) (n int) { for { n++ @@ -1573,7 +1837,7 @@ func (m *ConfState) Unmarshal(dAtA []byte) error { break } } - m.Nodes = append(m.Nodes, v) + m.Voters = append(m.Voters, v) } else if wireType == 2 { var packedLen 
int for shift := uint(0); ; shift += 7 { @@ -1613,10 +1877,10 @@ func (m *ConfState) Unmarshal(dAtA []byte) error { break } } - m.Nodes = append(m.Nodes, v) + m.Voters = append(m.Voters, v) } } else { - return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Voters", wireType) } case 2: if wireType == 0 { @@ -1680,6 +1944,150 @@ func (m *ConfState) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType) } + case 3: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.VotersOutgoing = append(m.VotersOutgoing, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + packedLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.VotersOutgoing = append(m.VotersOutgoing, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field VotersOutgoing", wireType) + } + case 4: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b 
< 0x80 { + break + } + } + m.LearnersNext = append(m.LearnersNext, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + packedLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.LearnersNext = append(m.LearnersNext, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field LearnersNext", wireType) + } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AutoLeave", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.AutoLeave = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -1841,6 +2249,227 @@ func (m *ConfChange) Unmarshal(dAtA []byte) error { } return nil } +func (m *ConfChangeSingle) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ConfChangeSingle: wiretype end 
group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ConfChangeSingle: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= (ConfChangeType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + } + m.NodeID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NodeID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRaft(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRaft + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) 
+ iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ConfChangeV2) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ConfChangeV2: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ConfChangeV2: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Transition", wireType) + } + m.Transition = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Transition |= (ConfChangeTransition(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Changes", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Changes = append(m.Changes, ConfChangeSingle{}) + if err := m.Changes[len(m.Changes)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Context", wireType) + } + var byteLen int + for shift := 
uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Context = append(m.Context[:0], dAtA[iNdEx:postIndex]...) + if m.Context == nil { + m.Context = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRaft(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRaft + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRaft(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -1949,56 +2578,69 @@ var ( func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) } var fileDescriptorRaft = []byte{ - // 815 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45, - 0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38, - 0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b, - 0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20, - 0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3, - 0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9, - 0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f, - 0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77, - 0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 
0x4b, 0x1a, 0xe2, 0x24, - 0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37, - 0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01, - 0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03, - 0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42, - 0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21, - 0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36, - 0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 0xd1, 0x28, 0xb0, 0x3d, 0xfb, - 0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95, - 0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02, - 0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36, - 0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20, - 0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d, - 0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d, - 0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c, - 0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3, - 0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53, - 0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa, - 0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa, - 0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0, - 0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73, - 0x6f, 0xdc, 
0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb, - 0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b, - 0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67, - 0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60, - 0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70, - 0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63, - 0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1, - 0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe, - 0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc, - 0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83, - 0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21, - 0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1, - 0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6, - 0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4, - 0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65, - 0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9, - 0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa, - 0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73, - 0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0, - 0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c, - 0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 
0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8, - 0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00, + // 1009 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcd, 0x6e, 0xe3, 0x36, + 0x17, 0xb5, 0x64, 0xc5, 0x3f, 0xd7, 0x8e, 0xc3, 0xdc, 0xc9, 0x37, 0x20, 0x82, 0xc0, 0xe3, 0xcf, + 0xd3, 0x62, 0x8c, 0x14, 0x93, 0x16, 0x5e, 0x14, 0x45, 0x77, 0xf9, 0x19, 0x20, 0x29, 0xe2, 0x74, + 0xea, 0x64, 0xb2, 0x28, 0x50, 0x04, 0x8c, 0x45, 0x2b, 0x6a, 0x2d, 0x51, 0xa0, 0xe8, 0x34, 0xd9, + 0x14, 0x45, 0x9f, 0xa2, 0x9b, 0xd9, 0xf6, 0x01, 0xfa, 0x14, 0x59, 0x0e, 0xd0, 0xfd, 0xa0, 0x93, + 0xbe, 0x48, 0x41, 0x8a, 0xb2, 0x65, 0x27, 0x98, 0x45, 0x77, 0xe4, 0x39, 0x87, 0xf7, 0x9e, 0x7b, + 0x79, 0x45, 0x01, 0x48, 0x36, 0x56, 0x3b, 0x89, 0x14, 0x4a, 0x60, 0x45, 0xaf, 0x93, 0xcb, 0xcd, + 0x8d, 0x40, 0x04, 0xc2, 0x40, 0x9f, 0xeb, 0x55, 0xc6, 0x76, 0x7f, 0x81, 0x95, 0x57, 0xb1, 0x92, + 0xb7, 0xf8, 0x19, 0x78, 0x67, 0xb7, 0x09, 0xa7, 0x4e, 0xc7, 0xe9, 0xb5, 0xfa, 0xeb, 0x3b, 0xd9, + 0xa9, 0x1d, 0x43, 0x6a, 0x62, 0xcf, 0xbb, 0x7b, 0xff, 0xac, 0x34, 0x34, 0x22, 0xa4, 0xe0, 0x9d, + 0x71, 0x19, 0x51, 0xb7, 0xe3, 0xf4, 0xbc, 0x19, 0xc3, 0x65, 0x84, 0x9b, 0xb0, 0x72, 0x14, 0xfb, + 0xfc, 0x86, 0x96, 0x0b, 0x54, 0x06, 0x21, 0x82, 0x77, 0xc0, 0x14, 0xa3, 0x5e, 0xc7, 0xe9, 0x35, + 0x87, 0x66, 0xdd, 0xfd, 0xd5, 0x01, 0x72, 0x1a, 0xb3, 0x24, 0xbd, 0x12, 0x6a, 0xc0, 0x15, 0xf3, + 0x99, 0x62, 0xf8, 0x25, 0xc0, 0x48, 0xc4, 0xe3, 0x8b, 0x54, 0x31, 0x95, 0x39, 0x6a, 0xcc, 0x1d, + 0xed, 0x8b, 0x78, 0x7c, 0xaa, 0x09, 0x1b, 0xbc, 0x3e, 0xca, 0x01, 0x9d, 0x3c, 0x34, 0xc9, 0x8b, + 0xbe, 0x32, 0x48, 0x5b, 0x56, 0xda, 0x72, 0xd1, 0x97, 0x41, 0xba, 0xdf, 0x43, 0x2d, 0x77, 0xa0, + 0x2d, 0x6a, 0x07, 0x26, 0x67, 0x73, 0x68, 0xd6, 0xf8, 0x35, 0xd4, 0x22, 0xeb, 0xcc, 0x04, 0x6e, + 0xf4, 0x69, 0xee, 0x65, 0xd9, 0xb9, 0x8d, 0x3b, 0xd3, 0x77, 0xdf, 0x96, 0xa1, 0x3a, 0xe0, 0x69, + 0xca, 0x02, 0x8e, 0x2f, 0xc1, 0x53, 0xf3, 
0x0e, 0x3f, 0xc9, 0x63, 0x58, 0xba, 0xd8, 0x63, 0x2d, + 0xc3, 0x0d, 0x70, 0x95, 0x58, 0xa8, 0xc4, 0x55, 0x42, 0x97, 0x31, 0x96, 0x62, 0xa9, 0x0c, 0x8d, + 0xcc, 0x0a, 0xf4, 0x96, 0x0b, 0xc4, 0x36, 0x54, 0x27, 0x22, 0x30, 0x17, 0xb6, 0x52, 0x20, 0x73, + 0x70, 0xde, 0xb6, 0xca, 0xc3, 0xb6, 0xbd, 0x84, 0x2a, 0x8f, 0x95, 0x0c, 0x79, 0x4a, 0xab, 0x9d, + 0x72, 0xaf, 0xd1, 0x5f, 0x5d, 0x98, 0x8c, 0x3c, 0x94, 0xd5, 0xe0, 0x16, 0x54, 0x46, 0x22, 0x8a, + 0x42, 0x45, 0x6b, 0x85, 0x58, 0x16, 0xc3, 0x3e, 0xd4, 0x52, 0xdb, 0x31, 0x5a, 0x37, 0x9d, 0x24, + 0xcb, 0x9d, 0xcc, 0x3b, 0x98, 0xeb, 0x74, 0x44, 0xc9, 0x7f, 0xe4, 0x23, 0x45, 0xa1, 0xe3, 0xf4, + 0x6a, 0x79, 0xc4, 0x0c, 0xc3, 0x4f, 0x00, 0xb2, 0xd5, 0x61, 0x18, 0x2b, 0xda, 0x28, 0xe4, 0x2c, + 0xe0, 0x48, 0xa1, 0x3a, 0x12, 0xb1, 0xe2, 0x37, 0x8a, 0x36, 0xcd, 0xc5, 0xe6, 0xdb, 0xee, 0x0f, + 0x50, 0x3f, 0x64, 0xd2, 0xcf, 0xc6, 0x27, 0xef, 0xa0, 0xf3, 0xa0, 0x83, 0x14, 0xbc, 0x6b, 0xa1, + 0xf8, 0xe2, 0xbc, 0x6b, 0xa4, 0x50, 0x70, 0xf9, 0x61, 0xc1, 0xdd, 0x3f, 0x1d, 0xa8, 0xcf, 0xe6, + 0x15, 0x9f, 0x42, 0x45, 0x9f, 0x91, 0x29, 0x75, 0x3a, 0xe5, 0x9e, 0x37, 0xb4, 0x3b, 0xdc, 0x84, + 0xda, 0x84, 0x33, 0x19, 0x6b, 0xc6, 0x35, 0xcc, 0x6c, 0x8f, 0x2f, 0x60, 0x2d, 0x53, 0x5d, 0x88, + 0xa9, 0x0a, 0x44, 0x18, 0x07, 0xb4, 0x6c, 0x24, 0xad, 0x0c, 0xfe, 0xd6, 0xa2, 0xf8, 0x1c, 0x56, + 0xf3, 0x43, 0x17, 0xb1, 0xae, 0xd4, 0x33, 0xb2, 0x66, 0x0e, 0x9e, 0xf0, 0x1b, 0x85, 0xcf, 0x01, + 0xd8, 0x54, 0x89, 0x8b, 0x09, 0x67, 0xd7, 0xdc, 0x0c, 0x43, 0xde, 0xd0, 0xba, 0xc6, 0x8f, 0x35, + 0xdc, 0x7d, 0xeb, 0x00, 0x68, 0xd3, 0xfb, 0x57, 0x2c, 0x0e, 0xf4, 0x47, 0xe5, 0x86, 0xbe, 0xed, + 0x09, 0x68, 0xed, 0xfd, 0xfb, 0x67, 0xee, 0xd1, 0xc1, 0xd0, 0x0d, 0x7d, 0xfc, 0xc2, 0x8e, 0xb4, + 0x6b, 0x46, 0xfa, 0x69, 0xf1, 0x13, 0xcd, 0x4e, 0x3f, 0x98, 0xea, 0x17, 0x50, 0x8d, 0x85, 0xcf, + 0x2f, 0x42, 0xdf, 0x36, 0xac, 0x65, 0x43, 0x56, 0x4e, 0x84, 0xcf, 0x8f, 0x0e, 0x86, 0x15, 0x4d, + 0x1f, 0xf9, 0xc5, 0x3b, 0xf3, 0x16, 0xef, 0x2c, 0x02, 0x32, 0x4f, 0x70, 0x1a, 
0xc6, 0xc1, 0x84, + 0xcf, 0x8c, 0x38, 0xff, 0xc5, 0x88, 0xfb, 0x31, 0x23, 0xdd, 0x3f, 0x1c, 0x68, 0xce, 0xe3, 0x9c, + 0xf7, 0x71, 0x0f, 0x40, 0x49, 0x16, 0xa7, 0xa1, 0x0a, 0x45, 0x6c, 0x33, 0x6e, 0x3d, 0x92, 0x71, + 0xa6, 0xc9, 0x27, 0x72, 0x7e, 0x0a, 0xbf, 0x82, 0xea, 0xc8, 0xa8, 0xb2, 0x1b, 0x2f, 0x3c, 0x29, + 0xcb, 0xa5, 0xe5, 0x5f, 0x98, 0x95, 0x17, 0xfb, 0x52, 0x5e, 0xe8, 0xcb, 0xf6, 0x21, 0xd4, 0x67, + 0xaf, 0x35, 0xae, 0x41, 0xc3, 0x6c, 0x4e, 0x84, 0x8c, 0xd8, 0x84, 0x94, 0xf0, 0x09, 0xac, 0x19, + 0x60, 0x1e, 0x9f, 0x38, 0xf8, 0x3f, 0x58, 0x5f, 0x02, 0xcf, 0xfb, 0xc4, 0xdd, 0xfe, 0xcb, 0x85, + 0x46, 0xe1, 0x59, 0x42, 0x80, 0xca, 0x20, 0x0d, 0x0e, 0xa7, 0x09, 0x29, 0x61, 0x03, 0xaa, 0x83, + 0x34, 0xd8, 0xe3, 0x4c, 0x11, 0xc7, 0x6e, 0x5e, 0x4b, 0x91, 0x10, 0xd7, 0xaa, 0x76, 0x93, 0x84, + 0x94, 0xb1, 0x05, 0x90, 0xad, 0x87, 0x3c, 0x4d, 0x88, 0x67, 0x85, 0xe7, 0x42, 0x71, 0xb2, 0xa2, + 0xbd, 0xd9, 0x8d, 0x61, 0x2b, 0x96, 0xd5, 0x4f, 0x00, 0xa9, 0x22, 0x81, 0xa6, 0x4e, 0xc6, 0x99, + 0x54, 0x97, 0x3a, 0x4b, 0x0d, 0x37, 0x80, 0x14, 0x11, 0x73, 0xa8, 0x8e, 0x08, 0xad, 0x41, 0x1a, + 0xbc, 0x89, 0x25, 0x67, 0xa3, 0x2b, 0x76, 0x39, 0xe1, 0x04, 0x70, 0x1d, 0x56, 0x6d, 0x20, 0xfd, + 0xc5, 0x4d, 0x53, 0xd2, 0xb0, 0xb2, 0xfd, 0x2b, 0x3e, 0xfa, 0xe9, 0xbb, 0xa9, 0x90, 0xd3, 0x88, + 0x34, 0x75, 0xd9, 0x83, 0x34, 0x30, 0x17, 0x34, 0xe6, 0xf2, 0x98, 0x33, 0x9f, 0x4b, 0xb2, 0x6a, + 0x4f, 0x9f, 0x85, 0x11, 0x17, 0x53, 0x75, 0x22, 0x7e, 0x26, 0x2d, 0x6b, 0x66, 0xc8, 0x99, 0x6f, + 0x7e, 0x61, 0x64, 0xcd, 0x9a, 0x99, 0x21, 0xc6, 0x0c, 0xb1, 0xf5, 0xbe, 0x96, 0xdc, 0x94, 0xb8, + 0x6e, 0xb3, 0xda, 0xbd, 0xd1, 0xe0, 0xf6, 0x6f, 0x0e, 0x6c, 0x3c, 0x36, 0x1e, 0xb8, 0x05, 0xf4, + 0x31, 0x7c, 0x77, 0xaa, 0x04, 0x29, 0xe1, 0xa7, 0xf0, 0xff, 0xc7, 0xd8, 0x6f, 0x44, 0x18, 0xab, + 0xa3, 0x28, 0x99, 0x84, 0xa3, 0x50, 0x5f, 0xc5, 0xc7, 0x64, 0xaf, 0x6e, 0xac, 0xcc, 0xdd, 0xbe, + 0x85, 0xd6, 0xe2, 0x47, 0xa1, 0x9b, 0x31, 0x47, 0x76, 0x7d, 0x5f, 0x8f, 0x3f, 0x29, 0x21, 0x2d, + 0x9a, 0x1d, 0xf2, 
0x48, 0x5c, 0x73, 0xc3, 0x38, 0x8b, 0xcc, 0x9b, 0xc4, 0x67, 0x2a, 0x63, 0xdc, + 0xc5, 0x42, 0x76, 0x7d, 0xff, 0x38, 0x7b, 0x7b, 0x0c, 0x5b, 0xde, 0xa3, 0x77, 0x1f, 0xda, 0xa5, + 0x77, 0x1f, 0xda, 0xa5, 0xbb, 0xfb, 0xb6, 0xf3, 0xee, 0xbe, 0xed, 0xfc, 0x7d, 0xdf, 0x76, 0x7e, + 0xff, 0xa7, 0x5d, 0xfa, 0x37, 0x00, 0x00, 0xff, 0xff, 0x87, 0x11, 0x6d, 0xd6, 0xaf, 0x08, 0x00, + 0x00, } diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 644ce7b8f2f..23d62ec2fb0 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -10,8 +10,9 @@ option (gogoproto.goproto_getters_all) = false; option (gogoproto.goproto_enum_prefix_all) = false; enum EntryType { - EntryNormal = 0; - EntryConfChange = 1; + EntryNormal = 0; + EntryConfChange = 1; // corresponds to pb.ConfChange + EntryConfChangeV2 = 2; // corresponds to pb.ConfChangeV2 } message Entry { @@ -75,9 +76,41 @@ message HardState { optional uint64 commit = 3 [(gogoproto.nullable) = false]; } +// ConfChangeTransition specifies the behavior of a configuration change with +// respect to joint consensus. +enum ConfChangeTransition { + // Automatically use the simple protocol if possible, otherwise fall back + // to ConfChangeJointImplicit. Most applications will want to use this. + ConfChangeTransitionAuto = 0; + // Use joint consensus unconditionally, and transition out of them + // automatically (by proposing a zero configuration change). + // + // This option is suitable for applications that want to minimize the time + // spent in the joint configuration and do not store the joint configuration + // in the state machine (outside of InitialState). + ConfChangeTransitionJointImplicit = 1; + // Use joint consensus and remain in the joint configuration until the + // application proposes a no-op configuration change. This is suitable for + // applications that want to explicitly control the transitions, for example + // to use a custom payload (via the Context field). 
+ ConfChangeTransitionJointExplicit = 2; +} + message ConfState { - repeated uint64 nodes = 1; - repeated uint64 learners = 2; + // The voters in the incoming config. (If the configuration is not joint, + // then the outgoing config is empty). + repeated uint64 voters = 1; + // The learners in the incoming config. + repeated uint64 learners = 2; + // The voters in the outgoing config. + repeated uint64 voters_outgoing = 3; + // The nodes that will become learners when the outgoing config is removed. + // These nodes are necessarily currently in voters_outgoing (or they would have + // been added to the incoming config right away). + repeated uint64 learners_next = 4; + // If set, the config is joint and Raft will automatically transition into + // the final config (i.e. remove the outgoing config) when this is safe. + optional bool auto_leave = 5 [(gogoproto.nullable) = false]; } enum ConfChangeType { @@ -88,8 +121,57 @@ enum ConfChangeType { } message ConfChange { - optional uint64 ID = 1 [(gogoproto.nullable) = false]; - optional ConfChangeType Type = 2 [(gogoproto.nullable) = false]; - optional uint64 NodeID = 3 [(gogoproto.nullable) = false]; - optional bytes Context = 4; + optional ConfChangeType type = 2 [(gogoproto.nullable) = false]; + optional uint64 node_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID" ]; + optional bytes context = 4; + + // NB: this is used only by etcd to thread through a unique identifier. + // Ideally it should really use the Context instead. No counterpart to + // this field exists in ConfChangeV2. + optional uint64 id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID" ]; +} + +// ConfChangeSingle is an individual configuration change operation. Multiple +// such operations can be carried out atomically via a ConfChangeV2. 
+message ConfChangeSingle { + optional ConfChangeType type = 1 [(gogoproto.nullable) = false]; + optional uint64 node_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID"]; +} + +// ConfChangeV2 messages initiate configuration changes. They support both the +// simple "one at a time" membership change protocol and full Joint Consensus +// allowing for arbitrary changes in membership. +// +// The supplied context is treated as an opaque payload and can be used to +// attach an action on the state machine to the application of the config change +// proposal. Note that contrary to Joint Consensus as outlined in the Raft +// paper[1], configuration changes become active when they are *applied* to the +// state machine (not when they are appended to the log). +// +// The simple protocol can be used whenever only a single change is made. +// +// Non-simple changes require the use of Joint Consensus, for which two +// configuration changes are run. The first configuration change specifies the +// desired changes and transitions the Raft group into the joint configuration, +// in which quorum requires a majority of both the pre-changes and post-changes +// configuration. Joint Consensus avoids entering fragile intermediate +// configurations that could compromise survivability. For example, without the +// use of Joint Consensus and running across three availability zones with a +// replication factor of three, it is not possible to replace a voter without +// entering an intermediate configuration that does not survive the outage of +// one availability zone. +// +// The provided ConfChangeTransition specifies how (and whether) Joint Consensus +// is used, and assigns the task of leaving the joint configuration either to +// Raft or the application. Leaving the joint configuration is accomplished by +// proposing a ConfChangeV2 with only and optionally the Context field +// populated. 
+// +// For details on Raft membership changes, see: +// +// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf +message ConfChangeV2 { + optional ConfChangeTransition transition = 1 [(gogoproto.nullable) = false]; + repeated ConfChangeSingle changes = 2 [(gogoproto.nullable) = false]; + optional bytes context = 3; } diff --git a/raft/rawnode.go b/raft/rawnode.go index b7e53434640..9c192fdd0d1 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -91,23 +91,19 @@ func (rn *RawNode) Propose(data []byte) error { }}) } -// ProposeConfChange proposes a config change. -func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { - data, err := cc.Marshal() +// ProposeConfChange proposes a config change. See (Node).ProposeConfChange for +// details. +func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error { + m, err := confChangeToMsg(cc) if err != nil { return err } - return rn.raft.Step(pb.Message{ - Type: pb.MsgProp, - Entries: []pb.Entry{ - {Type: pb.EntryConfChange, Data: data}, - }, - }) + return rn.raft.Step(m) } // ApplyConfChange applies a config change to the local node. -func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { - cs := rn.raft.applyConfChange(cc) +func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState { + cs := rn.raft.applyConfChange(cc.AsV2()) return &cs } @@ -159,23 +155,7 @@ func (rn *RawNode) commitReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - - // If entries were applied (or a snapshot), update our cursor for - // the next Ready. Note that if the current HardState contains a - // new Commit index, this does not mean that we're also applying - // all of the new entries due to commit pagination by size. 
- if index := rd.appliedCursor(); index > 0 { - rn.raft.raftLog.appliedTo(index) - } - rn.raft.reduceUncommittedSize(rd.CommittedEntries) - - if len(rd.Entries) > 0 { - e := rd.Entries[len(rd.Entries)-1] - rn.raft.raftLog.stableTo(e.Index, e.Term) - } - if !IsEmptySnap(rd.Snapshot) { - rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) - } + rn.raft.advance(rd) } // HasReady called when RawNode user need to check if any Ready pending. diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 37386387646..543547a774b 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -64,7 +64,7 @@ func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error { } func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) } func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) } -func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChange) error { +func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChangeI) error { return a.RawNode.ProposeConfChange(cc) } @@ -77,7 +77,7 @@ func TestRawNodeStep(t *testing.T) { s.Append([]pb.Entry{{Term: 1, Index: 1}}) if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{ ConfState: pb.ConfState{ - Nodes: []uint64{1}, + Voters: []uint64{1}, }, Index: 1, Term: 1, @@ -105,65 +105,257 @@ func TestRawNodeStep(t *testing.T) { // TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is // no goroutine in RawNode. -// TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange -// send the given proposal and ConfChange to the underlying raft. +// TestRawNodeProposeAndConfChange tests the configuration change mechanism. 
Each +// test case sends a configuration change which is either simple or joint, verifies +// that it applies and that the resulting ConfState matches expectations, and for +// joint configurations makes sure that they are exited successfully. func TestRawNodeProposeAndConfChange(t *testing.T) { - s := NewMemoryStorage() - var err error - rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) - if err != nil { - t.Fatal(err) + testCases := []struct { + cc pb.ConfChangeI + exp pb.ConfState + exp2 *pb.ConfState + }{ + // V1 config change. + { + pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2}, + pb.ConfState{Voters: []uint64{1, 2}}, + nil, + }, + // Proposing the same as a V2 change works just the same, without entering + // a joint config. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddNode, NodeID: 2}, + }, + }, + pb.ConfState{Voters: []uint64{1, 2}}, + nil, + }, + // Ditto if we add it as a learner instead. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddLearnerNode, NodeID: 2}, + }, + }, + pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}, + nil, + }, + // We can ask explicitly for joint consensus if we want it. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddLearnerNode, NodeID: 2}, + }, + Transition: pb.ConfChangeTransitionJointExplicit, + }, + pb.ConfState{Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2}}, + &pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}, + }, + // Ditto, but with implicit transition (the harness checks this). + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddLearnerNode, NodeID: 2}, + }, + Transition: pb.ConfChangeTransitionJointImplicit, + }, + pb.ConfState{ + Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2}, + AutoLeave: true, + }, + &pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}, + }, + // Add a new node and demote n1. 
This exercises the interesting case in + // which we really need joint config changes and also need LearnersNext. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {NodeID: 2, Type: pb.ConfChangeAddNode}, + {NodeID: 1, Type: pb.ConfChangeAddLearnerNode}, + {NodeID: 3, Type: pb.ConfChangeAddLearnerNode}, + }, + }, + pb.ConfState{ + Voters: []uint64{2}, + VotersOutgoing: []uint64{1}, + Learners: []uint64{3}, + LearnersNext: []uint64{1}, + AutoLeave: true, + }, + &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}}, + }, + // Ditto explicit. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {NodeID: 2, Type: pb.ConfChangeAddNode}, + {NodeID: 1, Type: pb.ConfChangeAddLearnerNode}, + {NodeID: 3, Type: pb.ConfChangeAddLearnerNode}, + }, + Transition: pb.ConfChangeTransitionJointExplicit, + }, + pb.ConfState{ + Voters: []uint64{2}, + VotersOutgoing: []uint64{1}, + Learners: []uint64{3}, + LearnersNext: []uint64{1}, + }, + &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}}, + }, + // Ditto implicit. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {NodeID: 2, Type: pb.ConfChangeAddNode}, + {NodeID: 1, Type: pb.ConfChangeAddLearnerNode}, + {NodeID: 3, Type: pb.ConfChangeAddLearnerNode}, + }, + Transition: pb.ConfChangeTransitionJointImplicit, + }, + pb.ConfState{ + Voters: []uint64{2}, + VotersOutgoing: []uint64{1}, + Learners: []uint64{3}, + LearnersNext: []uint64{1}, + AutoLeave: true, + }, + &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}}, + }, } - rawNode.Campaign() - proposed := false - var ( - lastIndex uint64 - ccdata []byte - ) - for { - rd := rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - // Once we are the leader, propose a command and a ConfChange. 
- if !proposed && rd.SoftState.Lead == rawNode.raft.id { - if err = rawNode.Propose([]byte("somedata")); err != nil { + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + s := NewMemoryStorage() + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) + if err != nil { t.Fatal(err) } - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1} - ccdata, err = cc.Marshal() + + rawNode.Campaign() + proposed := false + var ( + lastIndex uint64 + ccdata []byte + ) + // Propose the ConfChange, wait until it applies, save the resulting + // ConfState. + var cs *pb.ConfState + for cs == nil { + rd := rawNode.Ready() + s.Append(rd.Entries) + for _, ent := range rd.CommittedEntries { + var cc pb.ConfChangeI + if ent.Type == pb.EntryConfChange { + var ccc pb.ConfChange + if err = ccc.Unmarshal(ent.Data); err != nil { + t.Fatal(err) + } + cc = ccc + } else if ent.Type == pb.EntryConfChangeV2 { + var ccc pb.ConfChangeV2 + if err = ccc.Unmarshal(ent.Data); err != nil { + t.Fatal(err) + } + cc = ccc + } + if cc != nil { + cs = rawNode.ApplyConfChange(cc) + } + } + rawNode.Advance(rd) + // Once we are the leader, propose a command and a ConfChange. + if !proposed && rd.SoftState.Lead == rawNode.raft.id { + if err = rawNode.Propose([]byte("somedata")); err != nil { + t.Fatal(err) + } + if ccv1, ok := tc.cc.AsV1(); ok { + ccdata, err = ccv1.Marshal() + if err != nil { + t.Fatal(err) + } + rawNode.ProposeConfChange(ccv1) + } else { + ccv2 := tc.cc.AsV2() + ccdata, err = ccv2.Marshal() + if err != nil { + t.Fatal(err) + } + rawNode.ProposeConfChange(ccv2) + } + proposed = true + } + } + + // Check that the last index is exactly the conf change we put in, + // down to the bits. + lastIndex, err = s.LastIndex() if err != nil { t.Fatal(err) } - rawNode.ProposeConfChange(cc) - proposed = true - } else if proposed { - // We proposed last cycle, which means we appended the conf change - // in this cycle. 
- lastIndex, err = s.LastIndex() + entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) if err != nil { t.Fatal(err) } - break - } - } + if len(entries) != 2 { + t.Fatalf("len(entries) = %d, want %d", len(entries), 2) + } + if !bytes.Equal(entries[0].Data, []byte("somedata")) { + t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) + } + typ := pb.EntryConfChange + if _, ok := tc.cc.AsV1(); !ok { + typ = pb.EntryConfChangeV2 + } + if entries[1].Type != typ { + t.Fatalf("type = %v, want %v", entries[1].Type, typ) + } + if !bytes.Equal(entries[1].Data, ccdata) { + t.Errorf("data = %v, want %v", entries[1].Data, ccdata) + } - entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) - if err != nil { - t.Fatal(err) - } - if len(entries) != 2 { - t.Fatalf("len(entries) = %d, want %d", len(entries), 2) - } - if !bytes.Equal(entries[0].Data, []byte("somedata")) { - t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) - } - if entries[1].Type != pb.EntryConfChange { - t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChange) - } - if !bytes.Equal(entries[1].Data, ccdata) { - t.Errorf("data = %v, want %v", entries[1].Data, ccdata) + if exp := &tc.exp; !reflect.DeepEqual(exp, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) + } + + if exp, act := lastIndex, rawNode.raft.pendingConfIndex; exp != act { + t.Fatalf("pendingConfIndex: expected %d, got %d", exp, act) + } + + // Move the RawNode along. If the ConfChange was simple, nothing else + // should happen. Otherwise, we're in a joint state, which is either + // left automatically or not. If not, we add the proposal that leaves + // it manually. 
+ rd := rawNode.Ready() + var context []byte + if !tc.exp.AutoLeave { + if len(rd.Entries) > 0 { + t.Fatal("expected no more entries") + } + if tc.exp2 == nil { + return + } + context = []byte("manual") + t.Log("leaving joint state manually") + if err := rawNode.ProposeConfChange(pb.ConfChangeV2{Context: context}); err != nil { + t.Fatal(err) + } + rd = rawNode.Ready() + } + + // Check that the right ConfChange comes out. + if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 { + t.Fatalf("expected exactly one more entry, got %+v", rd) + } + var cc pb.ConfChangeV2 + if err := cc.Unmarshal(rd.Entries[0].Data); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: context}) { + t.Fatalf("expected zero ConfChangeV2, got %+v", cc) + } + // Lie and pretend the ConfChange applied. It won't do so because now + // we require the joint quorum and we're only running one node. + cs = rawNode.ApplyConfChange(cc) + if exp := tc.exp2; !reflect.DeepEqual(exp, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) + } + }) } } @@ -350,7 +542,7 @@ func TestRawNodeStart(t *testing.T) { ApplySnapshot(pb.Snapshot) error } bootstrap := func(storage appenderStorage, cs pb.ConfState) error { - if len(cs.Nodes) == 0 { + if len(cs.Voters) == 0 { return fmt.Errorf("no voters specified") } fi, err := storage.FirstIndex() @@ -378,7 +570,7 @@ func TestRawNodeStart(t *testing.T) { if !IsEmptyHardState(hs) { return fmt.Errorf("HardState not empty") } - if len(ics.Nodes) != 0 { + if len(ics.Voters) != 0 { return fmt.Errorf("ConfState not empty") } @@ -391,7 +583,7 @@ func TestRawNodeStart(t *testing.T) { return storage.ApplySnapshot(snap) } - if err := bootstrap(storage, pb.ConfState{Nodes: []uint64{1}}); err != nil { + if err := bootstrap(storage, pb.ConfState{Voters: []uint64{1}}); err != nil { t.Fatal(err) } @@ -456,7 +648,7 @@ func TestRawNodeRestart(t *testing.T) { func TestRawNodeRestartFromSnapshot(t *testing.T) { snap := pb.Snapshot{ Metadata: 
pb.SnapshotMetadata{ - ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + ConfState: pb.ConfState{Voters: []uint64{1, 2}}, Index: 2, Term: 1, }, diff --git a/raft/storage_test.go b/raft/storage_test.go index 6d075eafd74..a4759e0248f 100644 --- a/raft/storage_test.go +++ b/raft/storage_test.go @@ -177,7 +177,7 @@ func TestStorageCompact(t *testing.T) { func TestStorageCreateSnapshot(t *testing.T) { ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} - cs := &pb.ConfState{Nodes: []uint64{1, 2, 3}} + cs := &pb.ConfState{Voters: []uint64{1, 2, 3}} data := []byte("data") tests := []struct { @@ -263,7 +263,7 @@ func TestStorageAppend(t *testing.T) { } func TestStorageApplySnapshot(t *testing.T) { - cs := &pb.ConfState{Nodes: []uint64{1, 2, 3}} + cs := &pb.ConfState{Voters: []uint64{1, 2, 3}} data := []byte("data") tests := []pb.Snapshot{{Data: data, Metadata: pb.SnapshotMetadata{Index: 4, Term: 4, ConfState: *cs}}, diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index a2638f5f01e..f67f3aa53da 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -25,6 +25,11 @@ import ( // Config reflects the configuration tracked in a ProgressTracker. type Config struct { Voters quorum.JointConfig + // AutoLeave is true if the configuration is joint and a transition to the + // incoming configuration should be carried out automatically by Raft when + // this is possible. If false, the configuration will be joint until the + // application initiates the transition manually. + AutoLeave bool // Learners is a set of IDs corresponding to the learners active in the // current configuration. 
// @@ -80,6 +85,9 @@ func (c Config) String() string { if c.LearnersNext != nil { fmt.Fprintf(&buf, " learners_next=%s", quorum.MajorityConfig(c.LearnersNext).String()) } + if c.AutoLeave { + fmt.Fprintf(&buf, " autoleave") + } return buf.String() } @@ -192,6 +200,9 @@ func (p *ProgressTracker) VoterNodes() []uint64 { // LearnerNodes returns a sorted slice of learners. func (p *ProgressTracker) LearnerNodes() []uint64 { + if len(p.Learners) == 0 { + return nil + } nodes := make([]uint64, 0, len(p.Learners)) for id := range p.Learners { nodes = append(nodes, id) diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index 6bb58929687..e9e0b918ad2 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -35,7 +35,6 @@ import ( "go.etcd.io/etcd/raft/raftpb" "go.etcd.io/etcd/wal" "go.etcd.io/etcd/wal/walpb" - "go.uber.org/zap" ) @@ -83,7 +82,7 @@ func main() { switch err { case nil: walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term - nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes) + nodes := genIDSlice(snapshot.Metadata.ConfState.Voters) fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n", walsnap.Term, walsnap.Index, nodes) case snap.ErrNoSnapshot: