Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix datarace in blocksync SyncStatus() and the new blocksync design #3890

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type (
unicastOutbound UniCastOutbound
blockP2pPeer BlockPeer

syncTask *routine.RecurringTask
syncTask *routine.TriggerTask
syncStageTask *routine.RecurringTask

syncStageHeight uint64
Expand All @@ -78,6 +78,7 @@ type (
lastTip uint64
lastTipUpdateTime time.Time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep this for now

  1. so we have a record of the current code/algorithm
  2. maybe the new algorithm can utilize this info in the future to further improve

targetHeight uint64 // block number of the highest block header this node has received from peers
requestMaxHeight uint64
millken marked this conversation as resolved.
Show resolved Hide resolved
mu sync.RWMutex
}

Expand Down Expand Up @@ -150,7 +151,7 @@ func NewBlockSyncer(
targetHeight: 0,
}
if bs.cfg.Interval != 0 {
bs.syncTask = routine.NewRecurringTask(bs.sync, bs.cfg.Interval)
bs.syncTask = routine.NewTriggerTask(bs.sync, routine.WithTriggerTaskInterval(bs.cfg.RateLimitInterval))
bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval)
}
atomic.StoreUint64(&bs.syncBlockIncrease, 0)
Expand All @@ -173,17 +174,14 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool {
}

func (bs *blockSyncer) flushInfo() (time.Time, uint64) {
bs.mu.Lock()
defer bs.mu.Unlock()
bs.mu.RLock()
defer bs.mu.RUnlock()

return bs.lastTipUpdateTime, bs.targetHeight
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep it for now

}

