diff --git a/tests/linearizability/auth.go b/tests/linearizability/auth.go
new file mode 100644
index 000000000000..a5d380acfd92
--- /dev/null
+++ b/tests/linearizability/auth.go
@@ -0,0 +1,51 @@
+package linearizability
+
+import (
+	"context"
+	"testing"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/tests/v3/framework/config"
+	"go.etcd.io/etcd/tests/v3/framework/interfaces"
+)
+
+// Users, roles and passwords provisioned by setupAuth.
+const (
+	rootUserName     = "root"
+	rootRoleName     = "root"
+	rootUserPassword = "123"
+	testUserName     = "test-user"
+	testRoleName     = "test-role"
+	testUserPassword = "abc"
+)
+
+// setupAuth creates the root and test users, gives each its role, grants the
+// test role read/write permission on the range "key".."key0" and finally
+// enables auth. The first failing step fails the test with t.Fatalf.
+func setupAuth(ctx context.Context, t *testing.T, c interfaces.Client) {
+	t.Helper()
+	if _, err := c.UserAdd(ctx, rootUserName, rootUserPassword, config.UserAddOptions{}); err != nil {
+		t.Fatalf("adding user %q: %v", rootUserName, err)
+	}
+	if _, err := c.RoleAdd(ctx, rootRoleName); err != nil {
+		t.Fatalf("adding role %q: %v", rootRoleName, err)
+	}
+	if _, err := c.UserGrantRole(ctx, rootUserName, rootRoleName); err != nil {
+		t.Fatalf("granting role %q to user %q: %v", rootRoleName, rootUserName, err)
+	}
+	if _, err := c.UserAdd(ctx, testUserName, testUserPassword, config.UserAddOptions{}); err != nil {
+		t.Fatalf("adding user %q: %v", testUserName, err)
+	}
+	if _, err := c.RoleAdd(ctx, testRoleName); err != nil {
+		t.Fatalf("adding role %q: %v", testRoleName, err)
+	}
+	if _, err := c.UserGrantRole(ctx, testUserName, testRoleName); err != nil {
+		t.Fatalf("granting role %q to user %q: %v", testRoleName, testUserName, err)
+	}
+	if _, err := c.RoleGrantPermission(ctx, testRoleName, "key", "key0", clientv3.PermissionType(clientv3.PermReadWrite)); err != nil {
+		t.Fatalf("granting permission to role %q: %v", testRoleName, err)
+	}
+	if err := c.AuthEnable(ctx); err != nil {
+		t.Fatalf("enabling auth: %v", err)
+	}
+}
diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go
index 5addf729769b..fd1d3c7ea77d 100644
--- a/tests/linearizability/client.go
+++ b/tests/linearizability/client.go
@@ -16,11 +16,12 @@ package linearizability
 
 import (
 	"context"
 	"time"
 
 	"github.com/anishathalye/porcupine"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
+	"google.golang.org/grpc"
 )
 
 type recordingClient struct {
@@ -28,22 +29,43 @@ type recordingClient struct {
 	id     int
 
 	operations []porcupine.Operation
+
+	lg *zap.Logger
 }
 
-func NewClient(endpoints []string, id int) (*recordingClient, error) {
-	cc, err := clientv3.New(clientv3.Config{
+// NewClient dials the given endpoints and returns a recordingClient for them.
+// When authEnabled is set, client 0 authenticates as the test user and every
+// other client as root. lg must be non-nil.
+func NewClient(endpoints []string, id int, authEnabled bool, lg *zap.Logger) (*recordingClient, error) {
+	cfg := clientv3.Config{
+		DialOptions:          []grpc.DialOption{grpc.WithBlock()},
 		Endpoints:            endpoints,
 		Logger:               zap.NewNop(),
 		DialKeepAliveTime:    1 * time.Millisecond,
 		DialKeepAliveTimeout: 5 * time.Millisecond,
-	})
+	}
+	if authEnabled {
+		if id == 0 {
+			// create test user client
+			cfg.Username = testUserName
+			cfg.Password = testUserPassword
+		} else {
+			cfg.Username = rootUserName
+			cfg.Password = rootUserPassword
+		}
+	}
+	cc, err := clientv3.New(cfg)
 	if err != nil {
 		return nil, err
 	}
+	// Log only non-secret fields; dumping cfg would print the password.
+	lg.Debug("successfully created recording client", zap.Int("client-id", id), zap.Strings("endpoints", endpoints), zap.String("username", cfg.Username))
+
 	return &recordingClient{
 		client:     *cc,
 		id:         id,
 		operations: []porcupine.Operation{},
+		lg:         lg,
 	}, nil
 }
 
diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go
index 247951c19ef1..8438267f805f 100644
--- a/tests/linearizability/failpoints.go
+++ b/tests/linearizability/failpoints.go
@@ -25,6 +25,7 @@ import (
 	"testing"
 	"time"
 
+	"go.etcd.io/etcd/tests/v3/framework/config"
 	"go.uber.org/zap"
 
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -63,6 +64,7 @@ var (
 		CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic,
 		RaftBeforeLeaderSendPanic,
 	}}
+	MemberAddFailpoint       Failpoint = memberAddFailpoint{}
 	// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
 	raftBeforeApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeApplySnap", "panic", nil, AnyMember}
 	raftAfterApplySnapPanic  Failpoint = goFailpoint{"etcdserver/raftAfterApplySnap", "panic", nil, AnyMember}
@@ -228,3 +230,26 @@ func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Et
 func (f randomFailpoint) Name() string {
 	return "Random"
 }
+
+// memberAddFailpoint writes a batch of keys and then starts an additional etcd
+// process in the cluster; it is used by the Issue14571 test case.
+type memberAddFailpoint struct{}
+
+// Trigger puts "key" 11 times as the test user and then starts a new process
+// authenticated as root. Errors are returned instead of panicking so that
+// triggerFailpoints can account for the failure without crashing the test binary.
+func (f memberAddFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
+	// write more than SnapshotCount keys to single leader to make sure snapshot is created
+	testUserClient := clus.Client(e2e.WithAuth(testUserName, testUserPassword))
+	for i := 0; i <= 10; i++ {
+		if err := testUserClient.Put(ctx, "key", "test", config.PutOptions{}); err != nil {
+			return err
+		}
+	}
+	return clus.StartNewProc(ctx, nil, t, e2e.WithAuth(rootUserName, rootUserPassword))
+}
+
+// Name returns the identifier of this failpoint.
+func (f memberAddFailpoint) Name() string {
+	return "MemberReconfiguration"
+}
diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go
index 9749c5b8a78e..2d9112133723 100644
--- a/tests/linearizability/linearizability_test.go
+++ b/tests/linearizability/linearizability_test.go
@@ -26,6 +26,8 @@ import (
 
 	"github.com/anishathalye/porcupine"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest"
 	"golang.org/x/time/rate"
 )
 
@@ -43,13 +45,19 @@ const (
 func TestLinearizability(t *testing.T) {
 	testRunner.BeforeTest(t)
 	tcs := []struct {
-		name      string
-		failpoint Failpoint
-		config    e2e.EtcdProcessClusterConfig
+		name        string
+		failpoint   FailpointConfig
+		config      e2e.EtcdProcessClusterConfig
+		clientCount int
+		trafficType Traffic
 	}{
 		{
-			name:      "ClusterOfSize1",
-			failpoint: RandomFailpoint,
+			name: "ClusterOfSize1",
+			failpoint: FailpointConfig{
+				failpoint:           RandomFailpoint,
+				count:               failpointTriggersCount,
+				waitBetweenTriggers: waitBetweenFailpointTriggers,
+			},
 			config: *e2e.NewConfig(
 				e2e.WithClusterSize(1),
 				e2e.WithGoFailEnabled(true),
@@ -57,63 +65,104 @@ func TestLinearizability(t *testing.T) {
 			),
 		},
 		{
-			name:      "ClusterOfSize3",
-			failpoint: RandomFailpoint,
+			name: "ClusterOfSize3",
+			failpoint: FailpointConfig{
+				failpoint:           RandomFailpoint,
+				count:               failpointTriggersCount,
+				waitBetweenTriggers: waitBetweenFailpointTriggers,
+			},
 			config: *e2e.NewConfig(
 				e2e.WithGoFailEnabled(true),
 				e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
 			),
 		},
 		{
-			name:      "Issue14370",
-			failpoint: RaftBeforeSavePanic,
+			name: "Issue14370",
+			failpoint: FailpointConfig{
+				failpoint:           RaftBeforeSavePanic,
+				count:               failpointTriggersCount,
+				waitBetweenTriggers: waitBetweenFailpointTriggers,
+			},
 			config: *e2e.NewConfig(
 				e2e.WithClusterSize(1),
 				e2e.WithGoFailEnabled(true),
 			),
 		},
+		{
+			name: "Issue14571",
+			failpoint: FailpointConfig{
+				failpoint: MemberAddFailpoint,
+				count:     1,
+			},
+			config: e2e.EtcdProcessClusterConfig{
+				ClusterSize:   1,
+				SnapshotCount: 2,
+			},
+			clientCount: 3,
+			trafficType: AuthTraffic,
+		},
 	}
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
-			failpoint := FailpointConfig{
-				failpoint:           tc.failpoint,
-				count:               failpointTriggersCount,
-				waitBetweenTriggers: waitBetweenFailpointTriggers,
+			lg := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())).Named(tc.name)
+			if tc.clientCount == 0 {
+				tc.clientCount = 8
+			}
+			if tc.trafficType == nil {
+				tc.trafficType = DefaultTraffic
 			}
 			traffic := trafficConfig{
 				minimalQPS:  minimalQPS,
 				maximalQPS:  maximalQPS,
-				clientCount: 8,
-				traffic:     DefaultTraffic,
+				clientCount: tc.clientCount,
+				traffic:     tc.trafficType,
 			}
-			testLinearizability(context.Background(), t, tc.config, failpoint, traffic)
+			testLinearizability(context.Background(), t, tc.config, tc.failpoint, traffic, lg)
 		})
 	}
 }
 
