diff --git a/tests/robustness/failpoint/cluster.go b/tests/robustness/failpoint/cluster.go index 0658def953f0..6da5e1221a4e 100644 --- a/tests/robustness/failpoint/cluster.go +++ b/tests/robustness/failpoint/cluster.go @@ -28,6 +28,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -36,7 +38,7 @@ var ( type memberReplace struct{} -func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { memberID := uint64(rand.Int() % len(clus.Procs)) member := clus.Procs[memberID] var endpoints []string @@ -50,12 +52,12 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, DialKeepAliveTimeout: 100 * time.Millisecond, }) if err != nil { - return err + return nil, err } defer cc.Close() memberID, found, err := getID(ctx, cc, member.Config().Name) if err != nil { - return err + return nil, err } if !found { t.Fatal("Member not found") @@ -65,11 +67,11 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, lg.Info("Removing member", zap.String("member", member.Config().Name)) _, err = cc.MemberRemove(ctx, memberID) if err != nil { - return err + return nil, err } _, found, err = getID(ctx, cc, member.Config().Name) if err != nil { - return err + return nil, err } if found { t.Fatal("Expected member to be removed") @@ -83,13 +85,13 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Failed to kill the process", zap.Error(err)) - return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) + return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } lg.Info("Removing member data", zap.String("member", member.Config().Name)) err = os.RemoveAll(member.Config().DataDirPath) if err != nil { - return err + return nil, err } lg.Info("Adding member back", zap.String("member", member.Config().Name)) @@ -97,7 +99,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, for { select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } reqCtx, cancel := context.WithTimeout(ctx, time.Second) @@ -109,17 +111,17 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, } err = patchArgs(member.Config().Args, "initial-cluster-state", "existing") if err != nil { - return err + return nil, err } lg.Info("Starting member", zap.String("member", member.Config().Name)) err = member.Start(ctx) if err != nil { - return err + return nil, err } for { select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } _, found, err := getID(ctx, cc, member.Config().Name) @@ -130,7 +132,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, break } } - return nil + return nil, nil } func (f memberReplace) Name() string { diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index c21b15c47fe1..3c1d2b1bf979 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -26,6 +26,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) const ( @@ -38,9 +40,9 @@ var ( DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic, BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic, BackendAfterWritebackBufPanic, - //CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, - //CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, - //CompactAfterCommitBatchPanic, + CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, + CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, + CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork, RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot, @@ -77,7 +79,7 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error { return nil } -func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) { +func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) { ctx, cancel := context.WithTimeout(ctx, triggerTimeout) defer cancel() var err error @@ -87,7 +89,7 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro } lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name())) start := time.Since(baseTime) - err = failpoint.Inject(ctx, t, lg, clus) + clientReport, err := failpoint.Inject(ctx, t, lg, clus, baseTime, ids) if err != nil { lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err)) return nil, fmt.Errorf("failed triggering failpoint, err: %v", err) @@ -98,14 +100,22 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name())) end := time.Since(baseTime) - return &InjectionReport{ - Start: start, - End: end, - Name: failpoint.Name(), + return &FailpointReport{ + Injection: Injection{ + Start: start, + End: end, + Name: failpoint.Name(), + }, + Client: clientReport, }, nil } -type InjectionReport struct { +type FailpointReport struct { + Injection + Client []report.ClientReport +} + +type Injection struct { Start, End time.Duration Name string } @@ -139,7 +149,7 @@ func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProces } type Failpoint interface { - Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error + Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) Name() string AvailabilityChecker } diff --git a/tests/robustness/failpoint/gofail.go b/tests/robustness/failpoint/gofail.go index 3d90c5ddd8f8..786862c91b8e 100644 --- a/tests/robustness/failpoint/gofail.go +++ b/tests/robustness/failpoint/gofail.go @@ -25,6 +25,8 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -73,13 +75,13 @@ const ( Follower failpointTarget = "Follower" ) -func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) (reports []report.ClientReport, err error) { member := f.pickMember(t, clus) for member.IsRunning() { select { case <-ctx.Done(): - return ctx.Err() + return reports, ctx.Err() default: } lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name())) @@ -94,16 +96,23 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg } if f.trigger != nil { lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name())) - err = f.trigger.Trigger(ctx, t, member, clus) + r, err := f.trigger.Trigger(ctx, t, member, clus, baseTime, ids) if err != nil { lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err)) } + if r != nil { + reports = append(reports, r...) + } + } + if !member.IsRunning() { + // TODO: Check member logs that etcd not running is caused panic caused by proper gofailpoint. + break } lg.Info("Waiting for member to exit", zap.String("member", member.Config().Name)) err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Member didn't exit as expected", zap.String("member", member.Config().Name), zap.Error(err)) - return fmt.Errorf("member didn't exit as expected: %v", err) + return reports, fmt.Errorf("member didn't exit as expected: %v", err) } lg.Info("Member exited as expected", zap.String("member", member.Config().Name)) } @@ -112,11 +121,11 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg lg.Info("Removing data that was not fsynced") err := lazyfs.ClearCache(ctx) if err != nil { - return err + return reports, err } } - return member.Start(ctx) + return reports, member.Start(ctx) } func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster) e2e.EtcdProcess { @@ -155,7 +164,7 @@ type killAndGofailSleep struct { time time.Duration } -func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] for member.IsRunning() { err := member.Kill() @@ -165,20 +174,20 @@ func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Lo err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Failed to kill the process", zap.Error(err)) - return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) + return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } lg.Info("Setting up goFailpoint", zap.String("failpoint", f.Name())) err := member.Failpoints().SetupEnv(f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time)) if err != nil { - return err + return nil, err } err = member.Start(ctx) if err != nil { - return err + return nil, err } // TODO: Check gofail status (https://github.com/etcd-io/gofail/pull/47) and wait for sleep to beis executed at least once. - return nil + return nil, nil } func (f killAndGofailSleep) Name() string { @@ -201,22 +210,22 @@ type gofailSleepAndDeactivate struct { time time.Duration } -func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name())) err := member.Failpoints().SetupHTTP(ctx, f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time)) if err != nil { lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err)) - return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err) + return nil, fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err) } time.Sleep(f.time) lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name())) err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint) if err != nil { lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err)) - return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err) + return nil, fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err) } - return nil + return nil, nil } func (f gofailSleepAndDeactivate) Name() string { diff --git a/tests/robustness/failpoint/kill.go b/tests/robustness/failpoint/kill.go index ef1d26e1394f..b0b5ff4fb2ce 100644 --- a/tests/robustness/failpoint/kill.go +++ b/tests/robustness/failpoint/kill.go @@ -20,10 +20,13 @@ import ( "math/rand" "strings" "testing" + "time" "go.uber.org/zap" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -32,7 +35,7 @@ var ( type killFailpoint struct{} -func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] for member.IsRunning() { @@ -43,21 +46,21 @@ func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Failed to kill the process", zap.Error(err)) - return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) + return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } if lazyfs := member.LazyFS(); lazyfs != nil { lg.Info("Removing data that was not fsynced") err := lazyfs.ClearCache(ctx) if err != nil { - return err + return nil, err } } err := member.Start(ctx) if err != nil { - return err + return nil, err } - return nil + return nil, nil } func (f killFailpoint) Name() string { diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index 5d59fba3d99c..48edc912e130 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -24,6 +24,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -37,9 +39,9 @@ type blackholePeerNetworkFailpoint struct { triggerBlackhole } -func (f blackholePeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f blackholePeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] - return f.Trigger(ctx, t, member, clus) + return f.Trigger(ctx, t, member, clus, baseTime, ids) } func (f blackholePeerNetworkFailpoint) Name() string { @@ -50,8 +52,8 @@ type triggerBlackhole struct { waitTillSnapshot bool } -func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error { - return Blackhole(ctx, t, member, clus, tb.waitTillSnapshot) +func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { + return nil, Blackhole(ctx, t, member, clus, tb.waitTillSnapshot) } func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool { @@ -153,7 +155,7 @@ type delayPeerNetworkFailpoint struct { randomizedLatency time.Duration } -func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] proxy := member.PeerProxy() @@ -164,7 +166,7 @@ func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg lg.Info("Traffic delay removed", zap.String("member", member.Config().Name)) proxy.UndelayRx() proxy.UndelayTx() - return nil + return nil, nil } func (f delayPeerNetworkFailpoint) Name() string { @@ -180,7 +182,7 @@ type dropPeerNetworkFailpoint struct { dropProbabilityPercent int } -func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] proxy := member.PeerProxy() @@ -191,7 +193,7 @@ func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg * lg.Info("Traffic drop removed", zap.String("member", member.Config().Name)) proxy.UnmodifyRx() proxy.UnmodifyTx() - return nil + return nil, nil } func (f dropPeerNetworkFailpoint) modifyPacket(data []byte) []byte { diff --git a/tests/robustness/failpoint/trigger.go b/tests/robustness/failpoint/trigger.go index 79316a3abf57..96c4f9e08d80 100644 --- a/tests/robustness/failpoint/trigger.go +++ b/tests/robustness/failpoint/trigger.go @@ -25,16 +25,19 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) type trigger interface { - Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error + Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) AvailabilityChecker } type triggerDefrag struct{} -func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error { +func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: member.EndpointsGRPC(), Logger: zap.NewNop(), @@ -42,14 +45,14 @@ func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.Etc DialKeepAliveTimeout: 100 * time.Millisecond, }) if err != nil { - return fmt.Errorf("failed creating client: %w", err) + return nil, fmt.Errorf("failed creating client: %w", err) } defer cc.Close() _, err = cc.Defragment(ctx, member.EndpointsGRPC()[0]) if err != nil && !strings.Contains(err.Error(), "error reading from server: EOF") { - return err + return nil, err } - return nil + return nil, nil } func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool { @@ -60,37 +63,32 @@ type triggerCompact struct { multiBatchCompaction bool } -func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error { - cc, err := clientv3.New(clientv3.Config{ - Endpoints: member.EndpointsGRPC(), - Logger: zap.NewNop(), - DialKeepAliveTime: 10 * time.Second, - DialKeepAliveTimeout: 100 * time.Millisecond, - }) +func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + cc, err := traffic.NewRecordingClient(member.EndpointsGRPC(), ids, baseTime) if err != nil { - return fmt.Errorf("failed creating client: %w", err) + return nil, fmt.Errorf("failed creating client: %w", err) } defer cc.Close() var rev int64 for { - resp, gerr := cc.Get(ctx, "/") - if gerr != nil { - return gerr + _, rev, err = cc.Get(ctx, "/", 0) + if err != nil { + return nil, err } - rev = resp.Header.Revision if !t.multiBatchCompaction || rev > int64(clus.Cfg.ServerConfig.ExperimentalCompactionBatchLimit) { break } time.Sleep(50 * time.Millisecond) } - _, err = cc.Compact(ctx, rev) if err != nil && !strings.Contains(err.Error(), "error reading from server: EOF") { - return err + return nil, err } - return nil + return []report.ClientReport{cc.Report()}, nil } func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool { diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index db9e483c3683..3e59e5b3c5e5 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -108,8 +108,8 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu ctx, cancel := context.WithCancel(ctx) defer cancel() g := errgroup.Group{} - var operationReport, watchReport []report.ClientReport - failpointInjected := make(chan failpoint.InjectionReport, 1) + var operationReport, watchReport, failpointClientReport []report.ClientReport + failpointInjected := make(chan failpoint.Injection, 1) // using baseTime time-measuring operation to get monotonic clock reading // see https://github.com/golang/go/blob/master/src/time/time.go#L17 @@ -119,7 +119,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu defer close(failpointInjected) // Give some time for traffic to reach qps target before injecting failpoint. time.Sleep(time.Second) - fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime) + fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime, ids) if err != nil { t.Error(err) cancel() @@ -127,7 +127,8 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu // Give some time for traffic to reach qps target after injecting failpoint. time.Sleep(time.Second) if fr != nil { - failpointInjected <- *fr + failpointInjected <- fr.Injection + failpointClientReport = fr.Client } return nil }) @@ -145,7 +146,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu return nil }) g.Wait() - return append(operationReport, watchReport...) + return append(operationReport, append(failpointClientReport, watchReport...)...) } func operationsMaxRevision(reports []report.ClientReport) int64 { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index ec13b79d3943..2fb75aa571cd 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -51,7 +51,7 @@ type TimedWatchEvent struct { Time time.Duration } -func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { +func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, Logger: zap.NewNop(), diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 431a0fb870b7..95624f3cf2a3 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -52,7 +52,7 @@ var ( } ) -func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.InjectionReport, baseTime time.Time, ids identity.Provider) []report.ClientReport { +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.Injection, baseTime time.Time, ids identity.Provider) []report.ClientReport { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() @@ -60,7 +60,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 reports := []report.ClientReport{} limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200) - cc, err := NewClient(endpoints, ids, baseTime) + cc, err := NewRecordingClient(endpoints, ids, baseTime) if err != nil { t.Fatal(err) } @@ -77,7 +77,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 startTime := time.Since(baseTime) for i := 0; i < profile.ClientCount; i++ { wg.Add(1) - c, nerr := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) + c, nerr := NewRecordingClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) if nerr != nil { t.Fatal(nerr) } @@ -91,7 +91,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 mux.Unlock() }(c) } - var fr *failpoint.InjectionReport + var fr *failpoint.Injection select { case frp, ok := <-failpointInjected: if !ok { diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 2ba77b36f59d..ab798e290c6c 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -32,7 +32,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd reports := make([]report.ClientReport, len(clus.Procs)) memberMaxRevisionChans := make([]chan int64, len(clus.Procs)) for i, member := range clus.Procs { - c, err := traffic.NewClient(member.EndpointsGRPC(), ids, baseTime) + c, err := traffic.NewRecordingClient(member.EndpointsGRPC(), ids, baseTime) if err != nil { t.Fatal(err) }