diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index aee855c79cd..ebd3d5e95e2 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -65,15 +65,17 @@ func init() { // toApply contains entries, snapshot to be applied. Once // an toApply is consumed, the entries will be persisted to -// to raft storage concurrently; the application must read +// raft storage concurrently; the application must read // notifyc before assuming the raft messages are stable. type toApply struct { entries []raftpb.Entry snapshot raftpb.Snapshot // notifyc synchronizes etcd server applies with the raft node notifyc chan struct{} - // confChangeCh synchronizes etcd server applies confChange with raft node - confChangeCh chan struct{} + // raftAdvancedC notifies EtcdServer.apply that + // 'raftLog.applied' has been advanced by r.Advance. + // It should be used only when entries contain raftpb.EntryConfChange. + raftAdvancedC <-chan struct{} } type raftNode struct { @@ -205,12 +207,12 @@ func (r *raftNode) start(rh *raftReadyHandler) { } notifyc := make(chan struct{}, 1) - confChangeCh := make(chan struct{}, 1) + raftAdvancedC := make(chan struct{}, 1) ap := toApply{ - entries: rd.CommittedEntries, - snapshot: rd.Snapshot, - notifyc: notifyc, - confChangeCh: confChangeCh, + entries: rd.CommittedEntries, + snapshot: rd.Snapshot, + notifyc: notifyc, + raftAdvancedC: raftAdvancedC, } updateCommittedIndex(&ap, rh) @@ -273,10 +275,10 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.raftStorage.Append(rd.Entries) - waitApply := false + confChanged := false for _, ent := range rd.CommittedEntries { if ent.Type == raftpb.EntryConfChange { - waitApply = true + confChanged = true break } } @@ -296,7 +298,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // We simply wait for ALL pending entries to be applied for now. // We might improve this later on if it causes unnecessary long blocking issues. 
- if waitApply { + if confChanged { // blocks until 'applyAll' calls 'applyWait.Trigger' // to be in sync with scheduled config-change job // (assume notifyc has cap of 1) @@ -317,9 +319,9 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftBeforeAdvance struct{} r.Advance() - if waitApply { + if confChanged { // notify etcdserver that raft has already been notified or advanced. - confChangeCh <- struct{}{} + raftAdvancedC <- struct{}{} } case <-r.stopped: return diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index e795732cb51..c8d9ff10e97 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -24,13 +24,14 @@ import ( "go.uber.org/zap/zaptest" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" + "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/mock/mockstorage" serverstorage "go.etcd.io/etcd/server/v3/storage" - "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftpb" ) func TestGetIDs(t *testing.T) { @@ -231,6 +232,11 @@ func TestConfigChangeBlocksApply(t *testing.T) { // finish toApply, unblock raft routine <-ap.notifyc + select { + case <-ap.raftAdvancedC: + t.Log("recevied raft advance notification") + } + select { case <-continueC: case <-time.After(time.Second): diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index dc81c81764a..4a2128104c9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1133,7 +1133,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) { return } var shouldstop bool - if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.confChangeCh); shouldstop { + if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.raftAdvancedC); shouldstop { go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster")) } } @@ 
-1649,8 +1649,9 @@ func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() } func (s *EtcdServer) Term() uint64 { return s.getTerm() } type confChangeResponse struct { - membs []*membership.Member - err error + membs []*membership.Member + raftAdvanceC <-chan struct{} + err error } // configure sends a configuration change through consensus and @@ -1673,6 +1674,11 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me lg.Panic("failed to configure") } resp := x.(*confChangeResponse) + // etcdserver needs to ensure the raft has already been notified + // or advanced before it responds to the client. Otherwise, the + // following config change request may be rejected. + // See https://github.com/etcd-io/etcd/issues/15528. + <-resp.raftAdvanceC lg.Info( "applied a configuration change through raft", zap.String("local-member-id", s.MemberId().String()), @@ -1810,7 +1816,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { func (s *EtcdServer) apply( es []raftpb.Entry, confState *raftpb.ConfState, - confChangeCh chan struct{}, + raftAdvancedC <-chan struct{}, ) (appliedt uint64, appliedi uint64, shouldStop bool) { s.lg.Debug("Applying entries", zap.Int("num-entries", len(es))) for i := range es { @@ -1842,19 +1848,7 @@ func (s *EtcdServer) apply( s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf - - // etcdserver need to ensure the raft has already been notified - // or advanced before it responds to the client. Otherwise, the - // following config change request may be rejected. - // See https://github.com/etcd-io/etcd/issues/15528. 
- select { - case <-time.After(500 * time.Millisecond): - lg := s.Logger() - lg.Warn("timed out waiting for configChange notification") - case <-confChangeCh: - } - - s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err}) + s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), raftAdvancedC, err}) default: lg := s.Logger() diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 7ca28c0347f..e28df6648ae 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -32,6 +32,9 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/client/pkg/v3/fileutil" @@ -58,8 +61,6 @@ import ( betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" - "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftpb" ) // TestDoLocalAction tests requests which do not need to go through raft to be applied, @@ -689,8 +690,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { Data: pbutil.MustMarshal(cc), }} - confChangeCh := make(chan struct{}, 1) - _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, confChangeCh) + raftAdvancedC := make(chan struct{}, 1) + raftAdvancedC <- struct{}{} + _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC) consistIndex := srv.consistIndex.ConsistentIndex() assert.Equal(t, uint64(2), appliedi) @@ -764,8 +766,9 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { ents = append(ents, ent) } - confChangeCh := make(chan struct{}, 1) - _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, confChangeCh) + raftAdvancedC := make(chan struct{}, 1) + raftAdvancedC <- struct{}{} + _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC) if !shouldStop { t.Errorf("shouldStop = %t, want %t", shouldStop, true) } 
diff --git a/tests/integration/clientv3/cluster_test.go b/tests/integration/clientv3/cluster_test.go index 6ac1e20c69e..6b311c235c5 100644 --- a/tests/integration/clientv3/cluster_test.go +++ b/tests/integration/clientv3/cluster_test.go @@ -294,8 +294,7 @@ func TestMemberPromote(t *testing.T) { // TestMemberPromoteMemberNotLearner ensures that promoting a voting member fails. func TestMemberPromoteMemberNotLearner(t *testing.T) { - // TODO enable this test with integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`) after PR 15708 is merged - integration2.BeforeTest(t) + integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`)) clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -380,7 +379,7 @@ func TestMemberPromoteMemberNotExist(t *testing.T) { // TestMaxLearnerInCluster verifies that the maximum number of learners allowed in a cluster func TestMaxLearnerInCluster(t *testing.T) { - integration2.BeforeTest(t) + integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`)) // 1. start with a cluster with 3 voting member and max learner 2 clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3, ExperimentalMaxLearners: 2, DisableStrictReconfigCheck: true}) @@ -388,7 +387,9 @@ func TestMaxLearnerInCluster(t *testing.T) { // 2. adding 2 learner members should succeed for i := 0; i < 2; i++ { - _, err := clus.Client(0).MemberAddAsLearner(context.Background(), []string{fmt.Sprintf("http://127.0.0.1:123%d", i)}) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, err := clus.Client(0).MemberAddAsLearner(ctx, []string{fmt.Sprintf("http://127.0.0.1:123%d", i)}) + cancel() if err != nil { t.Fatalf("failed to add learner member %v", err) }