Skip to content

Commit

Permalink
tests linearizability: reproduce and prevent 14571
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Nov 22, 2022
1 parent 9cc2f64 commit b532575
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 31 deletions.
46 changes: 46 additions & 0 deletions tests/linearizability/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package linearizability

import (
"context"
"testing"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/interfaces"
)

// Credentials and role names provisioned by setupAuth before authentication
// is enabled. NewClient uses the same values when it is asked to create an
// authenticated client: client id 0 logs in as the test user, every other
// client logs in as root.
const (
// Root user and role, created first by setupAuth.
rootUserName = "root"
rootRoleName = "root"
rootUserPassword = "123"
// Test user and role; setupAuth grants testRoleName read/write permission
// on the range passed as ("key", "key0").
testUserName = "test-user"
testRoleName = "test-role"
testUserPassword = "abc"
)

// setupAuth provisions the users and roles the auth traffic relies on and then
// enables authentication through c.
//
// The steps run strictly in order: create the root user and role and bind them,
// create the test user and role and bind them, grant the test role read/write
// permission on the range passed as ("key", "key0"), and finally enable auth.
// The first step that fails aborts the test via t.Fatalf, and the failure
// message names that step.
func setupAuth(ctx context.Context, t *testing.T, c interfaces.Client) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	steps := []struct {
		desc string
		run  func() error
	}{
		{"add root user", func() error {
			_, err := c.UserAdd(ctx, rootUserName, rootUserPassword, config.UserAddOptions{})
			return err
		}},
		{"add root role", func() error {
			_, err := c.RoleAdd(ctx, rootRoleName)
			return err
		}},
		{"grant root role to root user", func() error {
			_, err := c.UserGrantRole(ctx, rootUserName, rootRoleName)
			return err
		}},
		{"add test user", func() error {
			_, err := c.UserAdd(ctx, testUserName, testUserPassword, config.UserAddOptions{})
			return err
		}},
		{"add test role", func() error {
			_, err := c.RoleAdd(ctx, testRoleName)
			return err
		}},
		{"grant test role to test user", func() error {
			_, err := c.UserGrantRole(ctx, testUserName, testRoleName)
			return err
		}},
		{"grant read/write permission to test role", func() error {
			_, err := c.RoleGrantPermission(ctx, testRoleName, "key", "key0", clientv3.PermissionType(clientv3.PermReadWrite))
			return err
		}},
		// Auth is enabled last, after all users, roles and permissions exist.
		{"enable auth", func() error {
			return c.AuthEnable(ctx)
		}},
	}

	for _, step := range steps {
		if err := step.run(); err != nil {
			t.Fatalf("setting up auth: %s: %v", step.desc, err)
		}
	}
}
25 changes: 22 additions & 3 deletions tests/linearizability/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,53 @@ package linearizability

import (
"context"
"fmt"
"time"

"github.com/anishathalye/porcupine"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// recordingClient bundles an etcd v3 client with the list of porcupine
// operations collected for that client. simulateTraffic creates one per
// simulated client and merges every client's operations once its traffic
// run returns.
type recordingClient struct {
// client is the underlying etcd client, stored by value (NewClient
// dereferences the pointer returned by clientv3.New).
client clientv3.Client
// id is the client index given to NewClient; when auth is enabled it also
// decides which credentials are used (0 -> test user, otherwise root).
id int

// operations is the per-client operation history; it starts out empty.
// NOTE(review): presumably appended to by the request methods defined
// outside this view - confirm.
operations []porcupine.Operation

// lg is the logger handed to NewClient.
lg *zap.Logger
}

func NewClient(endpoints []string, id int) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
func NewClient(endpoints []string, id int, authEnabled bool, lg *zap.Logger) (*recordingClient, error) {
cfg := clientv3.Config{
DialOptions: []grpc.DialOption{grpc.WithBlock()},
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
}
if authEnabled {
if id == 0 {
// create test user client
cfg.Username = testUserName
cfg.Password = testUserPassword
} else {
cfg.Username = rootUserName
cfg.Password = rootUserPassword
}
}
cc, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
lg.Debug("successfully created recording client", zap.Int("client-id", id), zap.String("client-cfg", fmt.Sprintf("%+v", cfg)))

return &recordingClient{
client: *cc,
id: id,
operations: []porcupine.Operation{},
lg: lg,
}, nil
}

Expand Down
22 changes: 22 additions & 0 deletions tests/linearizability/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"go.etcd.io/etcd/tests/v3/framework/config"
"go.uber.org/zap"

clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -63,6 +64,7 @@ var (
CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic,
RaftBeforeLeaderSendPanic,
}}
MemberAddFailpoint Failpoint = memberAddFailpoint{}
// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
raftBeforeApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeApplySnap", "panic", nil, AnyMember}
raftAfterApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterApplySnap", "panic", nil, AnyMember}
Expand Down Expand Up @@ -228,3 +230,23 @@ func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Et
func (f randomFailpoint) Name() string {
return "Random"
}

// memberAddFailpoint is a stateless Failpoint whose Trigger writes a batch of
// keys and then starts an additional cluster process. It is exposed through
// the MemberAddFailpoint variable and used by the Issue14571 test case.
type memberAddFailpoint struct{}

