Skip to content

Commit

Permalink
reproduce watch starvation and event loss
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Mar 5, 2024
1 parent ae3b43a commit 001bbdd
Showing 1 changed file with 276 additions and 1 deletion.
277 changes: 276 additions & 1 deletion tests/e2e/watch_delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@ package e2e

import (
"context"
"errors"
"fmt"
"math/rand"
"path"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -166,6 +174,273 @@ func TestWatchDelayForEvent(t *testing.T) {
}
}

var watchKeyPrefix = "/registry/pods/"

type randomStringAlphabet string

func (a randomStringAlphabet) makeString(minLen, maxLen int) string {
n := minLen
if minLen < maxLen {
n += rand.Intn(maxLen - minLen)
}
var s string
for i := 0; i < n; i++ {
s += string(a[rand.Intn(len(a))])
}
return s
}

var randomStringMaker = randomStringAlphabet("abcdefghijklmnopqrstuvwxyz0123456789")

func TestWatchDelayOnStreamMultiplex(t *testing.T) {
e2e.BeforeTest(t)
clus, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ClusterSize: 1, LogLevel: "debug"})
require.NoError(t, err)
defer clus.Close()
endpoints := clus.EndpointsV3()
c := newClient(t, endpoints, e2e.ClientNonTLS, false)
rootCtx, cancel := context.WithCancel(context.Background())
defer cancel()
g := errgroup.Group{}

commonWatchOpts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithPrevKV(),
}

watchCacheEventsReceived := atomic.Int64{}
watchCacheWatchInitialized := make(chan struct{})
watchCacheWatcherErrCompacted := make(chan struct{})
g.Go(func() error {
// simulate watch cache
lg := zaptest.NewLogger(t).Named("watch-cache")
gresp, err := c.Get(rootCtx, "foo")
if err != nil {
panic(err)
}
rev := gresp.Header.Revision
watchCacheWatchOpts := append([]clientv3.OpOption{clientv3.WithCreatedNotify(), clientv3.WithRev(rev), clientv3.WithProgressNotify()}, commonWatchOpts...)
lastTimeGotResponse := time.Now()
for wres := range c.Watch(rootCtx, watchKeyPrefix, watchCacheWatchOpts...) {
if wres.Err() != nil {
lg.Warn("got watch response error", zap.String("error", wres.Err().Error()), zap.Int64("compact-revision", wres.CompactRevision))
close(watchCacheWatcherErrCompacted)
return wres.Err()
}
if wres.Created {
close(watchCacheWatchInitialized)
}
watchCacheEventsReceived.Add(int64(len(wres.Events)))
elapsed := time.Since(lastTimeGotResponse)
if elapsed > 10*time.Second {
handleWatchResponse(lg, &wres, false)
}
lastTimeGotResponse = time.Now()
}
return nil
})
<-watchCacheWatchInitialized

var wg sync.WaitGroup
numOfDirectWatches := 1030 // a little over 1024
for i := 0; i < numOfDirectWatches; i++ {
wg.Add(1)
lg := zaptest.NewLogger(t).Named(fmt.Sprintf("direct-watch-%d", i))
g.Go(func() error {
perDirectWatchContext, perDirectWatchCancelFn := context.WithCancel(rootCtx)
retry := 0
for {
var watchOpts = append([]clientv3.OpOption{}, commonWatchOpts...)
if retry == 0 {
watchOpts = append(watchOpts, clientv3.WithCreatedNotify())
}
err := directWatch(perDirectWatchContext, lg, &wg, c, watchKeyPrefix, watchOpts)
if errors.Is(err, v3rpc.ErrCompacted) {
retry++
continue
}
// if watch is cancelled by client or closed by server, we should exit
perDirectWatchCancelFn()
return nil
}
})
}
wg.Wait()

eventsTriggered := atomic.Int64{}
loadCtx, loadCtxCancel := context.WithTimeout(rootCtx, time.Minute)
defer loadCtxCancel()

var clients []*clientv3.Client
for ci := 0; ci < 3; ci++ {
clients = append(clients, newClient(t, endpoints, e2e.ClientNonTLS, false))
}
generateLoad(t, loadCtx, clients, &g, &eventsTriggered)
compaction(t, loadCtx, &g, c)
compareEventsReceivedAndTriggered(t, cancel, loadCtx, &g, &watchCacheEventsReceived, &eventsTriggered, watchCacheWatcherErrCompacted)
require.NoError(t, g.Wait())
}

func directWatch(ctx context.Context, lg *zap.Logger, wg *sync.WaitGroup, c *clientv3.Client, keyPrefix string, watchOpts []clientv3.OpOption) error {
wch := c.Watch(ctx, keyPrefix, watchOpts...)
for wres := range wch {
if wres.Err() != nil {
if errors.Is(wres.Err(), v3rpc.ErrCompacted) {
lg.Debug("got watch response error", zap.String("error", wres.Err().Error()), zap.Int64("compact-revision", wres.CompactRevision))
}
return wres.Err()
}
if wres.Created {
wg.Done()
}
handleWatchResponse(lg, &wres, true)
}
return nil
}

