Skip to content

Commit

Permalink
raft: allow use of joint quorums
Browse files Browse the repository at this point in the history
This change introduces joint quorums by changing the Node and RawNode
API to accept pb.ConfChangeV2 (on top of pb.ConfChange).

pb.ConfChange continues to work as today: it allows carrying out a
single configuration change. A pb.ConfChange proposal gets added to
the Raft log as such and is thus also observed by the app during Ready
handling, and fed back to ApplyConfChange.

ConfChangeV2 allows joint configuration changes but will continue to
carry out configuration changes in "one phase" (i.e. without ever
entering a joint config) when this is possible.
  • Loading branch information
tbg committed Jul 23, 2019
1 parent 88f5561 commit b67303c
Show file tree
Hide file tree
Showing 17 changed files with 615 additions and 152 deletions.
4 changes: 2 additions & 2 deletions etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
}
}

// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfgChangeBlocksApply(t *testing.T) {
// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfigChangeBlocksApply(t *testing.T) {
n := newNopReadyNode()

r := newRaftNode(raftNodeConfig{
Expand Down
35 changes: 25 additions & 10 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"testing"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/etcdserver/api/snap"
Expand All @@ -49,6 +47,7 @@ import (
"go.etcd.io/etcd/pkg/wait"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)

// TestDoLocalAction tests requests which do not need to go through raft to be applied,
Expand Down Expand Up @@ -1632,7 +1631,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
return nil
}
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
n.Record(testutil.Action{Name: "ProposeConfChange"})
return nil
}
Expand All @@ -1645,7 +1644,7 @@ func (n *nodeRecorder) Ready() <-chan raft.Ready
func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
func (n *nodeRecorder) Advance() {}
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
return &raftpb.ConfState{}
}
Expand Down Expand Up @@ -1706,21 +1705,37 @@ func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
}

func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
data, err := conf.Marshal()
func confChangeActionName(conf raftpb.ConfChangeI) string {
var s string
if confV1, ok := conf.AsV1(); ok {
s = confV1.Type.String()
} else {
for i, chg := range conf.AsV2().Changes {
if i > 0 {
s += "/"
}
s += chg.Type.String()
}
}
return s
}

func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
typ, data, err := raftpb.MarshalConfChange(conf)
if err != nil {
return err
}

n.index++
n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}}
return nil
}
func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
return n.readyc
}
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)})
return &raftpb.ConfState{}
}

Expand Down
2 changes: 1 addition & 1 deletion raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
return nil
}
10 changes: 7 additions & 3 deletions raft/confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Changer struct {
// (Section 4.3) corresponds to `C_{new,old}`.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker
if err := c.apply(&cfg, prs, ccs...); err != nil {
return c.err(err)
}

cfg.AutoLeave = autoLeave
return checkAndReturn(cfg, prs)
}

Expand Down Expand Up @@ -120,6 +120,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
}
}
*outgoingPtr(&cfg.Voters) = nil
cfg.AutoLeave = false

return checkAndReturn(cfg, prs)
}
Expand All @@ -142,7 +143,7 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config")
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}
if err := checkInvariants(cfg, prs); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, nil
Expand Down Expand Up @@ -327,6 +328,9 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
if cfg.LearnersNext != nil {
return fmt.Errorf("LearnersNext must be nil when not joint")
}
if cfg.AutoLeave {
return fmt.Errorf("AutoLeave must be false when not joint")
}
}

