Skip to content

Commit

Permalink
reproduce write latency
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Jun 4, 2024
1 parent a0aee63 commit cf72b3e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pkg/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewReport(precision string) Report { return newReport(precision) }

func newReport(precision string) *report {
r := &report{
results: make(chan Result, 16),
results: make(chan Result, 65536),
precision: precision,
}
r.stats.ErrorDist = make(map[string]int)
Expand Down
62 changes: 22 additions & 40 deletions tools/benchmark/cmd/watch_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"context"
"fmt"
"os"
"sync/atomic"
"time"

"github.com/cheggaaa/pb/v3"
"github.com/spf13/cobra"
"golang.org/x/time/rate"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/report"
Expand Down Expand Up @@ -61,31 +61,23 @@ func init() {
}

func watchLatencyFunc(_ *cobra.Command, _ []string) {
key := string(mustRandBytes(watchLKeySize))
key := "/registry/pods"
value := string(mustRandBytes(watchLValueSize))
wchs := setupWatchChannels(key)
putClient := mustCreateConn()

bar = pb.New(watchLPutTotal * len(wchs))
bar.Start()

limiter := rate.NewLimiter(rate.Limit(watchLPutRate), watchLPutRate)

putTimes := make([]time.Time, watchLPutTotal)
eventTimes := make([][]time.Time, len(wchs))

for i, wch := range wchs {
for _, wch := range wchs {
wch := wch
i := i
eventTimes[i] = make([]time.Time, watchLPutTotal)
wg.Add(1)
go func() {
defer wg.Done()
eventCount := 0
for eventCount < watchLPutTotal {
resp := <-wch
for range resp.Events {
eventTimes[i][eventCount] = time.Now()
eventCount++
bar.Increment()
}
Expand All @@ -95,40 +87,30 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) {

putReport := newReport()
putReportResults := putReport.Run()
watchReport := newReport()
watchReportResults := watchReport.Run()
for i := 0; i < watchLPutTotal; i++ {
// limit key put as per reqRate
if err := limiter.Wait(context.TODO()); err != nil {
break
}
start := time.Now()
if _, err := putClient.Put(context.TODO(), key, value); err != nil {
fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
os.Exit(1)
}
end := time.Now()
putReport.Results() <- report.Result{Start: start, End: end}
putTimes[i] = end

var putCount atomic.Uint64
for i := 0; i < watchLPutRate; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
for {
if putCount.Load() >= uint64(watchLPutTotal) {
return
}
start := time.Now()
if _, err := putClient.Put(context.TODO(), key, value); err != nil {
fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
}
end := time.Now()
putReport.Results() <- report.Result{Start: start, End: end}
putCount.Add(1)
}
}(i)
}
wg.Wait()
close(putReport.Results())
bar.Finish()
fmt.Printf("\nPut summary:\n%s", <-putReportResults)

for i := 0; i < len(wchs); i++ {
for j := 0; j < watchLPutTotal; j++ {
start := putTimes[j]
end := eventTimes[i][j]
if end.Before(start) {
start = end
}
watchReport.Results() <- report.Result{Start: start, End: end}
}
}

close(watchReport.Results())
fmt.Printf("\nWatch events summary:\n%s", <-watchReportResults)
}

func setupWatchChannels(key string) []clientv3.WatchChan {
Expand Down

0 comments on commit cf72b3e

Please sign in to comment.