-func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
+// testLinearizability starts a cluster, sets up auth when the traffic needs
+// it, triggers failpoints in the background while traffic runs, and finally
+// stops the cluster and checks the recorded operations.
+func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig, lg *zap.Logger) {
 	clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer clus.Close()
+	if traffic.traffic.AuthEnabled() {
+		lg.Info("setting up auth")
+		setupAuth(ctx, t, clus.Client())
+	}
 	ctx, cancel := context.WithCancel(ctx)
+	// Deferred so ctx is always cancelled, even when a later step fails the test.
+	defer cancel()
+	// Closed by triggerFailpoints after the first successful trigger.
+	failpointTriggered := make(chan struct{})
 	go func() {
-		defer cancel()
-		err := triggerFailpoints(ctx, t, clus, failpoint)
+		// NOTE(review): ctx is cancelled here only on failure, so traffic must
+		// finish on its own after a successful run — confirm this is acceptable
+		// for DefaultTraffic (maxOperationsPerClient is 1000000).
+		err := triggerFailpoints(ctx, t, clus, failpoint, failpointTriggered)
 		if err != nil {
 			t.Error(err)
+			cancel()
 		}
 	}()
-	operations := simulateTraffic(ctx, t, clus, traffic)
+	operations := simulateTraffic(ctx, t, clus, traffic, lg, failpointTriggered)
 	err = clus.Stop()
 	if err != nil {
 		t.Error(err)
 	}
 	checkOperationsAndPersistResults(t, operations, clus)
 }
 