return nil
Expand Down
6 changes: 5 additions & 1 deletion raft/confchange/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ func TestConfChangeDataDriven(t *testing.T) {
case "simple":
cfg, prs, err = c.Simple(ccs...)
case "enter-joint":
cfg, prs, err = c.EnterJoint(ccs...)
var autoLeave bool
if len(d.CmdArgs) > 0 {
d.ScanArgs(t, "autoleave", &autoLeave)
}
cfg, prs, err = c.EnterJoint(autoLeave, ccs...)
case "leave-joint":
if len(ccs) > 0 {
err = errors.New("this command takes no input")
Expand Down
25 changes: 24 additions & 1 deletion raft/confchange/quick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package confchange

import (
"fmt"
"math/rand"
"reflect"
"testing"
Expand All @@ -36,16 +37,38 @@ func TestConfChangeQuick(t *testing.T) {
const infoCount = 5

runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error {
cfg, prs, err := c.EnterJoint(ccs...)
cfg, prs, err := c.EnterJoint(false /* autoLeave */, ccs...)
if err != nil {
return err
}
// Also do this with autoLeave on, just to check that we'd get the same
// result.
cfg2a, prs2a, err := c.EnterJoint(true /* autoLeave */, ccs...)
if err != nil {
return err
}
cfg2a.AutoLeave = false
if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(prs, prs2a) {
return fmt.Errorf("cfg: %+v\ncfg2a: %+v\nprs: %+v\nprs2a: %+v",
cfg, cfg2a, prs, prs2a)
}
c.Tracker.Config = cfg
c.Tracker.Progress = prs
cfg2b, prs2b, err := c.LeaveJoint()
if err != nil {
return err
}
// Reset back to the main branch with autoLeave=false.
c.Tracker.Config = cfg
c.Tracker.Progress = prs
cfg, prs, err = c.LeaveJoint()
if err != nil {
return err
}
if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(prs, prs2b) {
return fmt.Errorf("cfg: %+v\ncfg2b: %+v\nprs: %+v\nprs2b: %+v",
cfg, cfg2b, prs, prs2b)
}
c.Tracker.Config = cfg
c.Tracker.Progress = prs
return nil
Expand Down
29 changes: 29 additions & 0 deletions raft/confchange/testdata/joint_autoleave.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Test the autoleave argument to EnterJoint. It defaults to false in the
# datadriven tests. The flag has no associated semantics in this package,
# it is simply passed through.
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1

# Autoleave is reflected in the config.
enter-joint autoleave=true
v2 v3
----
voters=(1 2 3)&&(1) autoleave
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=2

# Can't enter-joint twice, even if autoleave changes.
enter-joint autoleave=false
----
config is already joint

leave-joint
----
voters=(1 2 3)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=2
6 changes: 3 additions & 3 deletions raft/confchange/testdata/simple_safety.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ voters=(1 2) learners=(3)
simple
r1 v5
----
more than voter changed without entering joint config
more than one voter changed without entering joint config

simple
r1 r2
Expand All @@ -30,12 +30,12 @@ removed all voters
simple
v3 v4
----
more than voter changed without entering joint config
more than one voter changed without entering joint config

simple
l1 v5
----
more than voter changed without entering joint config
more than one voter changed without entering joint config

simple
l1 l2
Expand Down
76 changes: 56 additions & 20 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,20 @@ type Node interface {
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
// ProposeConfChange proposes a configuration change. Like any proposal, the
// configuration change may be dropped with or without an error being
// returned. In particular, configuration changes are dropped unless the
// leader has certainty that there is no prior unapplied configuration
// change in its log.
//
// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
// message. The latter allows arbitrary configuration changes via joint
// consensus, notably including replacing a voter. Passing a ConfChangeV2
// message is only allowed if all Nodes participating in the cluster run a
// version of this library aware of the V2 API. See pb.ConfChangeV2 for
// usage details and semantics.
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error

// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error

Expand All @@ -156,11 +166,13 @@ type Node interface {
// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
// progress, it can call Advance before finishing applying the last ready.
Advance()
// ApplyConfChange applies config change to the local node.
// Returns an opaque ConfState protobuf which must be recorded
// in snapshots. Will never return nil; it returns a pointer only
// to match MemoryStorage.Compact.
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
// ApplyConfChange applies a config change (previously passed to
// ProposeConfChange) to the node. This must be called whenever a config
// change is observed in Ready.CommittedEntries.
//
// Returns an opaque non-nil ConfState protobuf which must be recorded in
// snapshots.
ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState

// TransferLeadership attempts to transfer leadership to the given transferee.
TransferLeadership(ctx context.Context, lead, transferee uint64)
Expand Down Expand Up @@ -240,7 +252,7 @@ type msgWithResult struct {
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
Expand All @@ -256,7 +268,7 @@ func newNode() node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
confc: make(chan pb.ConfChange),
confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
Expand Down Expand Up @@ -341,11 +353,27 @@ func (n *node) run(rn *RawNode) {
r.Step(m)
}
case cc := <-n.confc:
_, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
if _, ok := r.prs.Progress[r.id]; !ok {
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
// If the node was removed, block incoming proposals. Note that we
// only do this if the node was in the config before. Nodes may be
// a member of the group without knowing this (when they're catching
// up on the log and don't have the latest config) and we don't want
// to block the proposal channel in that case.
//
// NB: propc is reset when the leader changes, which, if we learn
// about it, sort of implies that we got readded, maybe? This isn't
// very sound and likely has bugs.
if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
var found bool
for _, sl := range [][]uint64{cs.Nodes, cs.NodesJoint} {
for _, id := range sl {
if id == r.id {
found = true
}
}
}
if !found {
propc = nil
}
}
Expand Down Expand Up @@ -397,12 +425,20 @@ func (n *node) Step(ctx context.Context, m pb.Message) error {
return n.step(ctx, m)
}

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
typ, data, err := pb.MarshalConfChange(c)
if err != nil {
return pb.Message{}, err
}
return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
}

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
msg, err := confChangeToMsg(cc)
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
return n.Step(ctx, msg)
}

func (n *node) step(ctx context.Context, m pb.Message) error {
Expand Down Expand Up @@ -463,10 +499,10 @@ func (n *node) Advance() {
}
}

func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
case n.confc <- cc:
case n.confc <- cc.AsV2():
case <-n.done:
}
select {
Expand Down
Loading

0 comments on commit b67303c

Please sign in to comment.