Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

functional-tester: randomize failure injection sequence (by default) #9515

Merged
merged 14 commits into from
Apr 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions tools/functional-tester/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ func (srv *Server) handleInitialStartEtcd(req *rpcpb.Request) (*rpcpb.Response,
}
srv.creatEtcdCmd()

srv.logger.Info("starting etcd process")
srv.logger.Info("starting etcd")
err = srv.startEtcdCmd()
if err != nil {
return nil, err
}
srv.logger.Info("started etcd process", zap.String("command-path", srv.etcdCmd.Path))
srv.logger.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))

// wait some time for etcd listener start
// before setting up proxy
Expand Down Expand Up @@ -248,15 +248,17 @@ func (srv *Server) startEtcdCmd() error {
func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
srv.creatEtcdCmd()

srv.logger.Info("restarting etcd process")
srv.logger.Info("restarting etcd")
err := srv.startEtcdCmd()
if err != nil {
return nil, err
}
srv.logger.Info("restarted etcd process", zap.String("command-path", srv.etcdCmd.Path))
srv.logger.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))

// wait some time for etcd listener start
// before setting up proxy
// TODO: local tests should handle port conflicts
// with clients on restart
time.Sleep(time.Second)
if err = srv.startProxy(); err != nil {
return nil, err
Expand All @@ -269,21 +271,14 @@ func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
}

func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) {
if srv.last != rpcpb.Operation_InitialStartEtcd && srv.last != rpcpb.Operation_RestartEtcd {
return &rpcpb.Response{
Success: false,
Status: fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_KillEtcd.String(), srv.last.String()),
}, nil
}

srv.stopProxy()

srv.logger.Info("killing etcd process", zap.String("signal", syscall.SIGTERM.String()))
srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
if err != nil {
return nil, err
}
srv.logger.Info("killed etcd process", zap.String("signal", syscall.SIGTERM.String()))
srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))

return &rpcpb.Response{
Success: true,
Expand All @@ -292,17 +287,15 @@ func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) {
}

func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
// TODO: stop/restart proxy?
// for now, just keep using the old ones
// if len(srv.advertisePortToProxy) > 0
srv.stopProxy()

// exit with stack trace
srv.logger.Info("killing etcd process", zap.String("signal", syscall.SIGQUIT.String()))
srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGQUIT.String()))
err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
if err != nil {
return nil, err
}
srv.logger.Info("killed etcd process", zap.String("signal", syscall.SIGQUIT.String()))
srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))

srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
Expand Down Expand Up @@ -336,12 +329,12 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {

// stop proxy, etcd, delete data directory
func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
srv.logger.Info("killing etcd process", zap.String("signal", syscall.SIGTERM.String()))
srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
if err != nil {
return nil, err
}
srv.logger.Info("killed etcd process", zap.String("signal", syscall.SIGTERM.String()))
srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))

srv.logger.Info("removing base directory", zap.String("dir", srv.Member.BaseDir))
err = os.RemoveAll(srv.Member.BaseDir)
Expand Down
1 change: 1 addition & 0 deletions tools/functional-tester/cmd/etcd-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func main() {
}
defer clus.DestroyEtcdAgents()