-func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) error {
+// triggerFailpoints triggers config.failpoint and returns an error unless it
+// succeeded config.count times with fewer than config.count failures.
+// failpointTriggered is closed after the first successful trigger; it stays
+// open when no trigger succeeds, so receivers must also watch ctx.
+func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig, failpointTriggered chan<- struct{}) error {
 	var err error
 	successes := 0
 	failures := 0
@@ -126,11 +175,17 @@ func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessC
 			continue
 		}
 		successes++
+		if successes == 1 {
+			// Signal exactly once by closing: a close never blocks and releases
+			// the waiter, whereas a send per trigger would block forever once
+			// simulateTraffic has stopped receiving.
+			close(failpointTriggered)
+		}
 		time.Sleep(config.waitBetweenTriggers)
 	}
 	if successes < config.count || failures >= config.count {
 		return fmt.Errorf("failed to trigger failpoints enough times, err: %v", err)
 	}
 	return nil
 }
 
@@ -140,26 +195,37 @@ type FailpointConfig struct {
 	waitBetweenTriggers time.Duration
 }
 
-func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) (operations []porcupine.Operation) {
+// simulateTraffic waits for the first failpoint trigger (or ctx cancellation),
+// then runs config.clientCount recording clients concurrently and returns the
+// operations they recorded. It returns nil if ctx is done before any trigger.
+func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig, lg *zap.Logger, failpointTriggered <-chan struct{}) (operations []porcupine.Operation) {
 	mux := sync.Mutex{}
-	endpoints := clus.EndpointsV3()
 
+	// Clients are created only after the first failpoint has been triggered.
+	// Also watch ctx so a failed trigger (which cancels ctx) cannot hang the test.
+	select {
+	case <-failpointTriggered:
+	case <-ctx.Done():
+		return nil
+	}
+	endpoints := clus.EndpointsV3()
 	limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
 
 	startTime := time.Now()
 	wg := sync.WaitGroup{}
+
 	for i := 0; i < config.clientCount; i++ {
 		wg.Add(1)
 		endpoints := []string{endpoints[i%len(endpoints)]}
-		c, err := NewClient(endpoints, i)
+		c, err := NewClient(endpoints, i, config.traffic.AuthEnabled(), lg)
 		if err != nil {
-			t.Fatal(err)
+			t.Fatalf("New client creation failed with %+v", err)
 		}
 		go func(c *recordingClient) {
 			defer wg.Done()
 			defer c.Close()
 
-			config.traffic.Run(ctx, c, limiter)
+			config.traffic.Run(ctx, c, limiter, lg)
 			mux.Lock()
 			operations = append(operations, c.operations...)
 			mux.Unlock()
diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go
index 83c0e101ca56..0eeb223987a3 100644
--- a/tests/linearizability/traffic.go
+++ b/tests/linearizability/traffic.go
@@ -18,17 +18,23 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"strings"
 	"time"
 
+	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 )
 
 var (
 	DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 100}}}