// Trigger issues eleven puts to "key" as the test user and then starts a new
// cluster process using root credentials.
//
// Any failure is returned to the caller instead of panicking: Trigger is run
// from a goroutine by the test driver, where a panic would take down the whole
// test binary and skip both cluster cleanup and the driver's own
// success/failure accounting.
//
// The hard-coded credentials must stay in sync with the users that setupAuth
// provisions.
func (f memberAddFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
	// Write more than SnapshotCount keys to the single leader to make sure a
	// snapshot is created before the new process is started.
	testUserClientOpts := e2e.WithAuth("test-user", "abc")
	for i := 0; i <= 10; i++ {
		if err := clus.Client(testUserClientOpts).Put(ctx, "key", "test", config.PutOptions{}); err != nil {
			return err
		}
	}
	if err := clus.StartNewProc(ctx, nil, t, e2e.WithAuth("root", "123")); err != nil {
		return err
	}
	return nil
}

// Name returns the fixed label identifying this failpoint.
func (f memberAddFailpoint) Name() string {
	const label = "MemberReconfiguration"
	return label
}
96 changes: 70 additions & 26 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/anishathalye/porcupine"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/time/rate"
)

Expand All @@ -43,77 +45,112 @@ const (
func TestLinearizability(t *testing.T) {
testRunner.BeforeTest(t)
tcs := []struct {
name string
failpoint Failpoint
config e2e.EtcdProcessClusterConfig
name string
failpoint FailpointConfig
config e2e.EtcdProcessClusterConfig
clientCount int
trafficType Traffic
}{
{
name: "ClusterOfSize1",
failpoint: RandomFailpoint,
name: "ClusterOfSize1",
failpoint: FailpointConfig{
failpoint: RandomFailpoint,
count: failpointTriggersCount,
waitBetweenTriggers: waitBetweenFailpointTriggers,
},
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
),
},
{
name: "ClusterOfSize3",
failpoint: RandomFailpoint,
name: "ClusterOfSize3",
failpoint: FailpointConfig{
failpoint: RandomFailpoint,
count: failpointTriggersCount,
waitBetweenTriggers: waitBetweenFailpointTriggers,
},
config: *e2e.NewConfig(
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
),
},
{
name: "Issue14370",
failpoint: RaftBeforeSavePanic,
name: "Issue14370",
failpoint: FailpointConfig{
failpoint: RaftBeforeSavePanic,
count: failpointTriggersCount,
waitBetweenTriggers: waitBetweenFailpointTriggers,
},
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
),
},
{
name: "Issue14571",
failpoint: FailpointConfig{
failpoint: MemberAddFailpoint,
count: 1,
},
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
SnapshotCount: 2,
},
clientCount: 3,
trafficType: AuthTraffic,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
failpoint := FailpointConfig{
failpoint: tc.failpoint,
count: failpointTriggersCount,
waitBetweenTriggers: waitBetweenFailpointTriggers,
lg := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())).Named(tc.name)
if tc.clientCount == 0 {
tc.clientCount = 8
}
if tc.trafficType == nil {
tc.trafficType = DefaultTraffic
}
traffic := trafficConfig{
minimalQPS: minimalQPS,
maximalQPS: maximalQPS,
clientCount: 8,
traffic: DefaultTraffic,
clientCount: tc.clientCount,
traffic: tc.trafficType,
}
testLinearizability(context.Background(), t, tc.config, failpoint, traffic)
testLinearizability(context.Background(), t, tc.config, tc.failpoint, traffic, lg)
})
}
}

func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig, lg *zap.Logger) {
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
if err != nil {
t.Fatal(err)
}
defer clus.Close()
if traffic.traffic.AuthEnabled() {
lg.Info("setting up auth")
setupAuth(ctx, t, clus.Client())
}
ctx, cancel := context.WithCancel(ctx)
failpointTriggered := make(chan struct{})
go func() {
defer cancel()
err := triggerFailpoints(ctx, t, clus, failpoint)
err := triggerFailpoints(ctx, t, clus, failpoint, failpointTriggered)
if err != nil {
t.Error(err)
cancel()
}
}()
operations := simulateTraffic(ctx, t, clus, traffic)
operations := simulateTraffic(ctx, t, clus, traffic, lg, failpointTriggered)
err = clus.Stop()
if err != nil {
t.Error(err)
}
checkOperationsAndPersistResults(t, operations, clus)
cancel()
}

func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) error {
func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig, failpointTriggered chan<- struct{}) error {
var err error
successes := 0
failures := 0
Expand All @@ -126,11 +163,13 @@ func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessC
continue
}
successes++
failpointTriggered <- struct{}{}
time.Sleep(config.waitBetweenTriggers)
}
if successes < config.count || failures >= config.count {
return fmt.Errorf("failed to trigger failpoints enough times, err: %v", err)
}
close(failpointTriggered)
return nil
}

Expand All @@ -140,26 +179,31 @@ type FailpointConfig struct {
waitBetweenTriggers time.Duration
}

func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) (operations []porcupine.Operation) {
func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig, lg *zap.Logger, failpointTriggered <-chan struct{}) (operations []porcupine.Operation) {
mux := sync.Mutex{}
endpoints := clus.EndpointsV3()

// block for failpoint triggered when creating clients
select {
case <-failpointTriggered:
}
endpoints := clus.EndpointsV3()
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)

startTime := time.Now()
wg := sync.WaitGroup{}

for i := 0; i < config.clientCount; i++ {
wg.Add(1)
endpoints := []string{endpoints[i%len(endpoints)]}
c, err := NewClient(endpoints, i)
c, err := NewClient(endpoints, i, config.traffic.AuthEnabled(), lg)
if err != nil {
t.Fatal(err)
t.Fatalf("New client creation failed with %+v", err)
}
go func(c *recordingClient) {
defer wg.Done()
defer c.Close()

config.traffic.Run(ctx, c, limiter)
config.traffic.Run(ctx, c, limiter, lg)
mux.Lock()
operations = append(operations, c.operations...)
mux.Unlock()
Expand Down
Loading

0 comments on commit b532575

Please sign in to comment.