Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] allow downgrade from 3.5 #17330

Merged
merged 4 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
return err
}

s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)
s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics, true)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ type Config struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
// NextClusterVersionCompatible makes a 3.4 server compatible with the next version (3.5), allowing a 3.4 server to join a 3.5 cluster and start on the 3.5 schema.
NextClusterVersionCompatible bool `json:"next-cluster-version-compatible"`
}

// configYAML holds the config suitable for yaml parsing
Expand Down
106 changes: 56 additions & 50 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v2v3"
"go.etcd.io/etcd/etcdserver/api/v3client"
"go.etcd.io/etcd/etcdserver/api/v3rpc"
"go.etcd.io/etcd/etcdserver/verify"
"go.etcd.io/etcd/pkg/debugutil"
runtimeutil "go.etcd.io/etcd/pkg/runtime"
"go.etcd.io/etcd/pkg/transport"
Expand Down Expand Up @@ -164,56 +165,57 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
backendFreelistType := parseBackendFreelistType(cfg.ExperimentalBackendFreelistType)

srvcfg := etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
LoggerConfig: cfg.loggerConfig,
LoggerCore: cfg.loggerCore,
LoggerWriteSyncer: cfg.loggerWriteSyncer,
Debug: cfg.Debug,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
LoggerConfig: cfg.loggerConfig,
LoggerCore: cfg.loggerCore,
LoggerWriteSyncer: cfg.loggerWriteSyncer,
Debug: cfg.Debug,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
UnsafeNoFsync: cfg.UnsafeNoFsync,
NextClusterVersionCompatible: cfg.NextClusterVersionCompatible,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
Expand Down Expand Up @@ -375,6 +377,10 @@ func (e *Etcd) Close() {
defer func() {
if lg != nil {
lg.Info("closed etcd server", fields...)
verify.MustVerifyIfEnabled(verify.Config{
Logger: lg,
DataDir: e.cfg.Dir,
})
lg.Sync()
}
}()
Expand Down
2 changes: 1 addition & 1 deletion etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func prepareBackend() backend.Backend {

func rebuildStoreV2() (v2store.Store, uint64) {
var index uint64
cl := membership.NewCluster(zap.NewExample(), "")
cl := membership.NewCluster(zap.NewExample(), "", true)

waldir := migrateWALdir
if len(waldir) == 0 {
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.StrictReconfigCheck, "strict-reconfig-check", cfg.ec.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.")
fs.BoolVar(&cfg.ec.EnableV2, "enable-v2", cfg.ec.EnableV2, "Accept etcd V2 client requests.")
fs.BoolVar(&cfg.ec.PreVote, "pre-vote", cfg.ec.PreVote, "Enable to run an additional Raft election phase.")
fs.BoolVar(&cfg.ec.NextClusterVersionCompatible, "next-cluster-version-compatible", false, "Enable 3.4 to be compatible with next version 3.5, to allow 3.4 server to join 3.5 cluster and start on 3.5 schema")

// proxy
fs.Var(cfg.cf.proxy, "proxy", fmt.Sprintf("Valid values include %q", cfg.cf.proxy.Valids()))
Expand Down
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func startProxy(cfg *config) error {

clientURLs := []string{}
uf := func() []string {
gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr)
gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr, cfg.ec.NextClusterVersionCompatible)
if gerr != nil {
if lg != nil {
lg.Warn(
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ Clustering:
Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
--enable-v2 '` + strconv.FormatBool(embed.DefaultEnableV2) + `'
Accept etcd V2 client requests.
--next-cluster-version-compatible 'false'
Enable 3.4 to be compatible with next version 3.5, to allow 3.4 server to join 3.5 cluster and start on 3.5 schema.

Security:
--cert-file ''
Expand Down
1 change: 1 addition & 0 deletions etcdserver/api/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
"3.2.0": {AuthCapability: true, V3rpcCapability: true},
"3.3.0": {AuthCapability: true, V3rpcCapability: true},
"3.4.0": {AuthCapability: true, V3rpcCapability: true},
"3.5.0": {AuthCapability: true, V3rpcCapability: true},
}

enableMapMu sync.RWMutex
Expand Down
58 changes: 40 additions & 18 deletions etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type RaftCluster struct {
// removed contains the ids of removed members in the cluster.
// removed id cannot be reused.
removed map[types.ID]bool
// NextClusterVersionCompatible allows downgrade from 3.5 to 3.4.
NextClusterVersionCompatible bool
}

// ConfigChangeContext represents a context for confChange.
Expand All @@ -72,8 +74,8 @@ type ConfigChangeContext struct {

// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
// cluster with raft learner member.
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
c := NewCluster(lg, token)
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap, nextClusterVersionCompatible bool) (*RaftCluster, error) {
c := NewCluster(lg, token, nextClusterVersionCompatible)
for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
Expand All @@ -88,21 +90,22 @@ func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap)
return c, nil
}

func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member) *RaftCluster {
c := NewCluster(lg, token)
func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member, nextClusterVersionCompatible bool) *RaftCluster {
c := NewCluster(lg, token, nextClusterVersionCompatible)
c.cid = id
for _, m := range membs {
c.members[m.ID] = m
}
return c
}

func NewCluster(lg *zap.Logger, token string) *RaftCluster {
func NewCluster(lg *zap.Logger, token string, nextClusterVersionCompatible bool) *RaftCluster {
return &RaftCluster{
lg: lg,
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
lg: lg,
token: token,
NextClusterVersionCompatible: nextClusterVersionCompatible,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
}
}

Expand Down Expand Up @@ -248,7 +251,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {

c.members, c.removed = membersFromStore(c.lg, c.v2store)
c.version = clusterVersionFromStore(c.lg, c.v2store)
mustDetectDowngrade(c.lg, c.version)
mustDetectDowngrade(c.lg, c.version, c.NextClusterVersionCompatible)
onSet(c.lg, c.version)

for _, m := range c.members {
Expand Down Expand Up @@ -567,7 +570,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
}
oldVer := c.version
c.version = ver
mustDetectDowngrade(c.lg, c.version)
mustDetectDowngrade(c.lg, c.version, c.NextClusterVersionCompatible)
if c.v2store != nil {
mustSaveClusterVersionToStore(c.v2store, ver)
}
Expand Down Expand Up @@ -786,23 +789,42 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
return nil
}

func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if cv != nil && lv.LessThan(*cv) {
func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, nextClusterVersionCompatible bool) {
err := detectDowngrade(cv, nextClusterVersionCompatible)
if err != nil {
if lg != nil {
lg.Fatal(
"invalid downgrade; server version is lower than determined cluster version",
err.Error(),
zap.String("current-server-version", version.Version),
zap.String("determined-cluster-version", version.Cluster(cv.String())),
)
} else {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
plog.Fatal(err)
}
}
}

// detectDowngrade reports whether running this server binary against a
// cluster whose determined version is cv would be a disallowed downgrade.
//
// It returns nil when:
//   - cv is nil (no cluster version is known),
//   - the local major.minor version is not lower than cv, or
//   - nextClusterVersionCompatible is true and the local version is exactly
//     one minor release below cv (same major).
//
// In every other case it returns an error naming both versions.
func detectDowngrade(cv *semver.Version, nextClusterVersionCompatible bool) error {
	if cv == nil {
		return nil
	}

	// Only major.minor of the local version takes part in the comparison
	// against the cluster version; the patch level is dropped.
	full := semver.Must(semver.NewVersion(version.Version))
	local := semver.Version{Major: full.Major, Minor: full.Minor}
	if !local.LessThan(*cv) {
		return nil
	}

	// The local server is older than the cluster. This is tolerated only when
	// the compatibility switch is on and the gap is exactly one minor version.
	prevMinor := semver.Version{Major: cv.Major, Minor: cv.Minor - 1}
	if nextClusterVersionCompatible && local.Equal(prevMinor) {
		return nil
	}
	return fmt.Errorf("invalid downgrade; (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
}

// IsLocalMemberLearner returns if the local member is raft learner
func (c *RaftCluster) IsLocalMemberLearner() bool {
c.Lock()
Expand Down
54 changes: 53 additions & 1 deletion etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"testing"

"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/pkg/mock/mockstore"
"go.etcd.io/etcd/pkg/testutil"
Expand Down Expand Up @@ -276,7 +277,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
}

func TestClusterValidateConfigurationChange(t *testing.T) {
cl := NewCluster(zap.NewExample(), "")
cl := NewCluster(zap.NewExample(), "", false)
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
Expand Down Expand Up @@ -946,3 +947,54 @@ func TestIsReadyToPromoteMember(t *testing.T) {
}
}
}

// TestDetectDowngrade runs detectDowngrade over a table of cluster versions,
// with and without the next-cluster-version-compatible switch, and checks
// only whether an error is returned.
//
// NOTE(review): the expectations presumably assume the local server version
// (version.Version) is a 3.4.x release — confirm against the version package.
func TestDetectDowngrade(t *testing.T) {
	type downgradeCase struct {
		clusterVersion               string
		nextClusterVersionCompatible bool
		expectErr                    bool
	}

	cases := []downgradeCase{
		// No cluster version known yet.
		{expectErr: false},
		// One minor ahead: rejected unless the compatibility switch is set.
		{clusterVersion: "3.5.0", expectErr: true},
		{clusterVersion: "3.5.0", nextClusterVersionCompatible: true, expectErr: false},
		// Two minors ahead: rejected regardless of the switch.
		{clusterVersion: "3.6.0", expectErr: true},
		{clusterVersion: "3.6.0", nextClusterVersionCompatible: true, expectErr: true},
		// Cluster version at or below the versions listed above: accepted.
		{clusterVersion: "3.4.0", expectErr: false},
		{clusterVersion: "3.3.0", expectErr: false},
	}

	for idx := range cases {
		tc := cases[idx]

		// An empty clusterVersion means "pass a nil cluster version".
		var clusterVer *semver.Version
		if tc.clusterVersion != "" {
			clusterVer = semver.Must(semver.NewVersion(tc.clusterVersion))
		}

		err := detectDowngrade(clusterVer, tc.nextClusterVersionCompatible)
		switch {
		case tc.expectErr && err == nil:
			t.Errorf("%d: expect detectDowngrade error, got nil", idx)
		case !tc.expectErr && err != nil:
			t.Errorf("%d: expect no detectDowngrade error, got %v", idx, err)
		}
	}
}
Loading
Loading