+	AuthTraffic    Traffic = authTraffic{readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 100}}}}
 )
 
 type Traffic interface {
-	Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter)
+	// Run issues requests through c, rate-limited by limiter, logging to lg.
+	Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, lg *zap.Logger)
+	// AuthEnabled reports whether auth must be set up before this traffic runs.
+	AuthEnabled() bool
 }
 
 type readWriteSingleKey struct {
@@ -41,7 +47,7 @@ type opChance struct {
 	chance    int
 }
 
-func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) {
+func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, lg *zap.Logger) {
 	maxOperationsPerClient := 1000000
 	minId := maxOperationsPerClient * c.id
 	maxId := maxOperationsPerClient * (c.id + 1)
@@ -105,3 +111,43 @@ func (t readWriteSingleKey) pickWriteOperation() Operation {
 	}
 	panic("unexpected")
 }
+
+// AuthEnabled returns false: this traffic does not need auth to be set up.
+func (t readWriteSingleKey) AuthEnabled() bool { return false }
+
+// authTraffic is readWriteSingleKey traffic for clusters with auth enabled.
+type authTraffic struct {
+	readWriteSingleKey
+}
+
+// Run alternates one read and one write on behalf of client c until 100
+// writes have been issued or ctx is done. A read rejected with
+// "permission denied" still proceeds to the write; any other read error is
+// logged and the read is retried.
+func (t authTraffic) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, lg *zap.Logger) {
+	lg.Info("start running")
+	maxOperationsPerClient := 100
+	minId := maxOperationsPerClient * c.id
+	maxId := maxOperationsPerClient * (c.id + 1)
+
+	for writeId := minId; writeId < maxId; {
+		select {
+		case <-ctx.Done():
+			lg.Warn("context is done, cancelling the request")
+			return
+		default:
+		}
+		// Execute one read per one write to avoid operation history include too many failed writes when etcd is down.
+		err := t.Read(ctx, c, limiter)
+		if err != nil && !strings.Contains(err.Error(), "etcdserver: permission denied") {
+			lg.Warn("failed to read", zap.Int("client-id", c.id), zap.Int("write-id", writeId), zap.Error(err))
+			continue
+		}
+		// Provide each write with unique id to make it easier to validate operation history.
+		t.Write(ctx, c, limiter, writeId)
+		writeId++
+	}
+}
+
+// AuthEnabled returns true: auth must be set up before this traffic runs.
+func (t authTraffic) AuthEnabled() bool { return true }