logger.Info("wait health after bootstrap")
err = clus.WaitHealth()
if err != nil {
logger.Fatal("WaitHealth failed", zap.Error(err))
Expand Down
269 changes: 152 additions & 117 deletions tools/functional-tester/rpcpb/rpc.pb.go

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions tools/functional-tester/rpcpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ message Member {
string EtcdClientEndpoint = 204 [(gogoproto.moretags) = "yaml:\"etcd-client-endpoint\""];

// Etcd defines etcd binary configuration flags.
Etcd Etcd = 301 [(gogoproto.moretags) = "yaml:\"etcd-config\""];
Etcd Etcd = 301 [(gogoproto.moretags) = "yaml:\"etcd\""];
}

enum FailureCase {
Expand Down Expand Up @@ -143,8 +143,7 @@ message Tester {
// TODO: support no-op
repeated string FailureCases = 31 [(gogoproto.moretags) = "yaml:\"failure-cases\""];
// FailureShuffle is true to randomize failure injecting order.
// TODO: support shuffle
// bool FailureShuffle = 32 [(gogoproto.moretags) = "yaml:\"failure-shuffle\""];
bool FailureShuffle = 32 [(gogoproto.moretags) = "yaml:\"failure-shuffle\""];
// FailpointCommands is the list of "gofail" commands (e.g. panic("etcd-tester"),1*sleep(1000)).
repeated string FailpointCommands = 33 [(gogoproto.moretags) = "yaml:\"failpoint-commands\""];

Expand Down
114 changes: 83 additions & 31 deletions tools/functional-tester/tester/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"path/filepath"
"strings"
Expand All @@ -27,10 +28,10 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/debugutil"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"golang.org/x/time/rate"

"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc"
yaml "gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -234,6 +235,33 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
}
go clus.serveTesterServer()

clus.updateFailures()

clus.rateLimiter = rate.NewLimiter(
rate.Limit(int(clus.Tester.StressQPS)),
int(clus.Tester.StressQPS),
)
clus.updateStresserChecker()
return clus, nil
}

// serveTesterServer runs the tester HTTP server and blocks until
// ListenAndServe returns. It logs the tester address before serving and
// again, together with the returned error, once serving stops. Any error
// other than http.ErrServerClosed is reported through logger.Fatal.
func (clus *Cluster) serveTesterServer() {
	clus.logger.Info(
		"started tester HTTP server",
		zap.String("tester-address", clus.Tester.TesterAddr),
	)

	err := clus.testerHTTPServer.ListenAndServe()

	clus.logger.Info(
		"tester HTTP server returned",
		zap.String("tester-address", clus.Tester.TesterAddr),
		zap.Error(err),
	)

	// a nil error or http.ErrServerClosed is not treated as a failure
	if err == nil || err == http.ErrServerClosed {
		return
	}
	clus.logger.Fatal("tester HTTP errored", zap.Error(err))
}

func (clus *Cluster) updateFailures() {
for _, cs := range clus.Tester.FailureCases {
switch cs {
case "KILL_ONE_FOLLOWER":
Expand Down Expand Up @@ -270,33 +298,59 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
clus.failures = append(clus.failures, newFailureNoOp())
case "EXTERNAL":
clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
default:
return nil, fmt.Errorf("unknown failure %q", cs)
}
}
}

clus.rateLimiter = rate.NewLimiter(
rate.Limit(int(clus.Tester.StressQPS)),
int(clus.Tester.StressQPS),
)
clus.updateStresserChecker()
return clus, nil
// failureStrings returns the Desc() of every entry in clus.failures,
// in the same order as the failures themselves. The result is always a
// non-nil slice, even when there are no failures.
func (clus *Cluster) failureStrings() (fs []string) {
	fs = make([]string, 0, len(clus.failures))
	for _, f := range clus.failures {
		fs = append(fs, f.Desc())
	}
	return fs
}

func (clus *Cluster) serveTesterServer() {
clus.logger.Info(
"started tester HTTP server",
zap.String("tester-address", clus.Tester.TesterAddr),
)
err := clus.testerHTTPServer.ListenAndServe()
clus.logger.Info(
"tester HTTP server returned",
zap.String("tester-address", clus.Tester.TesterAddr),
zap.Error(err),
)
if err != nil && err != http.ErrServerClosed {
clus.logger.Fatal("tester HTTP errored", zap.Error(err))
// shuffleFailures replaces clus.failures with a reordered copy of itself.
//
// It re-seeds the global math/rand source from the current time, draws a
// random offset in [0, 1000), and fills slot i of the new slice with the
// old entry at index (coprime(n)*i + offset) % n, where n is the number of
// failures. Start and completion are both logged with the total count.
func (clus *Cluster) shuffleFailures() {
	rand.Seed(time.Now().UnixNano())
	shift := rand.Intn(1000)
	total := len(clus.failures)
	step := coprime(total)

	clus.logger.Info("shuffling test failure cases", zap.Int("total", total))

	shuffled := make([]Failure, total)
	for dst := range shuffled {
		// source index walks the old slice in strides of "step",
		// starting from the random shift
		src := (step*dst + shift) % total
		shuffled[dst] = clus.failures[src]
	}
	clus.failures = shuffled

	clus.logger.Info("shuffled test failure cases", zap.Int("total", total))
}

// coprime returns the smallest integer c in [n/2, n) with gcd(c, n) == 1.
// When that range is empty (n <= 0) it returns 1.
//
// Two integers are coprime when their greatest common divisor is 1.
// For such a step c, the mapping
//
//	idx -> (c*idx + offset) % n
//
// never sends two different indices in [0, n) to the same slot: if it did,
// n would divide c*(idx2 - idx1), and since c shares no factor with n,
// n would have to divide (idx2 - idx1), which is impossible for distinct
// indices in [0, n). Walking idx over 0..n-1 therefore visits every slot
// exactly once.
func coprime(n int) int {
	for c := n / 2; c < n; c++ {
		if gcd(c, n) == 1 {
			return c
		}
	}
	return 1
}

// gcd returns the greatest common divisor of x and y, computed with the
// Euclidean algorithm. gcd(x, 0) is x.
func gcd(x, y int) int {
	for y != 0 {
		x, y = y, x%y
	}
	return x
}

func (clus *Cluster) updateStresserChecker() {
Expand Down Expand Up @@ -522,8 +576,6 @@ func (clus *Cluster) DestroyEtcdAgents() {
clus.logger.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
}

// TODO: closing stresser connections to etcd

if clus.testerHTTPServer != nil {
clus.logger.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down Expand Up @@ -558,7 +610,7 @@ func (clus *Cluster) WaitHealth() error {
break
}
clus.logger.Info(
"successfully wrote health key",
"wrote health key",
zap.Int("retries", i),
zap.String("endpoint", m.EtcdClientEndpoint),
)
Expand Down Expand Up @@ -642,9 +694,9 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
kvc := pb.NewKVClient(conn)

clus.logger.Info(
"starting compaction",
"compacting",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Int64("compact-revision", rev),
zap.Duration("timeout", timeout),
)

Expand All @@ -660,14 +712,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
clus.logger.Info(
"compact error is ignored",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Int64("compact-revision", rev),
zap.Error(cerr),
)
} else {
clus.logger.Warn(
"compact failed",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Int64("compact-revision", rev),
zap.Error(cerr),
)
err = cerr
Expand All @@ -677,9 +729,9 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {

if succeed {
clus.logger.Info(
"finished compaction",
"compacted",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Int64("compact-revision", rev),
zap.Duration("timeout", timeout),
zap.Duration("took", time.Since(now)),
)
Expand Down
31 changes: 31 additions & 0 deletions tools/functional-tester/tester/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tester

import (
"reflect"
"sort"
"testing"

"github.com/coreos/etcd/tools/functional-tester/rpcpb"
Expand Down Expand Up @@ -131,6 +132,7 @@ func Test_newCluster(t *testing.T) {
"DELAY_PEER_PORT_TX_RX_LEADER",
"DELAY_PEER_PORT_TX_RX_ALL",
},
FailureShuffle: true,
FailpointCommands: []string{`panic("etcd-tester")`},
RunnerExecPath: "/etcd-runner",
ExternalExecPath: "",
Expand Down Expand Up @@ -159,4 +161,33 @@ func Test_newCluster(t *testing.T) {
if !reflect.DeepEqual(exp, cfg) {
t.Fatalf("expected %+v, got %+v", exp, cfg)
}

cfg.logger = logger

cfg.updateFailures()
fs1 := cfg.failureStrings()

cfg.shuffleFailures()
fs2 := cfg.failureStrings()
if reflect.DeepEqual(fs1, fs2) {
t.Fatalf("expected shuffled failure cases, got %q", fs2)
}

cfg.shuffleFailures()
fs3 := cfg.failureStrings()
if reflect.DeepEqual(fs2, fs3) {
t.Fatalf("expected reshuffled failure cases from %q, got %q", fs2, fs3)
}

// the shuffle visits every failure case exactly once,
// so the sorted failure cases must be equal
sort.Strings(fs1)
sort.Strings(fs2)
sort.Strings(fs3)
if !reflect.DeepEqual(fs1, fs2) {
t.Fatalf("expected %q, got %q", fs1, fs2)
}
if !reflect.DeepEqual(fs2, fs3) {
t.Fatalf("expected %q, got %q", fs2, fs3)
}
}
Loading