func handleWatchResponse(lg *zap.Logger, wres *clientv3.WatchResponse, suppressLogging bool) {
if !suppressLogging {
switch {
case wres.Created:
lg.Info("got watch created notification", zap.Int64("revision", wres.Header.Revision))
case len(wres.Events) == 0:
lg.Info("got progress notify watch response", zap.Int64("revision", wres.Header.Revision))
case wres.Canceled:
lg.Warn("got watch cancelled")
default:
for _, ev := range wres.Events {
lg.Info("got watch response", zap.String("event-type", ev.Type.String()), zap.ByteString("key", ev.Kv.Key), zap.Int64("rev", wres.Header.Revision))
}
}
}
}

func generateLoad(t *testing.T, ctx context.Context, clients []*clientv3.Client, group *errgroup.Group, counter *atomic.Int64) {
// 3 (clients) * 200 (updater) * 1000 (per key value size) * 2 * 10 = 12MiB/s per watch
// 800 watches so the throughput is around 9.6GiB/s for all of watches
numOfUpdater := 200
keyValueSize := 1000
keyValueSizeUpperLimit := 1200
for ci, c := range clients {
for i := 0; i < numOfUpdater; i++ {
lg := zaptest.NewLogger(t).Named(fmt.Sprintf("client-%d-updater-%d", ci, i))
writeKeyPrefix := path.Join(watchKeyPrefix, fmt.Sprintf("%d", i))
group.Go(func() error {
count := 0
keyValuePayload := randomStringMaker.makeString(keyValueSize, keyValueSizeUpperLimit)
for {
select {
case <-ctx.Done():
return nil
default:
}
count++
key := path.Join(writeKeyPrefix, fmt.Sprintf("%d", count))
_, err := c.Put(ctx, key, keyValuePayload)
if err != nil {
lg.Warn("failed to write a key value to database", zap.String("key", key), zap.Error(err))
continue
} else {
counter.Add(1)
}
_, err = c.Delete(ctx, key)
if err != nil {
lg.Warn("failed to delete key to database", zap.String("key", key), zap.Error(err))
} else {
counter.Add(1)
}
}
})
}
}
}

func compaction(t *testing.T, ctx context.Context, group *errgroup.Group, c *clientv3.Client) {
group.Go(func() error {
lg := zaptest.NewLogger(t).Named("compaction")
lastCompactRev := int64(-1)
ticker := time.NewTicker(20 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
lg.Warn("context deadline exceeded, exit compaction routine")
return nil
case <-ticker.C:
}
if lastCompactRev < 0 {
gresp, err := c.Get(ctx, "foo")
if err != nil {
panic(err)
}
lastCompactRev = gresp.Header.Revision
continue
}
cres, err := c.Compact(ctx, lastCompactRev, clientv3.WithCompactPhysical())
if err != nil {
panic(err)
}
lg.Debug("compacted rev", zap.Int64("compact-revision", lastCompactRev))
lastCompactRev = cres.Header.Revision
}
})
}

func compareEventsReceivedAndTriggered(
t *testing.T,
rootCtxCancel context.CancelFunc,
loadCtx context.Context,
group *errgroup.Group,
watchCacheEventsReceived *atomic.Int64,
eventsTriggered *atomic.Int64,
watchCacheWatcherErrCompacted <-chan struct{},
) {
group.Go(func() error {
lg := zaptest.NewLogger(t).Named("compareEvents")
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

var once sync.Once
timer := time.NewTimer(2 * time.Hour)
defer timer.Stop()
for {
// block until traffic is done.
select {
case <-loadCtx.Done():
once.Do(func() { lg.Info("load generator context is done") })
}

// in case watch cache watcher channel is closed, reproduce the lost events with another retry
select {
case <-watchCacheWatcherErrCompacted:
lg.Warn("watch cache watcher got compacted error, please retry the reproduce")
return nil
default:
}

select {
case <-ticker.C:
case <-timer.C:
// cancel the watch cache watcher.
rootCtxCancel()
triggered := eventsTriggered.Load()
received := watchCacheEventsReceived.Load()
return fmt.Errorf("10mins passed since load generation is done, watch cache lost event detected; "+
"watch evetns received %d, received %d", received, triggered)
}
triggered := eventsTriggered.Load()
received := watchCacheEventsReceived.Load()
if received >= triggered {
lg.Info("The number of events watch cache received is high than or equal to events triggered on client side")
return nil
}
lg.Warn("watch events received is lagging behind",
zap.Int64("watch-events-received", received),
zap.Int64("events-triggered", triggered))
}
})
}

func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) {
start := time.Now()
var maxDelay time.Duration
Expand Down

0 comments on commit 001bbdd

Please sign in to comment.