tests linearizability: reproduce and prevent 14571
Signed-off-by: Chao Chen <chaochn@amazon.com>
chaochn47 committed Jan 6, 2023
1 parent 6200b22 commit b279ce6
Showing 9 changed files with 259 additions and 66 deletions.
2 changes: 1 addition & 1 deletion bill-of-materials.json
@@ -396,7 +396,7 @@
]
},
{
"project": "github.com/stretchr/testify/assert",
"project": "github.com/stretchr/testify",
"licenses": [
{
"type": "MIT License",
10 changes: 5 additions & 5 deletions tests/framework/e2e/cluster.go
@@ -854,16 +854,16 @@ func findMemberIDByEndpoint(members []*etcdserverpb.Member, endpoint string) (ui

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int {
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB, opts ...config.ClientOption) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return epc.WaitMembersForLeader(ctx, t, epc.Procs)
return epc.WaitMembersForLeader(ctx, t, epc.Procs, opts...)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int {
cc := epc.Client()
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess, opts ...config.ClientOption) int {
cc := epc.Client(opts...)

// ensure leader is up via linearizable get
for {
@@ -887,7 +887,7 @@ func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testi
default:
}
for i := range membs {
resp, err := membs[i].Client().Status(ctx)
resp, err := membs[i].Client(opts...).Status(ctx)
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
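The variadic config.ClientOption threading above exists so the leader poll keeps working once auth is enabled: the linearizable Get and Status calls inside WaitMembersForLeader must then carry credentials. A minimal sketch of a caller, assuming this commit's e2e.WithAuth and placeholder credentials:

    package e2e_test

    import (
        "testing"

        "go.etcd.io/etcd/tests/v3/framework/e2e"
    )

    // waitLeaderWithAuth polls for the leader of an auth-enabled cluster.
    // "root"/"rootPassword" are placeholders, not values from this commit.
    func waitLeaderWithAuth(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
        i := epc.WaitLeader(t, e2e.WithAuth("root", "rootPassword"))
        return epc.Procs[i]
    }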
12 changes: 6 additions & 6 deletions tests/framework/integration/cluster.go
@@ -405,18 +405,18 @@ func (c *Cluster) WaitMembersMatch(t testutil.TB, membs []*pb.Member) {

// WaitLeader returns index of the member in c.Members that is leader
// or fails the test (if not established in 30s).
func (c *Cluster) WaitLeader(t testing.TB) int {
return c.WaitMembersForLeader(t, c.Members)
func (c *Cluster) WaitLeader(t testing.TB, opts ...framecfg.ClientOption) int {
return c.WaitMembersForLeader(t, c.Members, opts...)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *Cluster) WaitMembersForLeader(t testing.TB, membs []*Member) int {
func (c *Cluster) WaitMembersForLeader(t testing.TB, membs []*Member, opts ...framecfg.ClientOption) int {
t.Logf("WaitMembersForLeader")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
l := 0
for l = c.waitMembersForLeader(ctx, t, membs); l < 0; {
for l = c.waitMembersForLeader(ctx, t, membs, opts...); l < 0; {
if ctx.Err() != nil {
t.Fatalf("WaitLeader FAILED: %v", ctx.Err())
}
@@ -437,13 +437,13 @@ func (c *Cluster) WaitMembersForLeader(t testing.TB, membs []*Member) int {

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *Cluster) waitMembersForLeader(ctx context.Context, t testing.TB, membs []*Member) int {
func (c *Cluster) waitMembersForLeader(ctx context.Context, t testing.TB, membs []*Member, opts ...framecfg.ClientOption) int {
possibleLead := make(map[uint64]bool)
var lead uint64
for _, m := range membs {
possibleLead[uint64(m.Server.MemberId())] = true
}
cc, err := c.ClusterClient(t)
cc, err := c.ClusterClient(t, opts...)
if err != nil {
t.Fatal(err)
}
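The integration framework gets the same plumbing, terminating in ClusterClient. As the no-op func(any) {} values later in this commit show, a ClientOption is just a function over an arbitrary config value; a hedged sketch of how an auth option can be built on top of that (the real integration.WithAuth referenced by this commit may differ in detail):

    package integration

    import (
        clientv3 "go.etcd.io/etcd/client/v3"

        framecfg "go.etcd.io/etcd/tests/v3/framework/config"
    )

    // WithAuth returns an option that injects credentials. Each framework
    // applies options to its own concrete config type, so the option
    // type-asserts before mutating.
    func WithAuth(userName, password string) framecfg.ClientOption {
        return func(c any) {
            if cfg, ok := c.(*clientv3.Config); ok {
                cfg.Username = userName
                cfg.Password = password
            }
        }
    }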
2 changes: 1 addition & 1 deletion tests/framework/interfaces/interface.go
@@ -31,7 +31,7 @@ type TestRunner interface {
type Cluster interface {
Members() []Member
Client(opts ...config.ClientOption) (Client, error)
WaitLeader(t testing.TB) int
WaitLeader(t testing.TB, opts ...config.ClientOption) int
Close() error
Endpoints() []string
}
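Widening the shared Cluster interface keeps framework-agnostic tests compiling against both runners. A small sketch written purely against the interface (the helper name is made up):

    // leaderMember returns the member WaitLeader identifies, forwarding
    // any client options (for example auth credentials) unchanged.
    func leaderMember(t *testing.T, clus interfaces.Cluster, opts ...config.ClientOption) interfaces.Member {
        return clus.Members()[clus.WaitLeader(t, opts...)]
    }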
20 changes: 17 additions & 3 deletions tests/linearizability/client.go
@@ -20,6 +20,9 @@ import (

"go.uber.org/zap"

"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
@@ -29,13 +32,17 @@ type recordingClient struct {
history *appendableHistory
}

func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
func NewClient(endpoints []string, ids idProvider, opts ...config.ClientOption) (*recordingClient, error) {
cfg := &clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
}
for _, opt := range opts {
opt(cfg)
}
cc, err := clientv3.New(*cfg)
if err != nil {
return nil, err
}
@@ -94,3 +101,10 @@ func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue
c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err)
return err
}

func clientOption(authEnabled bool) config.ClientOption {
if !authEnabled {
return func(any) {}
}
return integration.WithAuth(rootUserName, rootUserPassword)
}
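clientOption is the traffic generator's single switch: a no-op option when auth is off, root credentials otherwise. A sketch of the call site this enables, in the same package (newTrafficClient is hypothetical; rootUserName and rootUserPassword are defined in files of this commit not shown above):

    // newTrafficClient builds the recording client with or without auth.
    func newTrafficClient(endpoints []string, ids idProvider, authEnabled bool) (*recordingClient, error) {
        return NewClient(endpoints, ids, clientOption(authEnabled))
    }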
84 changes: 68 additions & 16 deletions tests/linearizability/failpoints.go
@@ -25,8 +25,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/tests/v3/framework/config"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
@@ -36,7 +39,8 @@ const (
)

var (
KillFailpoint Failpoint = killFailpoint{}
KillFailpoint Failpoint = killFailpoint{target: AnyMember}
EnableAuthKillFailpoint Failpoint = killFailpoint{enableAuth: true, target: Follower}
DefragBeforeCopyPanic Failpoint = goPanicFailpoint{"defragBeforeCopy", triggerDefrag, AnyMember}
DefragBeforeRenamePanic Failpoint = goPanicFailpoint{"defragBeforeRename", triggerDefrag, AnyMember}
BeforeCommitPanic Failpoint = goPanicFailpoint{"beforeCommit", nil, AnyMember}
@@ -77,14 +81,21 @@
)

type Failpoint interface {
Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error
Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error
Name() string
}

type killFailpoint struct{}
type killFailpoint struct {
enableAuth bool
target failpointTarget
}

func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
opt := func(any) {}
if f.enableAuth {
opt = e2e.WithAuth(rootUserName, rootUserPassword)
}
member := pickMember(f.target, t, clus, opt)

killCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
@@ -99,17 +110,52 @@ func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Etcd
}
}

err := member.Start(ctx)
if err != nil {
return err
// get endpoints excluding the killed member client URLs
endpoints := make([]string, 0, len(clus.EndpointsV3()))
for _, ed := range clus.EndpointsV3() {
if ed != member.EndpointsV3()[0] {
endpoints = append(endpoints, ed)
}
}
return nil

if f.enableAuth {
require.NoError(t, addTestUserAuth(ctx, endpoints))
}

return member.Start(ctx)
}

func (f killFailpoint) Name() string {
return "Kill"
}

func addTestUserAuth(ctx context.Context, endpoints []string) (err error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
Username: rootUserName,
Password: rootUserPassword,
})
if err != nil {
return err
}
if _, err := cc.UserAdd(ctx, testUserName, testUserPassword); err != nil {
return err
}
if _, err := cc.RoleAdd(ctx, testRoleName); err != nil {
return err
}
if _, err := cc.UserGrantRole(ctx, testUserName, testRoleName); err != nil {
return err
}
if _, err := cc.RoleGrantPermission(ctx, testRoleName, startKey, endKey, clientv3.PermissionType(clientv3.PermReadWrite)); err != nil {
return err
}
return cc.Close()
}
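addTestUserAuth connects as root, so it presumes auth is already enabled and a root user already exists; that bootstrap lives in files of this commit not shown above. A minimal sketch of what it requires, using only standard clientv3 calls (the helper name is made up):

    // setupRootAuth creates the root user and switches auth on.
    func setupRootAuth(ctx context.Context, cc *clientv3.Client) error {
        if _, err := cc.UserAdd(ctx, rootUserName, rootUserPassword); err != nil {
            return err
        }
        // etcd requires the root user to hold the built-in "root" role
        // before AuthEnable will succeed.
        if _, err := cc.UserGrantRole(ctx, rootUserName, "root"); err != nil {
            return err
        }
        _, err := cc.AuthEnable(ctx)
        return err
    }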

type goPanicFailpoint struct {
failpoint string
trigger func(ctx context.Context, member e2e.EtcdProcess) error
Expand All @@ -121,10 +167,11 @@ type failpointTarget string
const (
AnyMember failpointTarget = "AnyMember"
Leader failpointTarget = "Leader"
Follower failpointTarget = "Follower"
)

func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := f.pickMember(t, clus)
func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
member := pickMember(f.target, t, clus)
address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort)

triggerCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
@@ -154,12 +201,17 @@ func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.E
return nil
}

func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster) e2e.EtcdProcess {
switch f.target {
func pickMember(target failpointTarget, t *testing.T, clus *e2e.EtcdProcessCluster, opts ...config.ClientOption) e2e.EtcdProcess {
switch target {
case AnyMember:
return clus.Procs[rand.Int()%len(clus.Procs)]
case Leader:
return clus.Procs[clus.WaitLeader(t)]
return clus.Procs[clus.WaitLeader(t, opts...)]
case Follower:
if len(clus.Procs) == 1 {
panic("single node cluster does not have follower")
}
return clus.Procs[(clus.WaitLeader(t, opts...)+1)%len(clus.Procs)]
default:
panic("unknown target")
}
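Given the leader's index, (leader+1) modulo the member count is a cheap deterministic follower pick: with three members and leader index 2, (2+1)%3 = 0 selects a follower, while in a single-member cluster every index maps back to the leader, hence the panic above.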
@@ -238,10 +290,10 @@ type randomFailpoint struct {
failpoints []Failpoint
}

func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
failpoint := f.failpoints[rand.Int()%len(f.failpoints)]
t.Logf("Triggering %v failpoint\n", failpoint.Name())
return failpoint.Trigger(t, ctx, clus)
return failpoint.Trigger(t, ctx, clus, lg)
}

func (f randomFailpoint) Name() string {
16 changes: 5 additions & 11 deletions tests/linearizability/history.go
@@ -122,6 +122,7 @@ func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, e
Output: EtcdResponse{Err: err},
Return: 0, // For failed writes we don't know when request has really finished.
})

// Operations of a single client need to be sequential.
// As we don't know the return time of failed operations, all new writes need to be done with a new client id.
h.id = h.idProvider.ClientId()
Expand All @@ -134,19 +135,12 @@ type history struct {
failed []porcupine.Operation
}

func (h history) Merge(h2 history) history {
result := history{
successful: make([]porcupine.Operation, 0, len(h.successful)+len(h2.successful)),
failed: make([]porcupine.Operation, 0, len(h.failed)+len(h2.failed)),
}
result.successful = append(result.successful, h.successful...)
result.successful = append(result.successful, h2.successful...)
result.failed = append(result.failed, h.failed...)
result.failed = append(result.failed, h2.failed...)
return result
func (h *history) Merge(h2 history) {
h.successful = append(h.successful, h2.successful...)
h.failed = append(h.failed, h2.failed...)
}

func (h history) Operations() []porcupine.Operation {
func (h *history) Operations() []porcupine.Operation {
operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed))
var maxTime int64
for _, op := range h.successful {
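Merge now appends into the receiver instead of allocating a combined copy on every call, which suits folding many per-client histories into one. A sketch of that fold (how a recording client exposes its history is an assumption here):

    // clients is the test's set of traffic-generating recordingClients.
    var merged history
    for _, c := range clients {
        merged.Merge(c.history.history) // assumes appendableHistory embeds history
    }
    ops := merged.Operations() // handed on to the linearizability checker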
