Skip to content

Commit

Permalink
server/etcdserver/raft.go:
Browse files Browse the repository at this point in the history
1. rename confChangeCh to raftAdvancedC
2. rename waitApply to confChanged
3. add comments and test assertion

Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Jun 27, 2023
1 parent ad3b6ee commit 6cdc9ae
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 42 deletions.
28 changes: 15 additions & 13 deletions server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ func init() {

// toApply contains entries, snapshot to be applied. Once
// an toApply is consumed, the entries will be persisted to
// to raft storage concurrently; the application must read
// raft storage concurrently; the application must read
// notifyc before assuming the raft messages are stable.
type toApply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
// confChangeCh synchronizes etcd server applies confChange with raft node
confChangeCh chan struct{}
// raftAdvancedC notifies EtcdServer.apply that
// 'raftLog.applied' has advanced by r.Advance
// it should be used only when entries contain raftpb.EntryConfChange
raftAdvancedC <-chan struct{}
}

type raftNode struct {
Expand Down Expand Up @@ -205,12 +207,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}

notifyc := make(chan struct{}, 1)
confChangeCh := make(chan struct{}, 1)
raftAdvancedC := make(chan struct{}, 1)
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
confChangeCh: confChangeCh,
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
raftAdvancedC: raftAdvancedC,
}

updateCommittedIndex(&ap, rh)
Expand Down Expand Up @@ -273,10 +275,10 @@ func (r *raftNode) start(rh *raftReadyHandler) {

r.raftStorage.Append(rd.Entries)

waitApply := false
confChanged := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
confChanged = true
break
}
}
Expand All @@ -296,7 +298,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// We simply wait for ALL pending entries to be applied for now.
// We might improve this later on if it causes unnecessary long blocking issues.

if waitApply {
if confChanged {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
Expand All @@ -317,9 +319,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftBeforeAdvance struct{}
r.Advance()

if waitApply {
if confChanged {
// notify etcdserver that raft has already been notified or advanced.
confChangeCh <- struct{}{}
raftAdvancedC <- struct{}{}
}
case <-r.stopped:
return
Expand Down
10 changes: 8 additions & 2 deletions server/etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (

"go.uber.org/zap/zaptest"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/mock/mockstorage"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

func TestGetIDs(t *testing.T) {
Expand Down Expand Up @@ -231,6 +232,11 @@ func TestConfigChangeBlocksApply(t *testing.T) {
// finish toApply, unblock raft routine
<-ap.notifyc

select {
case <-ap.raftAdvancedC:
t.Log("recevied raft advance notification")
}

select {
case <-continueC:
case <-time.After(time.Second):
Expand Down
28 changes: 11 additions & 17 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
return
}
var shouldstop bool
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.confChangeCh); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.raftAdvancedC); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
Expand Down Expand Up @@ -1649,8 +1649,9 @@ func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
func (s *EtcdServer) Term() uint64 { return s.getTerm() }

type confChangeResponse struct {
membs []*membership.Member
err error
membs []*membership.Member
raftAdvanceC <-chan struct{}
err error
}

// configure sends a configuration change through consensus and
Expand All @@ -1673,6 +1674,11 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
lg.Panic("failed to configure")
}
resp := x.(*confChangeResponse)
// etcdserver need to ensure the raft has already been notified
// or advanced before it responds to the client. Otherwise, the
// following config change request may be rejected.
// See https://github.com/etcd-io/etcd/issues/15528.
<-resp.raftAdvanceC
lg.Info(
"applied a configuration change through raft",
zap.String("local-member-id", s.MemberId().String()),
Expand Down Expand Up @@ -1810,7 +1816,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
func (s *EtcdServer) apply(
es []raftpb.Entry,
confState *raftpb.ConfState,
confChangeCh chan struct{},
raftAdvancedC <-chan struct{},
) (appliedt uint64, appliedi uint64, shouldStop bool) {
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
for i := range es {
Expand Down Expand Up @@ -1842,19 +1848,7 @@ func (s *EtcdServer) apply(
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf

// etcdserver need to ensure the raft has already been notified
// or advanced before it responds to the client. Otherwise, the
// following config change request may be rejected.
// See https://github.com/etcd-io/etcd/issues/15528.
select {
case <-time.After(500 * time.Millisecond):
lg := s.Logger()
lg.Warn("timed out waiting for configChange notification")
case <-confChangeCh:
}

s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), raftAdvancedC, err})

default:
lg := s.Logger()
Expand Down
15 changes: 9 additions & 6 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
Expand All @@ -58,8 +61,6 @@ import (
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

// TestDoLocalAction tests requests which do not need to go through raft to be applied,
Expand Down Expand Up @@ -689,8 +690,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
Data: pbutil.MustMarshal(cc),
}}

confChangeCh := make(chan struct{}, 1)
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
raftAdvancedC := make(chan struct{}, 1)
raftAdvancedC <- struct{}{}
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC)
consistIndex := srv.consistIndex.ConsistentIndex()
assert.Equal(t, uint64(2), appliedi)

Expand Down Expand Up @@ -764,8 +766,9 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}

confChangeCh := make(chan struct{}, 1)
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
raftAdvancedC := make(chan struct{}, 1)
raftAdvancedC <- struct{}{}
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC)
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}
Expand Down
9 changes: 5 additions & 4 deletions tests/integration/clientv3/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ func TestMemberPromote(t *testing.T) {

// TestMemberPromoteMemberNotLearner ensures that promoting a voting member fails.
func TestMemberPromoteMemberNotLearner(t *testing.T) {
// TODO enable this test with integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`) after PR 15708 is merged
integration2.BeforeTest(t)
integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`))

clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t)
Expand Down Expand Up @@ -380,15 +379,17 @@ func TestMemberPromoteMemberNotExist(t *testing.T) {

// TestMaxLearnerInCluster verifies that the maximum number of learners allowed in a cluster
func TestMaxLearnerInCluster(t *testing.T) {
integration2.BeforeTest(t)
integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`))

// 1. start with a cluster with 3 voting member and max learner 2
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3, ExperimentalMaxLearners: 2, DisableStrictReconfigCheck: true})
defer clus.Terminate(t)

// 2. adding 2 learner members should succeed
for i := 0; i < 2; i++ {
_, err := clus.Client(0).MemberAddAsLearner(context.Background(), []string{fmt.Sprintf("http://127.0.0.1:123%d", i)})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err := clus.Client(0).MemberAddAsLearner(ctx, []string{fmt.Sprintf("http://127.0.0.1:123%d", i)})
cancel()
if err != nil {
t.Fatalf("failed to add learner member %v", err)
}
Expand Down

0 comments on commit 6cdc9ae

Please sign in to comment.