Skip to content

Commit

Permalink
fix(dot/sync): sync benchmark (#2234)
Browse files Browse the repository at this point in the history
- only keep 30 latest samples in a ring buffer
- Inject current time in benchmark methods
- Remove force setting blocks to `1` when `0` blocks were processed
  • Loading branch information
qdm12 committed Jan 27, 2022
1 parent a90a6e0 commit 2f3aef8
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 18 deletions.
50 changes: 35 additions & 15 deletions dot/sync/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,64 @@
package sync

import (
"container/ring"
"time"
)

type syncBenchmarker struct {
start time.Time
startBlock uint64
blocksPerSecond []float64
blocksPerSecond *ring.Ring
samplesToKeep int
}

func newSyncBenchmarker() *syncBenchmarker {
func newSyncBenchmarker(samplesToKeep int) *syncBenchmarker {
if samplesToKeep == 0 {
panic("cannot have 0 samples to keep")
}

return &syncBenchmarker{
blocksPerSecond: []float64{},
blocksPerSecond: ring.New(samplesToKeep),
samplesToKeep: samplesToKeep,
}
}

func (b *syncBenchmarker) begin(block uint64) {
b.start = time.Now()
func (b *syncBenchmarker) begin(now time.Time, block uint64) {
b.start = now
b.startBlock = block
}

func (b *syncBenchmarker) end(block uint64) {
duration := time.Since(b.start)
func (b *syncBenchmarker) end(now time.Time, block uint64) {
duration := now.Sub(b.start)
blocks := block - b.startBlock
if blocks == 0 {
blocks = 1
}
bps := float64(blocks) / duration.Seconds()
b.blocksPerSecond = append(b.blocksPerSecond, bps)
b.blocksPerSecond.Value = bps
b.blocksPerSecond = b.blocksPerSecond.Next()
}

func (b *syncBenchmarker) average() float64 {
sum := float64(0)
for _, bps := range b.blocksPerSecond {
var sum float64
var elementsSet int
b.blocksPerSecond.Do(func(x interface{}) {
if x == nil {
return
}
bps := x.(float64)
sum += bps
elementsSet++
})

if elementsSet == 0 {
return 0
}
return sum / float64(len(b.blocksPerSecond))

return sum / float64(elementsSet)
}

func (b *syncBenchmarker) mostRecentAverage() float64 {
return b.blocksPerSecond[len(b.blocksPerSecond)-1]
value := b.blocksPerSecond.Prev().Value
if value == nil {
return 0
}
return value.(float64)
}
237 changes: 237 additions & 0 deletions dot/sync/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright 2022 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package sync

import (
"container/ring"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_newSyncBenchmarker(t *testing.T) {
t.Parallel()

t.Run("10 samples to keep", func(t *testing.T) {
const samplesToKeep = 10
actual := newSyncBenchmarker(samplesToKeep)

expected := &syncBenchmarker{
blocksPerSecond: ring.New(samplesToKeep),
samplesToKeep: samplesToKeep,
}

assert.Equal(t, expected, actual)
})

t.Run("panics on 0 sample to keep", func(t *testing.T) {
const samplesToKeep = 0
assert.PanicsWithValue(t, "cannot have 0 samples to keep", func() {
newSyncBenchmarker(samplesToKeep)
})
})
}

func Test_syncBenchmarker_begin(t *testing.T) {
t.Parallel()

const startSec = 1000
start := time.Unix(startSec, 0)
const startBlock = 10

b := syncBenchmarker{}
b.begin(start, startBlock)

expected := syncBenchmarker{
start: start,
startBlock: startBlock,
}

assert.Equal(t, expected, b)
}

func Test_syncBenchmarker_end(t *testing.T) {
t.Parallel()

const startSec = 1000
start := time.Unix(startSec, 0)

const nowSec = 1010
now := time.Unix(nowSec, 0)

const (
startBlock = 10
endBlock = 12
)

const ringCap = 3

blocksPerSecond := ring.New(ringCap)
blocksPerSecond.Value = 1.00
blocksPerSecond = blocksPerSecond.Next()

b := syncBenchmarker{
start: start,
startBlock: startBlock,
blocksPerSecond: blocksPerSecond,
}
b.end(now, endBlock)

expectedBlocksPerSecond := ring.New(ringCap)
expectedBlocksPerSecond.Value = 1.00
expectedBlocksPerSecond = expectedBlocksPerSecond.Next()
expectedBlocksPerSecond.Value = 0.2
expectedBlocksPerSecond = expectedBlocksPerSecond.Next()

expected := syncBenchmarker{
start: start,
startBlock: startBlock,
blocksPerSecond: expectedBlocksPerSecond,
}

assert.Equal(t, expected, b)
}

func Test_syncBenchmarker_average(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
values []float64
ringCap int
average float64
}{
// zero size ring is not possible due to constructor check
"empty ring": {
ringCap: 1,
},
"single element in one-size ring": {
values: []float64{1.1},
ringCap: 1,
average: 1.1,
},
"single element in two-size ring": {
values: []float64{1.1},
ringCap: 2,
average: 1.1,
},
"two elements in two-size ring": {
values: []float64{1.0, 2.0},
ringCap: 2,
average: 1.5,
},
}

for name, testCase := range testCases {
testCase := testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

blocksPerSecond := ring.New(testCase.ringCap)
for _, value := range testCase.values {
blocksPerSecond.Value = value
blocksPerSecond = blocksPerSecond.Next()
}

benchmarker := syncBenchmarker{
blocksPerSecond: blocksPerSecond,
samplesToKeep: testCase.ringCap,
}

avg := benchmarker.average()

assert.Equal(t, testCase.average, avg)
})
}
}

func Test_syncBenchmarker_mostRecentAverage(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
values []float64
ringCap int
average float64
}{
// zero size ring is not possible due to constructor check
"empty ring": {
ringCap: 1,
},
"single element in one-size ring": {
values: []float64{1.1},
ringCap: 1,
average: 1.1,
},
"single element in two-size ring": {
values: []float64{1.1},
ringCap: 2,
average: 1.1,
},
"two elements in two-size ring": {
values: []float64{1.0, 2.0},
ringCap: 2,
average: 2.0,
},
"three elements in two-size ring": {
values: []float64{1.0, 2.0, 3.0},
ringCap: 2,
average: 3.0,
},
}

for name, testCase := range testCases {
testCase := testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

blocksPerSecond := ring.New(testCase.ringCap)
for _, value := range testCase.values {
blocksPerSecond.Value = value
blocksPerSecond = blocksPerSecond.Next()
}

benchmarker := syncBenchmarker{
blocksPerSecond: blocksPerSecond,
}

avg := benchmarker.mostRecentAverage()

assert.Equal(t, testCase.average, avg)
})
}
}