func (bs *blockSyncer) sync() {
updateTime, targetHeight := bs.flushInfo()
if updateTime.Add(bs.cfg.Interval).After(time.Now()) {
return
}
_, targetHeight := bs.flushInfo()
intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight)
// no sync
if len(intervals) == 0 {
Expand All @@ -194,6 +192,7 @@ func (bs *blockSyncer) sync() {
log.L().Info("block sync intervals.",
zap.Any("intervals", intervals),
zap.Uint64("targetHeight", targetHeight))
atomic.StoreUint64(&bs.requestMaxHeight, intervals[len(intervals)-1].End)
for i, interval := range intervals {
bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep)
}
Expand All @@ -216,7 +215,7 @@ func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint6
repeat = len(peers)
}
for i := 0; i < repeat; i++ {
peer := peers[fastrand.Uint32n(uint32(len(peers)))]
peer := peers[fastrand.Uint32n(uint32(len(peers)))] //may get the same node
if err := bs.unicastOutbound(
ctx,
peer,
Expand All @@ -240,6 +239,10 @@ func (bs *blockSyncer) Start(ctx context.Context) error {
if err := bs.syncTask.Start(ctx); err != nil {
return err
}
//we need to wait for the peer to be ready, and then start the sync task
go time.AfterFunc(bs.cfg.Interval, func() {
bs.syncTask.Trigger()
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting for a fixed period is not a reliable way to ensure the peers are ready. Maybe we could trigger it periodically instead.

}
if bs.syncStageTask != nil {
return bs.syncStageTask.Start(ctx)
Expand All @@ -263,6 +266,7 @@ func (bs *blockSyncer) Stop(ctx context.Context) error {
return nil
}

// ProcessBlock processes a block
func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block.Block) error {
if blk == nil {
return errors.New("block is nil")
Expand Down Expand Up @@ -291,9 +295,15 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block
bs.lastTip = syncedHeight
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also use atomic.SwapUint64 to handle bs.lastTip

bs.lastTipUpdateTime = time.Now()
}
requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight)
if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight {
bs.syncTask.Trigger()
atomic.SwapUint64(&bs.requestMaxHeight, 0)
}
millken marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// ProcessSyncRequest processes a sync request
func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInfo, start uint64, end uint64) error {
tip := bs.tipHeightHandler()
if end > tip {
Expand Down Expand Up @@ -327,7 +337,10 @@ func (bs *blockSyncer) syncStageChecker() {
bs.syncStageHeight = tipHeight
}

// SyncStatus returns the status of block syncer
func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) {
bs.mu.RLock()
defer bs.mu.RUnlock()
var syncSpeedDesc string
syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease)
switch {
Expand Down
156 changes: 150 additions & 6 deletions blocksync/blocksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,14 @@ package blocksync

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/iotexproject/go-pkgs/hash"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/action/protocol"
"github.com/iotexproject/iotex-core/action/protocol/account"
accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util"
Expand All @@ -37,6 +34,13 @@ import (
"github.com/iotexproject/iotex-core/test/mock/mock_blocksync"
"github.com/iotexproject/iotex-core/test/mock/mock_consensus"
"github.com/iotexproject/iotex-core/testutil"
goproto "github.com/iotexproject/iotex-proto/golang"
"github.com/iotexproject/iotex-proto/golang/iotexrpc"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

type testConfig struct {
Expand Down Expand Up @@ -535,3 +539,143 @@ func TestDummyBlockSync(t *testing.T) {
require.Zero(targetHeight)
require.Empty(desc)
}

// test1 is a small fixture for TestBlockSyncerBugIssue3889. It carries the
// counters that SyncStatus reports and the lock that protects them, with no
// networking or chain dependencies.
type test1 struct {
	syncBlockIncrease uint64 // read and written with sync/atomic
	startingHeight    uint64 // guarded by mu
	targetHeight      uint64 // guarded by mu
	mu                sync.RWMutex
}

// sync is the writer side of the fixture: every 2ms it stores the next
// counter value into all three fields while holding the write lock.
//
// The loop condition only turns false if i overflows, so in practice the
// goroutine running sync lives for the rest of the test binary.
func (bs *test1) sync() {
	for i := 0; i >= 0; i++ {
		time.Sleep(time.Millisecond * 2)
		bs.mu.Lock()
		atomic.StoreUint64(&bs.syncBlockIncrease, uint64(i))
		bs.targetHeight = uint64(i)
		bs.startingHeight = uint64(i)
		bs.mu.Unlock()
	}
}

// tipHeightHandler returns a fixed tip height of 1.
func (bs *test1) tipHeightHandler() uint64 {
	return 1
}

// SyncStatus returns the starting height, the tip height, the target height
// and a human-readable description of the sync speed.
//
// The read lock is held for the whole call: startingHeight and targetHeight
// are plain fields written by sync() under the write lock, so reading them
// without the lock is a data race.
func (bs *test1) SyncStatus() (uint64, uint64, uint64, string) {
	bs.mu.RLock()
	defer bs.mu.RUnlock()
	var syncSpeedDesc string
	syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease)
	switch {
	case syncBlockIncrease == 1:
		syncSpeedDesc = "synced to blockchain tip"
	default:
		syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease))
	}
	return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc
}

// BuildReport formats the values returned by SyncStatus into a single line.
// It takes no lock itself; SyncStatus does the locking.
func (bs *test1) BuildReport() string {
	startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus()
	return fmt.Sprintf(
		"BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s",
		startingHeight,
		tipHeight,
		targetHeight,
		syncSpeedDesc,
	)
}

// TestBlockSyncerBugIssue3889 starts the test1 writer in a goroutine and then
// calls SyncStatus and BuildReport from five concurrent goroutines, logging
// whatever they return. It makes no assertions on the values.
// NOTE(review): presumably intended to be run under the race detector, since
// issue #3889 concerns a data race in SyncStatus — confirm.
func TestBlockSyncerBugIssue3889(t *testing.T) {
bs := &test1{}
// Writer goroutine; nothing in this test stops it.
go bs.sync()
// Let the writer run briefly before the readers start.
time.Sleep(time.Millisecond * 10)
wait := sync.WaitGroup{}
wait.Add(5)
for i := 0; i < 5; i++ {
// i is passed in but not used inside the goroutine.
go func(i int) {
defer wait.Done()
startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus()
t.Logf("BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", startingHeight, tipHeight, targetHeight, syncSpeedDesc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also need to verify sync status?

report := bs.BuildReport()
t.Log(report)
}(i)
}
// All readers must finish before the test returns, because they log through t.
wait.Wait()
}

// TestBlockSync runs a block syncer against a single fake peer for about ten
// seconds and logs its report once per cfg.Interval. The fake peer answers
// every block request by building the requested blocks and feeding them back
// through ProcessBlock. A second feeder keeps announcing a block at height
// 20000000.
//
// All helper goroutines watch the done channel and are joined before the test
// returns. This matters because calling t.Log after the test has completed
// panics, and a goroutine blocked on the unbuffered chanProcessBlk would
// otherwise never exit.
func TestBlockSync(t *testing.T) {
	// t.SkipNow() // uncomment to skip this ~10s run
	require := require.New(t)
	cfg := DefaultConfig
	cfg.Interval = 1 * time.Second
	cfg.BufferSize = 200
	cfg.MaxRepeat = 3
	cfg.RepeatDecayStep = 3

	var (
		tipHeight      uint64
		wg             sync.WaitGroup
		chanProcessBlk = make(chan *block.Block)
		// done is closed on teardown to tell every helper goroutine, and the
		// peer callback below, to stop.
		done = make(chan struct{})
	)

	bs, err := NewBlockSyncer(cfg, func() uint64 {
		return atomic.LoadUint64(&tipHeight)
	}, nil, func(b *block.Block) error {
		atomic.StoreUint64(&tipHeight, b.Height())
		return nil
	}, func() ([]peer.AddrInfo, error) {
		return []peer.AddrInfo{{
			ID: peer.ID("test"),
		}}, nil
	}, func(ctx context.Context, ai peer.AddrInfo, m proto.Message) error {
		msgType, err := goproto.GetTypeFromRPCMsg(m)
		if err != nil {
			return err
		}
		if msgType != iotexrpc.MessageType_BLOCK_REQUEST {
			return nil
		}
		// Round-trip through the wire format to recover the request range.
		msgBody, err := proto.Marshal(m)
		if err != nil {
			return err
		}
		var req iotexrpc.BlockSync
		if err := proto.Unmarshal(msgBody, &req); err != nil {
			return err
		}
		// Act as the remote peer: send back every requested block.
		for i := req.Start; i <= req.End; i++ {
			blk, err := block.NewTestingBuilder().
				SetHeight(i).SignAndBuild(identityset.PrivateKey(0))
			if err != nil {
				return err
			}
			select {
			case chanProcessBlk <- &blk:
			case <-done:
				// The test is shutting down and nobody is receiving any more.
				return nil
			}
		}
		return nil
	}, func(s string) {
		t.Logf("block p2p %s", s)
	})
	require.NoError(err)
	require.NotNil(bs)

	ctx := context.Background()
	require.NoError(bs.Start(ctx))
	defer func() {
		// Stop the helpers first so nothing calls into bs or t afterwards.
		close(done)
		wg.Wait()
		require.NoError(bs.Stop(ctx))
	}()

	wg.Add(3)
	// Deliver the blocks produced by the fake peer.
	go func(bs BlockSync) {
		defer wg.Done()
		for {
			select {
			case blk := <-chanProcessBlk:
				// The result is not checked, as in the original test.
				_ = bs.ProcessBlock(ctx, "test", blk)
			case <-done:
				return
			}
		}
	}(bs)
	// Announce a block at height 20000000 once per second.
	go func(bs BlockSync) {
		defer wg.Done()
		for {
			blk, err := block.NewTestingBuilder().
				SetHeight(20000000).SignAndBuild(identityset.PrivateKey(0))
			if err != nil {
				t.Errorf("failed to build block: %v", err)
				return
			}
			_ = bs.ProcessBlock(ctx, "test", &blk)
			select {
			case <-time.After(time.Second * 1):
			case <-done:
				return
			}
		}
	}(bs)
	// Log the sync report once per interval.
	go func(bs BlockSync) {
		defer wg.Done()
		ticker := time.NewTicker(cfg.Interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				t.Log(bs.BuildReport())
			case <-done:
				return
			}
		}
	}(bs)

	time.Sleep(time.Second * 10)
}
7 changes: 7 additions & 0 deletions blocksync/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,10 @@ func TestBlockBufferGetBlocksIntervalsToSync(t *testing.T) {
// There should always have at least 1 interval range to sync
assert.Len(b.GetBlocksIntervalsToSync(chain.TipHeight(), 0), 1)
}

// TestBuffer builds a block buffer, installs an empty queue at key 3 and logs
// the intervals GetBlocksIntervalsToSync returns for tip 0 and target 2000.
// It is a smoke test: nothing is asserted about the result.
// NOTE(review): 200 and 20 presumably are the buffer size and interval size
// (they match the defaults in DefaultConfig) — confirm against newBlockBuffer.
func TestBuffer(t *testing.T) {
	buf := newBlockBuffer(200, 20)
	buf.blockQueues[3] = newUniQueue()
	t.Logf("iv: %v", buf.GetBlocksIntervalsToSync(0, 2000))
}
2 changes: 2 additions & 0 deletions blocksync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import "time"
// Config is the config struct for the BlockSync
type Config struct {
Interval time.Duration `yaml:"interval"` // update duration
RateLimitInterval time.Duration `yaml:"rateLimitInterval"`
ProcessSyncRequestTTL time.Duration `yaml:"processSyncRequestTTL"`
BufferSize uint64 `yaml:"bufferSize"`
IntervalSize uint64 `yaml:"intervalSize"`
Expand All @@ -22,6 +23,7 @@ type Config struct {
// DefaultConfig is the default config
var DefaultConfig = Config{
Interval: 30 * time.Second,
RateLimitInterval: 1 * time.Second,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you do some testing to see the sync speed? for 1 sec and 2 sec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1sec: 29.6 blks/sec
2sec: 14.4 blks/sec

ProcessSyncRequestTTL: 10 * time.Second,
BufferSize: 200,
IntervalSize: 20,
Expand Down
Loading