diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index b820e696e0f..9c04082f65e 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -47,6 +47,7 @@ var ( ErrGRPCMemberNotLearner = status.Error(codes.FailedPrecondition, "etcdserver: can only promote a learner member") ErrGRPCLearnerNotReady = status.Error(codes.FailedPrecondition, "etcdserver: can only promote a learner member which is in sync with leader") ErrGRPCTooManyLearners = status.Error(codes.FailedPrecondition, "etcdserver: too many learner members in cluster") + ErrGRPCClusterIdMismatch = status.Error(codes.FailedPrecondition, "etcdserver: cluster ID mismatch") ErrGRPCRequestTooLarge = status.Error(codes.InvalidArgument, "etcdserver: request is too large") ErrGRPCRequestTooManyRequests = status.Error(codes.ResourceExhausted, "etcdserver: too many requests") @@ -117,6 +118,7 @@ var ( ErrorDesc(ErrGRPCMemberNotLearner): ErrGRPCMemberNotLearner, ErrorDesc(ErrGRPCLearnerNotReady): ErrGRPCLearnerNotReady, ErrorDesc(ErrGRPCTooManyLearners): ErrGRPCTooManyLearners, + ErrorDesc(ErrGRPCClusterIdMismatch): ErrGRPCClusterIdMismatch, ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge, ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests, @@ -204,6 +206,7 @@ var ( ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken) ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision) ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt) + ErrClusterIdMismatch = Error(ErrGRPCClusterIdMismatch) ErrNoLeader = Error(ErrGRPCNoLeader) ErrNotLeader = Error(ErrGRPCNotLeader) diff --git a/server/etcdserver/api/rafthttp/http.go b/server/etcdserver/api/rafthttp/http.go index 6e6686b4c87..c22e514ed12 100644 --- a/server/etcdserver/api/rafthttp/http.go +++ b/server/etcdserver/api/rafthttp/http.go @@ -54,7 +54,7 @@ var ( RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot") errIncompatibleVersion = errors.New("incompatible version") - errClusterIDMismatch = errors.New("cluster ID mismatch") + ErrClusterIDMismatch = errors.New("cluster ID mismatch") ) type peerGetter interface { @@ -508,7 +508,7 @@ func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, heade zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs), zap.String("remote-peer-cluster-id", gcid), ) - return errClusterIDMismatch + return ErrClusterIDMismatch } return nil } diff --git a/server/etcdserver/api/rafthttp/stream.go b/server/etcdserver/api/rafthttp/stream.go index 994e080dc5f..e364ea560e7 100644 --- a/server/etcdserver/api/rafthttp/stream.go +++ b/server/etcdserver/api/rafthttp/stream.go @@ -648,7 +648,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { } return nil, errIncompatibleVersion - case errClusterIDMismatch.Error(): + case ErrClusterIDMismatch.Error(): if cr.lg != nil { cr.lg.Warn( "request sent was ignored by remote peer due to cluster ID mismatch", @@ -656,10 +656,10 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")), zap.String("local-member-id", cr.tr.ID.String()), zap.String("local-member-cluster-id", cr.tr.ClusterID.String()), - zap.Error(errClusterIDMismatch), + zap.Error(ErrClusterIDMismatch), ) } - return nil, errClusterIDMismatch + return nil, ErrClusterIDMismatch default: return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) diff --git a/server/etcdserver/api/rafthttp/util.go b/server/etcdserver/api/rafthttp/util.go index 0f8d520714d..854ce7b220a 100644 --- a/server/etcdserver/api/rafthttp/util.go +++ b/server/etcdserver/api/rafthttp/util.go @@ -94,7 +94,7 @@ func checkPostResponse(lg *zap.Logger, resp *http.Response, body []byte, req *ht ) } return errIncompatibleVersion - case errClusterIDMismatch.Error(): + case ErrClusterIDMismatch.Error(): if lg != nil { lg.Error( "request sent was ignored due to cluster ID mismatch", @@ -103,7 +103,7 @@ func checkPostResponse(lg *zap.Logger, resp *http.Response, body []byte, req *ht zap.String("local-member-cluster-id", req.Header.Get("X-Etcd-Cluster-ID")), ) } - return errClusterIDMismatch + return ErrClusterIDMismatch default: return fmt.Errorf("unhandled error %q when precondition failed", string(body)) } diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 0b4596823d1..57e59a58795 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -29,6 +29,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/storage/mvcc" "go.uber.org/zap" @@ -149,6 +150,17 @@ func (cm *corruptionChecker) InitialCheck() error { zap.Strings("remote-peer-endpoints", p.eps), zap.Error(err), ) + case rpctypes.ErrClusterIdMismatch: + cm.lg.Warn( + "cluster ID mismatch", + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Int64("local-member-revision", h.Revision), + zap.Int64("local-member-compact-revision", h.CompactRevision), + zap.Uint32("local-member-hash", h.Hash), + zap.String("remote-peer-id", p.id.String()), + zap.Strings("remote-peer-endpoints", p.eps), + zap.Error(err), + ) } } } @@ -466,7 +478,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { var lastErr error for _, ep := range p.eps { ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - resp, lastErr := HashByRev(ctx, cc, ep, rev) + resp, lastErr := HashByRev(ctx, s.cluster.ID(), cc, ep, rev) cancel() if lastErr == nil { resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil}) @@ -510,6 +522,10 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "bad path", http.StatusBadRequest) return } + if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != "" && gcid != h.server.cluster.ID().String() { + http.Error(w, rafthttp.ErrClusterIDMismatch.Error(), http.StatusPreconditionFailed) + return + } defer r.Body.Close() b, err := io.ReadAll(r.Body) @@ -553,7 +569,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // HashByRev fetch hash of kv store at the given rev via http call to the given url -func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) { +func HashByRev(ctx context.Context, cid types.ID, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) { hashReq := &pb.HashKVRequest{Revision: rev} hashReqBytes, err := json.Marshal(hashReq) if err != nil { @@ -566,6 +582,7 @@ func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb } req = req.WithContext(ctx) req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Etcd-Cluster-ID", cid.String()) req.Cancel = ctx.Done() resp, err := cc.Do(req) @@ -585,6 +602,10 @@ func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb if strings.Contains(string(b), mvcc.ErrFutureRev.Error()) { return nil, rpctypes.ErrFutureRev } + } else if resp.StatusCode == http.StatusPreconditionFailed { + if strings.Contains(string(b), rafthttp.ErrClusterIDMismatch.Error()) { + return nil, rpctypes.ErrClusterIdMismatch + } } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("unknown error: %s", string(b)) diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 3fff8a533f3..3fda1f40911 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -15,11 +15,23 @@ package etcdserver import ( + "bytes" "context" + "encoding/json" "fmt" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" "testing" "time" + "go.uber.org/zap" + + "go.etcd.io/etcd/server/v3/lease" + betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" @@ -86,6 +98,13 @@ func TestInitialCheck(t *testing.T) { hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.InitialCheck()", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -203,6 +222,13 @@ func TestPeriodicCheck(t *testing.T) { expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(88)"}, expectCorrupt: true, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.PeriodicCheck()", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -388,6 +414,14 @@ func TestCompactHashCheck(t *testing.T) { }, expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"}, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.CompactHashCheck()", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -473,3 +507,88 @@ func (f *fakeHasher) TriggerCorruptAlarm(memberId types.ID) { f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId)) f.alarmTriggered = true } + +func TestHashKVHandler(t *testing.T) { + var remoteClusterID = 111195 + var localClusterID = 111196 + var revision = 1 + + etcdSrv := &EtcdServer{} + etcdSrv.cluster = newTestCluster(t, nil) + etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID)) + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + ph := &hashKVHandler{ + lg: zap.NewNop(), + server: etcdSrv, + } + srv := httptest.NewServer(ph) + defer srv.Close() + + tests := []struct { + name string + remoteClusterID int + wcode int + wKeyWords string + }{ + { + name: "HashKV returns 200 if cluster hash matches", + remoteClusterID: localClusterID, + wcode: http.StatusOK, + wKeyWords: "", + }, + { + name: "HashKV returns 400 if cluster hash doesn't matche", + remoteClusterID: remoteClusterID, + wcode: http.StatusPreconditionFailed, + wKeyWords: "cluster ID mismatch", + }, + } + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hashReq := &pb.HashKVRequest{Revision: int64(revision)} + hashReqBytes, err := json.Marshal(hashReq) + if err != nil { + t.Fatalf("failed to marshal request: %v", err) + } + req, err := http.NewRequest(http.MethodGet, srv.URL+PeerHashKVPath, bytes.NewReader(hashReqBytes)) + if err != nil { + t.Fatalf("failed to create request: %v", err) + } + req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(uint64(tt.remoteClusterID), 16)) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("failed to get http response: %v", err) + } + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + t.Fatalf("unexpected io.ReadAll error: %v", err) + } + if resp.StatusCode != tt.wcode { + t.Fatalf("#%d: code = %d, want %d", i, resp.StatusCode, tt.wcode) + } + if resp.StatusCode != http.StatusOK { + if !strings.Contains(string(body), tt.wKeyWords) { + t.Errorf("#%d: body: %s, want body to contain keywords: %s", i, string(body), tt.wKeyWords) + } + return + } + + hashKVResponse := pb.HashKVResponse{} + err = json.Unmarshal(body, &hashKVResponse) + if err != nil { + t.Fatalf("unmarshal response error: %v", err) + } + hashValue, _, err := etcdSrv.KV().HashStorage().HashByRev(int64(revision)) + if err != nil { + t.Fatalf("etcd server hash failed: %v", err) + } + if hashKVResponse.Hash != hashValue.Hash { + t.Fatalf("hash value inconsistent: %d != %d", hashKVResponse.Hash, hashValue) + } + }) + } +} diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index e7c99bcea96..7a463c68949 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -99,6 +99,89 @@ func corruptTest(cx ctlCtx) { e2e.WaitReadyExpectProc(context.TODO(), proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) } +func TestInPlaceRecovery(t *testing.T) { + basePort := 20000 + e2e.BeforeTest(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the cluster. + epcOld, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithInitialClusterToken("old"), + e2e.WithKeepDataDir(false), + e2e.WithCorruptCheckTime(time.Second), + e2e.WithBasePort(basePort), + ) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epcOld.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + t.Log("old cluster started.") + + //Put some data into the old cluster, so that after recovering from a blank db, the hash diverges. + t.Log("putting 10 keys...") + oldCc, err := e2e.NewEtcdctl(epcOld.Cfg.Client, epcOld.EndpointsGRPC()) + assert.NoError(t, err) + for i := 0; i < 10; i++ { + err := oldCc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{}) + assert.NoError(t, err, "error on put") + } + + // Create a new cluster config, but with the same port numbers. In this way the new servers can stay in + // contact with the old ones. + epcNewConfig := e2e.NewConfig( + e2e.WithInitialClusterToken("new"), + e2e.WithKeepDataDir(false), + e2e.WithCorruptCheckTime(time.Second), + e2e.WithBasePort(basePort), + e2e.WithInitialCorruptCheck(true), + ) + epcNew, err := e2e.InitEtcdProcessCluster(t, epcNewConfig) + if err != nil { + t.Fatalf("could not init etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epcNew.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + newCc, err := e2e.NewEtcdctl(epcNew.Cfg.Client, epcNew.EndpointsGRPC()) + assert.NoError(t, err) + + // Rolling recovery of the servers. + t.Log("rolling updating servers in place...") + for i, newProc := range epcNew.Procs { + oldProc := epcOld.Procs[i] + err = oldProc.Close() + if err != nil { + t.Fatalf("could not stop etcd process (%v)", err) + } + t.Logf("old cluster server %d: %s stopped.", i, oldProc.Config().Name) + err = newProc.Start(ctx) + if err != nil { + t.Fatalf("could not start etcd process (%v)", err) + } + t.Logf("new cluster server %d: %s started in-place with blank db.", i, newProc.Config().Name) + t.Log("sleeping 5 sec to let nodes do periodical check...") + time.Sleep(5 * time.Second) + } + t.Log("new cluster started.") + + alarmResponse, err := newCc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatalf("there is no corruption after in-place recovery, but corruption reported.") + } + } + t.Log("no corruption detected.") +} + func TestPeriodicCheckDetectsCorruption(t *testing.T) { checkTime := time.Second e2e.BeforeTest(t) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index f00c66f3544..ac8243d9d7e 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -324,6 +324,14 @@ func WithCorruptCheckTime(time time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CorruptCheckTime = time } } +func WithInitialClusterToken(token string) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.InitialToken = token } +} + +func WithInitialCorruptCheck(enabled bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.InitialCorruptCheck = enabled } +} + func WithCompactHashCheckEnabled(enabled bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CompactHashCheckEnabled = enabled } } diff --git a/tests/integration/hashkv_test.go b/tests/integration/hashkv_test.go index 3fc10a604d8..238958559e8 100644 --- a/tests/integration/hashkv_test.go +++ b/tests/integration/hashkv_test.go @@ -47,13 +47,14 @@ func TestCompactionHash(t *testing.T) { }, } - testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client}, 1000) + testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client, clus.Members[0].Server}, 1000) } type hashTestCase struct { *clientv3.Client - url string - http *http.Client + url string + http *http.Client + server *etcdserver.EtcdServer } func (tc hashTestCase) Put(ctx context.Context, key, value string) error { @@ -67,7 +68,7 @@ func (tc hashTestCase) Delete(ctx context.Context, key string) error { } func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) { - resp, err := etcdserver.HashByRev(ctx, tc.http, "http://unix", rev) + resp, err := etcdserver.HashByRev(ctx, tc.server.Cluster().ID(), tc.http, "http://unix", rev) return testutil.KeyValueHash{Hash: resp.Hash, CompactRevision: resp.CompactRevision, Revision: resp.Header.Revision}, err }