func Test_syncBenchmarker(t *testing.T) {
t.Parallel()

const samplesToKeep = 5
benchmarker := newSyncBenchmarker(samplesToKeep)

const initialBlock = 10
timeZero := time.Unix(0, 0)
const timeIncrement = time.Second
const baseBlocksIncrement uint64 = 1

startTime := timeZero
endTime := startTime.Add(timeIncrement)
var block uint64 = initialBlock

const samples = 10
for i := 0; i < samples; i++ {
benchmarker.begin(startTime, block)
block += baseBlocksIncrement + uint64(i)
benchmarker.end(endTime, block)

startTime = startTime.Add(timeIncrement)
endTime = startTime.Add(timeIncrement)
}

avg := benchmarker.average()
const expectedAvg = 8.0
assert.Equal(t, expectedAvg, avg)

mostRecentAvg := benchmarker.mostRecentAverage()
const expectedMostRecentAvg = 10.0
assert.Equal(t, expectedMostRecentAvg, mostRecentAvg)
}
7 changes: 4 additions & 3 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type chainSyncConfig struct {

func newChainSync(cfg *chainSyncConfig) *chainSync {
ctx, cancel := context.WithCancel(context.Background())
const syncSamplesToKeep = 30
return &chainSync{
ctx: ctx,
cancel: cancel,
Expand All @@ -174,7 +175,7 @@ func newChainSync(cfg *chainSyncConfig) *chainSync {
pendingBlocks: cfg.pendingBlocks,
state: bootstrap,
handler: newBootstrapSyncer(cfg.bs),
benchmarker: newSyncBenchmarker(),
benchmarker: newSyncBenchmarker(syncSamplesToKeep),
finalisedCh: cfg.bs.GetFinalisedNotifierChannel(),
minPeers: cfg.minPeers,
maxWorkerRetries: uint16(cfg.maxPeers),
Expand Down Expand Up @@ -321,7 +322,7 @@ func (cs *chainSync) logSyncSpeed() {
}

if cs.state == bootstrap {
cs.benchmarker.begin(before.Number.Uint64())
cs.benchmarker.begin(time.Now(), before.Number.Uint64())
}

select {
Expand All @@ -345,7 +346,7 @@ func (cs *chainSync) logSyncSpeed() {

switch cs.state {
case bootstrap:
cs.benchmarker.end(after.Number.Uint64())
cs.benchmarker.end(time.Now(), after.Number.Uint64())
target := cs.getTarget()

logger.Infof(
Expand Down

0 comments on commit 2f3aef8

Please sign in to comment.