From 575fcaa4a009f76af030883392b855c15099c76b Mon Sep 17 00:00:00 2001 From: millken Date: Mon, 19 Jun 2023 15:29:04 +0800 Subject: [PATCH 01/23] fix datarace in blocksync SyncStatus() --- blocksync/blocksync.go | 3 ++- blocksync/blocksync_test.go | 39 +++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index f4e3f1f845..05f86d97ce 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -330,13 +330,14 @@ func (bs *blockSyncer) syncStageChecker() { func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { var syncSpeedDesc string syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) + increase := syncBlockIncrease switch { case syncBlockIncrease == 1: syncSpeedDesc = "synced to blockchain tip" case bs.cfg.Interval == 0: syncSpeedDesc = "no sync task" default: - syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)/bs.cfg.Interval.Seconds()) + syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(increase)/bs.cfg.Interval.Seconds()) } return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc } diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 2a7a73e5ef..3e68cfcd88 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -7,6 +7,9 @@ package blocksync import ( "context" + "fmt" + "sync" + "sync/atomic" "testing" "time" @@ -535,3 +538,39 @@ func TestDummyBlockSync(t *testing.T) { require.Zero(targetHeight) require.Empty(desc) } + +func TestBlockSyncerBugIssue3889(t *testing.T) { + var syncBlockIncrease uint64 + SyncStatus := func() (uint64, uint64, uint64, string) { + var syncSpeedDesc string + syncBlockIncrease := atomic.LoadUint64(&syncBlockIncrease) + increase := syncBlockIncrease + syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(increase)/1) + + return uint64(0), 1, 2, syncSpeedDesc + } + + // BuildReport builds a report of block syncer + BuildReport := func() string { + startingHeight, tipHeight, targetHeight, syncSpeedDesc := SyncStatus() + return fmt.Sprintf( + "BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", + startingHeight, + tipHeight, + targetHeight, + syncSpeedDesc, + ) + } + wait := sync.WaitGroup{} + wait.Add(6) + // read concurrently + for i := 0; i < 5; i++ { + go func() { + defer wait.Done() + startingHeight, tipHeight, targetHeight, syncSpeedDesc := SyncStatus() + t.Logf("BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", startingHeight, tipHeight, targetHeight, syncSpeedDesc) + report := BuildReport() + t.Log(report) + }() + } +} From 8c4dfa6d46756d323d161bf1869011698940028a Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 20 Jun 2023 08:53:31 +0800 Subject: [PATCH 02/23] Revert "fix datarace in blocksync SyncStatus()" This reverts commit 575fcaa4a009f76af030883392b855c15099c76b. --- blocksync/blocksync.go | 3 +-- blocksync/blocksync_test.go | 39 ------------------------------------- 2 files changed, 1 insertion(+), 41 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index 05f86d97ce..f4e3f1f845 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -330,14 +330,13 @@ func (bs *blockSyncer) syncStageChecker() { func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { var syncSpeedDesc string syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) - increase := syncBlockIncrease switch { case syncBlockIncrease == 1: syncSpeedDesc = "synced to blockchain tip" case bs.cfg.Interval == 0: syncSpeedDesc = "no sync task" default: - syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(increase)/bs.cfg.Interval.Seconds()) + syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)/bs.cfg.Interval.Seconds()) } return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc } diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 3e68cfcd88..2a7a73e5ef 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -7,9 +7,6 @@ package blocksync import ( "context" - "fmt" - "sync" - "sync/atomic" "testing" "time" @@ -538,39 +535,3 @@ func TestDummyBlockSync(t *testing.T) { require.Zero(targetHeight) require.Empty(desc) } - -func TestBlockSyncerBugIssue3889(t *testing.T) { - var syncBlockIncrease uint64 - SyncStatus := func() (uint64, uint64, uint64, string) { - var syncSpeedDesc string - syncBlockIncrease := atomic.LoadUint64(&syncBlockIncrease) - increase := syncBlockIncrease - syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(increase)/1) - - return uint64(0), 1, 2, syncSpeedDesc - } - - // BuildReport builds a report of block syncer - BuildReport := func() string { - startingHeight, tipHeight, targetHeight, syncSpeedDesc := SyncStatus() - return fmt.Sprintf( - "BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", - startingHeight, - tipHeight, - targetHeight, - syncSpeedDesc, - ) - } - wait := sync.WaitGroup{} - wait.Add(6) - // read concurrently - for i := 0; i < 5; i++ { - go func() { - defer wait.Done() - startingHeight, tipHeight, targetHeight, syncSpeedDesc := SyncStatus() - t.Logf("BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", startingHeight, tipHeight, targetHeight, syncSpeedDesc) - report := BuildReport() - t.Log(report) - }() - } -} From d8a8374fbe32b5e4163eafefccc9dfe5e745a5b1 Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 20 Jun 2023 15:08:26 +0800 Subject: [PATCH 03/23] fix datarace --- blocksync/blocksync.go | 2 ++ blocksync/blocksync_test.go | 66 +++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index f4e3f1f845..c621188cff 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -328,6 +328,8 @@ func (bs *blockSyncer) syncStageChecker() { } func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { + bs.mu.RLock() + defer bs.mu.RUnlock() var syncSpeedDesc string syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) switch { diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 2a7a73e5ef..c6ba3235f5 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -7,6 +7,9 @@ package blocksync import ( "context" + "fmt" + "sync" + "sync/atomic" "testing" "time" @@ -535,3 +538,66 @@ func TestDummyBlockSync(t *testing.T) { require.Zero(targetHeight) require.Empty(desc) } + +type test1 struct { + syncBlockIncrease uint64 + startingHeight uint64 + targetHeight uint64 + mu sync.RWMutex +} + +func (bs *test1) sync() { + for i := 0; i >= 0; i++ { + time.Sleep(time.Millisecond * 2) + bs.mu.Lock() + atomic.StoreUint64(&bs.syncBlockIncrease, uint64(i)) + bs.targetHeight = uint64(i) + bs.startingHeight = uint64(i) + bs.mu.Unlock() + } +} +func (bs *test1) tipHeightHandler() uint64 { + return 1 +} +func (bs *test1) SyncStatus() (uint64, uint64, uint64, string) { + bs.mu.RLock() + defer bs.mu.RUnlock() + var syncSpeedDesc string + syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) + switch { + case syncBlockIncrease == 1: + syncSpeedDesc = "synced to blockchain tip" + default: + syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)) + } + return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc +} + +func (bs *test1) BuildReport() string { + startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus() + return fmt.Sprintf( + "BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", + startingHeight, + tipHeight, + targetHeight, + syncSpeedDesc, + ) +} + +func TestBlockSyncerBugIssue3889(t *testing.T) { + bs := &test1{} + go bs.sync() + time.Sleep(time.Millisecond * 10) + wait := sync.WaitGroup{} + wait.Add(5) + for i := 0; i < 5; i++ { + go func(i int) { + defer wait.Done() + startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus() + t.Logf("BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", startingHeight, tipHeight, targetHeight, syncSpeedDesc) + report := bs.BuildReport() + t.Log(report) + }(i) + } + wait.Wait() +} From 04b0bd9e6acbd94d9ec03084308b2548be4b0e99 Mon Sep 17 00:00:00 2001 From: millken Date: Mon, 26 Jun 2023 11:11:29 +0800 Subject: [PATCH 04/23] save --- blocksync/blocksync_test.go | 4 +- blocksync/blocksync_v2.go | 65 +++++++++++++++++++++++++++++++ chainservice/builder.go | 2 +- dispatcher/dispatcher.go | 2 +- server/itx/nodestats/nodestats.go | 2 +- 5 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 blocksync/blocksync_v2.go diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index c6ba3235f5..b2b26f75cf 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -560,8 +560,8 @@ func (bs *test1) tipHeightHandler() uint64 { return 1 } func (bs *test1) SyncStatus() (uint64, uint64, uint64, string) { - bs.mu.RLock() - defer bs.mu.RUnlock() + // bs.mu.RLock() + // defer bs.mu.RUnlock() var syncSpeedDesc string syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) switch { diff --git a/blocksync/blocksync_v2.go b/blocksync/blocksync_v2.go new file mode 100644 index 0000000000..5c8b5b91ae --- /dev/null +++ b/blocksync/blocksync_v2.go @@ -0,0 +1,65 @@ +package blocksync + +import ( + "context" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/libp2p/go-libp2p-core/peer" +) + +type blockSyncerV2 struct { + tipHeightHandler TipHeight + blockByHeightHandler BlockByHeight + commitBlockHandler CommitBlock + p2pNeighbor Neighbors + unicastOutbound UniCastOutbound + blockP2pPeer BlockPeer +} + +func NewBlockSyncerV2( + cfg Config, + tipHeightHandler TipHeight, + blockByHeightHandler BlockByHeight, + commitBlockHandler CommitBlock, + p2pNeighbor Neighbors, + uniCastHandler UniCastOutbound, + blockP2pPeer BlockPeer, +) (BlockSync, error) { + bs := &blockSyncerV2{ + tipHeightHandler: tipHeightHandler, + blockByHeightHandler: blockByHeightHandler, + commitBlockHandler: commitBlockHandler, + p2pNeighbor: p2pNeighbor, + unicastOutbound: uniCastHandler, + blockP2pPeer: blockP2pPeer, + } + return bs, nil +} + +func (*blockSyncerV2) Start(context.Context) error { + return nil +} + +func (*blockSyncerV2) Stop(context.Context) error { + return nil +} + +func (*blockSyncerV2) TargetHeight() uint64 { + return 0 +} + +func (*blockSyncerV2) ProcessSyncRequest(context.Context, peer.AddrInfo, uint64, uint64) error { + return nil +} + +func (*blockSyncerV2) ProcessBlock(context.Context, string, *block.Block) error { + return nil +} + +func (*blockSyncerV2) SyncStatus() (uint64, uint64, uint64, string) { + return 0, 0, 0, "" +} + +func (*blockSyncerV2) BuildReport() string { + return "" +} diff --git a/chainservice/builder.go b/chainservice/builder.go index c397f5d4b9..c3f5f8b611 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -500,7 +500,7 @@ func (builder *Builder) buildBlockSyncer() error { log.L().Debug("Failed to commit block.", zap.Error(err), zap.Uint64("height", blk.Height())) return err } - log.L().Info("Successfully committed block.", zap.Uint64("height", blk.Height())) + //log.L().Info("Successfully committed block.", zap.Uint64("height", blk.Height())) consens.Calibrate(blk.Height()) return nil }, diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 19215e16f4..3d826a46c7 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -220,7 +220,7 @@ func (d *IotxDispatcher) actionHandler() { case a := <-d.actionChan: d.handleActionMsg(a) case <-d.quit: - log.L().Info("action handler is terminated.") + //log.L().Info("action handler is terminated.") return } } diff --git a/server/itx/nodestats/nodestats.go b/server/itx/nodestats/nodestats.go index 77dee15c99..d16b14fde8 100644 --- a/server/itx/nodestats/nodestats.go +++ b/server/itx/nodestats/nodestats.go @@ -11,7 +11,7 @@ import ( const ( // PeriodicReportInterval is the interval for generating periodic reports - PeriodicReportInterval = 5 * time.Minute + PeriodicReportInterval = 20 * time.Second ) // StatsReporter is the interface for stats reporter From dc4c078ca4aea3569b5e579af0daf7f7f62c0dac Mon Sep 17 00:00:00 2001 From: millken Date: Fri, 7 Jul 2023 12:15:41 +0800 Subject: [PATCH 05/23] savework --- blocksync/blocksync_test.go | 174 +++++++++++++++++++++++++- blocksync/blocksync_v2.go | 242 ++++++++++++++++++++++++++++++++++-- blocksync/buffer_test.go | 7 ++ 3 files changed, 406 insertions(+), 17 deletions(-) diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index b2b26f75cf..3274c58ae0 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -15,12 +15,6 @@ import ( "github.com/golang/mock/gomock" "github.com/iotexproject/go-pkgs/hash" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - "github.com/iotexproject/iotex-core/action/protocol" "github.com/iotexproject/iotex-core/action/protocol/account" accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util" @@ -40,6 +34,13 @@ import ( "github.com/iotexproject/iotex-core/test/mock/mock_blocksync" "github.com/iotexproject/iotex-core/test/mock/mock_consensus" "github.com/iotexproject/iotex-core/testutil" + goproto "github.com/iotexproject/iotex-proto/golang" + "github.com/iotexproject/iotex-proto/golang/iotexrpc" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" ) type testConfig struct { @@ -601,3 +602,164 @@ func TestBlockSyncerBugIssue3889(t *testing.T) { } wait.Wait() } + +func TestBlockSyncV1(t *testing.T) { + require := require.New(t) + cfg := DefaultConfig + cfg.Interval = 10 * time.Second + cfg.BufferSize = 200 + cfg.MaxRepeat = 3 + cfg.RepeatDecayStep = 3 + + var tipHeight uint64 + chanProcessBlk := make(chan *block.Block) + + bs, err := NewBlockSyncer(cfg, func() uint64 { + return atomic.LoadUint64(&tipHeight) + }, nil, func(b *block.Block) error { + //t.Logf("commit block %d", b.Height()) + // rnd := rand.Intn(200) + // time.Sleep(time.Duration(rnd) * time.Millisecond) + atomic.StoreUint64(&tipHeight, b.Height()) + return nil + }, func() ([]peer.AddrInfo, error) { + return []peer.AddrInfo{{ + ID: peer.ID("test"), + }}, nil + }, func(ctx context.Context, ai peer.AddrInfo, m proto.Message) error { + msgType, err := goproto.GetTypeFromRPCMsg(m) + if err != nil { + return err + } + switch msgType { + case iotexrpc.MessageType_BLOCK_REQUEST: + msgBody, err := proto.Marshal(m) + if err != nil { + return err + } + var req iotexrpc.BlockSync + if err := proto.Unmarshal(msgBody, &req); err != nil { + return err + } + //simulate network delay + // rnd := rand.Intn(500) + // time.Sleep(time.Duration(rnd) * time.Millisecond) + // send block + + for i := req.Start; i <= req.End; i++ { + blk, _ := block.NewTestingBuilder(). + SetHeight(i).SignAndBuild(identityset.PrivateKey(0)) + chanProcessBlk <- &blk + } + } + return nil + }, func(s string) { + t.Logf("block p2p %s", s) + }) + ctx := context.Background() + require.NoError(err) + require.NotNil(bs) + go func(bs BlockSync) { + for blk := range chanProcessBlk { + bs.ProcessBlock(ctx, "test", blk) + } + }(bs) + go func(bs BlockSync) { + for { + blk, _ := block.NewTestingBuilder(). + SetHeight(20000000).SignAndBuild(identityset.PrivateKey(0)) + bs.ProcessBlock(ctx, "test", &blk) + time.Sleep(time.Second * 1) + } + }(bs) + require.NoError(bs.Start(ctx)) + + go func(bs BlockSync) { + for { + time.Sleep(cfg.Interval) + t.Log(bs.BuildReport()) + } + }(bs) + + defer func() { + require.NoError(bs.Stop(ctx)) + }() + + time.Sleep(time.Second * 70) +} + +func TestBlockSyncV2(t *testing.T) { + require := require.New(t) + cfg := DefaultConfig + cfg.Interval = 10 * time.Second + cfg.BufferSize = 200 + cfg.MaxRepeat = 3 + cfg.RepeatDecayStep = 3 + var tipHeight uint64 + chanProcessBlk := make(chan *block.Block) + + bs, err := NewBlockSyncerV2(cfg, func() uint64 { + return atomic.LoadUint64(&tipHeight) + }, nil, func(b *block.Block) error { + //t.Logf("commit block %d", b.Height()) + atomic.StoreUint64(&tipHeight, b.Height()) + return nil + }, func() ([]peer.AddrInfo, error) { + return []peer.AddrInfo{{ + ID: peer.ID("test"), + }}, nil + }, func(ctx context.Context, ai peer.AddrInfo, m proto.Message) error { + msgType, err := goproto.GetTypeFromRPCMsg(m) + if err != nil { + return err + } + switch msgType { + case iotexrpc.MessageType_BLOCK_REQUEST: + msgBody, err := proto.Marshal(m) + if err != nil { + return err + } + var req iotexrpc.BlockSync + if err := proto.Unmarshal(msgBody, &req); err != nil { + return err + } + // send block + for i := req.Start; i <= req.End; i++ { + blk, _ := block.NewTestingBuilder(). + SetHeight(i).SignAndBuild(identityset.PrivateKey(0)) + chanProcessBlk <- &blk + } + } + return nil + }, func(s string) { + t.Logf("block p2p %s", s) + }) + ctx := context.Background() + require.NoError(err) + require.NotNil(bs) + go func(bs BlockSync) { + for blk := range chanProcessBlk { + bs.ProcessBlock(ctx, "test", blk) + } + }(bs) + go func(bs BlockSync) { + for { + blk, _ := block.NewTestingBuilder(). + SetHeight(20000000).SignAndBuild(identityset.PrivateKey(0)) + bs.ProcessBlock(ctx, "test", &blk) + time.Sleep(time.Second * 1) + } + }(bs) + require.NoError(bs.Start(ctx)) + go func(bs BlockSync) { + for { + time.Sleep(cfg.Interval) + t.Log(bs.BuildReport()) + } + }(bs) + defer func() { + require.NoError(bs.Stop(ctx)) + }() + + time.Sleep(time.Second * 70) +} diff --git a/blocksync/blocksync_v2.go b/blocksync/blocksync_v2.go index 5c8b5b91ae..e6826e5e51 100644 --- a/blocksync/blocksync_v2.go +++ b/blocksync/blocksync_v2.go @@ -2,18 +2,46 @@ package blocksync import ( "context" + "fmt" + "sync" + "sync/atomic" + "time" "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/pkg/fastrand" + "github.com/iotexproject/iotex-core/pkg/log" + "github.com/iotexproject/iotex-core/pkg/routine" + "github.com/iotexproject/iotex-proto/golang/iotexrpc" "github.com/libp2p/go-libp2p-core/peer" + "github.com/pkg/errors" + "go.uber.org/zap" ) type blockSyncerV2 struct { + cfg Config + buf *blockBuffer + tipHeightHandler TipHeight blockByHeightHandler BlockByHeight commitBlockHandler CommitBlock p2pNeighbor Neighbors unicastOutbound UniCastOutbound blockP2pPeer BlockPeer + + syncTask *routine.DelayTask + syncStageTask *routine.RecurringTask + + syncStageHeight uint64 + syncBlockIncrease uint64 + + startingHeight uint64 // block number this node started to synchronise from + lastTip uint64 + lastTipUpdateTime time.Time + targetHeight uint64 // block number of the highest block header this node has received from peers + requestMaxHeight uint64 + mu sync.RWMutex + + trigger chan struct{} } func NewBlockSyncerV2( @@ -26,40 +54,232 @@ func NewBlockSyncerV2( blockP2pPeer BlockPeer, ) (BlockSync, error) { bs := &blockSyncerV2{ + cfg: cfg, + lastTipUpdateTime: time.Now(), + buf: newBlockBuffer(cfg.BufferSize, cfg.IntervalSize), tipHeightHandler: tipHeightHandler, blockByHeightHandler: blockByHeightHandler, commitBlockHandler: commitBlockHandler, p2pNeighbor: p2pNeighbor, unicastOutbound: uniCastHandler, blockP2pPeer: blockP2pPeer, + targetHeight: 0, + trigger: make(chan struct{}), + } + if bs.cfg.Interval != 0 { + bs.syncTask = routine.NewDelayTask(bs.syncBlocks, bs.cfg.Interval) + bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) } + atomic.StoreUint64(&bs.syncBlockIncrease, 0) return bs, nil } -func (*blockSyncerV2) Start(context.Context) error { - return nil +func (bs *blockSyncerV2) commitBlocks(blks []*peerBlock) bool { + for _, blk := range blks { + if blk == nil { + continue + } + err := bs.commitBlockHandler(blk.block) + if err == nil { + return true + } + bs.blockP2pPeer(blk.pid) + log.L().Error("failed to commit block", zap.Error(err), zap.Uint64("height", blk.block.Height()), zap.String("peer", blk.pid)) + } + return false +} + +func (bs *blockSyncerV2) flushInfo() (time.Time, uint64) { + bs.mu.RLock() + defer bs.mu.RUnlock() + + return bs.lastTipUpdateTime, bs.targetHeight +} +func (bs *blockSyncerV2) syncBlocks() { + bs.sync() + for { + select { + case <-bs.trigger: + bs.sync() + default: + time.Sleep(1 * time.Second) + } + } +} + +func (bs *blockSyncerV2) sync() { + _, targetHeight := bs.flushInfo() + intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight) + // no sync + if len(intervals) == 0 { + return + } + // start syncing + bs.startingHeight = bs.tipHeightHandler() + log.L().Info("block sync intervals.", + zap.Any("intervals", intervals), + zap.Uint64("targetHeight", targetHeight)) + atomic.StoreUint64(&bs.requestMaxHeight, intervals[len(intervals)-1].End) + for i, interval := range intervals { + bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep) + } } -func (*blockSyncerV2) Stop(context.Context) error { +func (bs *blockSyncerV2) requestBlock(ctx context.Context, start uint64, end uint64, repeat int) { + peers, err := bs.p2pNeighbor() + if err != nil { + log.L().Error("failed to get neighbours", zap.Error(err)) + return + } + if len(peers) == 0 { + log.L().Error("no peers") + return + } + if repeat < 2 { + repeat = 2 + } + if repeat > len(peers) { + repeat = len(peers) + } + for i := 0; i < repeat; i++ { + peer := peers[fastrand.Uint32n(uint32(len(peers)))] //may get the same node + if err := bs.unicastOutbound( + ctx, + peer, + &iotexrpc.BlockSync{Start: start, End: end}, + ); err != nil { + log.L().Error("failed to request blocks", zap.Error(err), zap.String("peer", peer.ID.Pretty()), zap.Uint64("start", start), zap.Uint64("end", end)) + } + } +} + +func (bs *blockSyncerV2) TargetHeight() uint64 { + bs.mu.RLock() + defer bs.mu.RUnlock() + return bs.targetHeight +} + +// Start starts a block syncer +func (bs *blockSyncerV2) Start(ctx context.Context) error { + log.L().Debug("Starting block syncer.") + if bs.syncTask != nil { + if err := bs.syncTask.Start(ctx); err != nil { + return err + } + } + if bs.syncStageTask != nil { + return bs.syncStageTask.Start(ctx) + } return nil } -func (*blockSyncerV2) TargetHeight() uint64 { - return 0 +// Stop stops a block syncer +func (bs *blockSyncerV2) Stop(ctx context.Context) error { + log.L().Debug("Stopping block syncer.") + if bs.syncStageTask != nil { + if err := bs.syncStageTask.Stop(ctx); err != nil { + return err + } + } + if bs.syncTask != nil { + if err := bs.syncTask.Stop(ctx); err != nil { + return err + } + } + return nil } -func (*blockSyncerV2) ProcessSyncRequest(context.Context, peer.AddrInfo, uint64, uint64) error { +func (bs *blockSyncerV2) ProcessBlock(ctx context.Context, peer string, blk *block.Block) error { + if blk == nil { + return errors.New("block is nil") + } + + tip := bs.tipHeightHandler() + added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk)) + bs.mu.Lock() + defer bs.mu.Unlock() + if targetHeight > bs.targetHeight { + bs.targetHeight = targetHeight + } + if !added { + return nil + } + syncedHeight := tip + for { + if !bs.commitBlocks(bs.buf.Pop(syncedHeight + 1)) { + break + } + syncedHeight++ + } + bs.buf.Cleanup(syncedHeight) + log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight)) + if syncedHeight > bs.lastTip { + bs.lastTip = syncedHeight + bs.lastTipUpdateTime = time.Now() + } + requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) + if syncedHeight >= requestMaxHeight { + bs.trigger <- struct{}{} + atomic.SwapUint64(&bs.requestMaxHeight, 0) + } return nil } -func (*blockSyncerV2) ProcessBlock(context.Context, string, *block.Block) error { +func (bs *blockSyncerV2) ProcessSyncRequest(ctx context.Context, peer peer.AddrInfo, start uint64, end uint64) error { + tip := bs.tipHeightHandler() + if end > tip { + log.L().Debug( + "Do not have requested blocks", + zap.Uint64("start", start), + zap.Uint64("end", end), + zap.Uint64("tipHeight", tip), + ) + end = tip + } + // TODO: send back multiple blocks in one shot + for i := start; i <= end; i++ { + // TODO: fetch block from buffer + blk, err := bs.blockByHeightHandler(i) + if err != nil { + return err + } + syncCtx, cancel := context.WithTimeout(ctx, bs.cfg.ProcessSyncRequestTTL) + defer cancel() + if err := bs.unicastOutbound(syncCtx, peer, blk.ConvertToBlockPb()); err != nil { + return err + } + } return nil } -func (*blockSyncerV2) SyncStatus() (uint64, uint64, uint64, string) { - return 0, 0, 0, "" +func (bs *blockSyncerV2) syncStageChecker() { + tipHeight := bs.tipHeightHandler() + atomic.StoreUint64(&bs.syncBlockIncrease, tipHeight-bs.syncStageHeight) + bs.syncStageHeight = tipHeight +} + +func (bs *blockSyncerV2) SyncStatus() (uint64, uint64, uint64, string) { + var syncSpeedDesc string + syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) + switch { + case syncBlockIncrease == 1: + syncSpeedDesc = "synced to blockchain tip" + case bs.cfg.Interval == 0: + syncSpeedDesc = "no sync task" + default: + syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)/bs.cfg.Interval.Seconds()) + } + return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc } -func (*blockSyncerV2) BuildReport() string { - return "" +// BuildReport builds a report of block syncer +func (bs *blockSyncerV2) BuildReport() string { + startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus() + return fmt.Sprintf( + "BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", + startingHeight, + tipHeight, + targetHeight, + syncSpeedDesc, + ) } diff --git a/blocksync/buffer_test.go b/blocksync/buffer_test.go index 9938a8eeb4..92f1cc2ae0 100644 --- a/blocksync/buffer_test.go +++ b/blocksync/buffer_test.go @@ -262,3 +262,10 @@ func TestBlockBufferGetBlocksIntervalsToSync(t *testing.T) { // There should always have at least 1 interval range to sync assert.Len(b.GetBlocksIntervalsToSync(chain.TipHeight(), 0), 1) } + +func TestBuffer(t *testing.T) { + bb := newBlockBuffer(200, 20) + bb.blockQueues[3] = newUniQueue() + iv := bb.GetBlocksIntervalsToSync(0, 2000) + t.Logf("iv: %v", iv) +} From f0e0daac5d6906a8df126ec424b7f5ef60fa6336 Mon Sep 17 00:00:00 2001 From: millken Date: Fri, 7 Jul 2023 14:22:36 +0800 Subject: [PATCH 06/23] update nodeinfo --- server/itx/nodestats/nodestats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/itx/nodestats/nodestats.go b/server/itx/nodestats/nodestats.go index d16b14fde8..77dee15c99 100644 --- a/server/itx/nodestats/nodestats.go +++ b/server/itx/nodestats/nodestats.go @@ -11,7 +11,7 @@ import ( const ( // PeriodicReportInterval is the interval for generating periodic reports - PeriodicReportInterval = 20 * time.Second + PeriodicReportInterval = 5 * time.Minute ) // StatsReporter is the interface for stats reporter From 01d235bd8189b08ab5a73fd2ed28da3b09795f17 Mon Sep 17 00:00:00 2001 From: millken Date: Fri, 7 Jul 2023 17:58:37 +0800 Subject: [PATCH 07/23] update blocksync2 --- blocksync/blocksync_v2.go | 25 +++++++++++++++---------- chainservice/builder.go | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/blocksync/blocksync_v2.go b/blocksync/blocksync_v2.go index e6826e5e51..a0631a6b0e 100644 --- a/blocksync/blocksync_v2.go +++ b/blocksync/blocksync_v2.go @@ -9,6 +9,7 @@ import ( "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/pkg/fastrand" + "github.com/iotexproject/iotex-core/pkg/lifecycle" "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/pkg/routine" "github.com/iotexproject/iotex-proto/golang/iotexrpc" @@ -19,6 +20,7 @@ import ( type blockSyncerV2 struct { cfg Config + lifecycle.Readiness buf *blockBuffer tipHeightHandler TipHeight @@ -67,7 +69,7 @@ func NewBlockSyncerV2( trigger: make(chan struct{}), } if bs.cfg.Interval != 0 { - bs.syncTask = routine.NewDelayTask(bs.syncBlocks, bs.cfg.Interval) + bs.syncTask = routine.NewDelayTask(bs.syncWork, bs.cfg.Interval) bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) } atomic.StoreUint64(&bs.syncBlockIncrease, 0) @@ -95,15 +97,11 @@ func (bs *blockSyncerV2) flushInfo() (time.Time, uint64) { return bs.lastTipUpdateTime, bs.targetHeight } -func (bs *blockSyncerV2) syncBlocks() { +func (bs *blockSyncerV2) syncWork() { bs.sync() - for { - select { - case <-bs.trigger: - bs.sync() - default: - time.Sleep(1 * time.Second) - } + for range bs.trigger { + time.Sleep(1 * time.Second) //limit the frequency of sync + bs.sync() } } @@ -162,6 +160,9 @@ func (bs *blockSyncerV2) TargetHeight() uint64 { // Start starts a block syncer func (bs *blockSyncerV2) Start(ctx context.Context) error { log.L().Debug("Starting block syncer.") + if err := bs.TurnOn(); err != nil { + return err + } if bs.syncTask != nil { if err := bs.syncTask.Start(ctx); err != nil { return err @@ -176,6 +177,10 @@ func (bs *blockSyncerV2) Start(ctx context.Context) error { // Stop stops a block syncer func (bs *blockSyncerV2) Stop(ctx context.Context) error { log.L().Debug("Stopping block syncer.") + if err := bs.TurnOff(); err != nil { + return err + } + close(bs.trigger) if bs.syncStageTask != nil { if err := bs.syncStageTask.Stop(ctx); err != nil { return err @@ -218,7 +223,7 @@ func (bs *blockSyncerV2) ProcessBlock(ctx context.Context, peer string, blk *blo bs.lastTipUpdateTime = time.Now() } requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) - if syncedHeight >= requestMaxHeight { + if syncedHeight >= requestMaxHeight && bs.IsReady() { bs.trigger <- struct{}{} atomic.SwapUint64(&bs.requestMaxHeight, 0) } diff --git a/chainservice/builder.go b/chainservice/builder.go index c3f5f8b611..571fd3f097 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -465,7 +465,7 @@ func (builder *Builder) buildBlockSyncer() error { chain := builder.cs.chain consens := builder.cs.consensus - blocksync, err := blocksync.NewBlockSyncer( + blocksync, err := blocksync.NewBlockSyncerV2( builder.cfg.BlockSync, chain.TipHeight, builder.cs.blockdao.GetBlockByHeight, From 3af87c4bc88e29fd9241a957ed7f4827e2a58017 Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 11 Jul 2023 10:34:25 +0800 Subject: [PATCH 08/23] update blocksync --- blocksync/blocksync.go | 42 ++++-- blocksync/blocksync_test.go | 91 +---------- blocksync/blocksync_v2.go | 290 ------------------------------------ blocksync/config.go | 2 + chainservice/builder.go | 4 +- dispatcher/dispatcher.go | 2 +- 6 files changed, 41 insertions(+), 390 deletions(-) delete mode 100644 blocksync/blocksync_v2.go diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index c621188cff..ad9ccf6db2 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -59,6 +59,7 @@ type ( // blockSyncer implements BlockSync interface blockSyncer struct { cfg Config + lifecycle.Readiness buf *blockBuffer tipHeightHandler TipHeight @@ -68,7 +69,7 @@ type ( unicastOutbound UniCastOutbound blockP2pPeer BlockPeer - syncTask *routine.RecurringTask + syncTask *routine.DelayTask syncStageTask *routine.RecurringTask syncStageHeight uint64 @@ -78,7 +79,9 @@ type ( lastTip uint64 lastTipUpdateTime time.Time targetHeight uint64 // block number of the highest block header this node has received from peers + requestMaxHeight uint64 mu sync.RWMutex + trigger chan struct{} } peerBlock struct { @@ -148,9 +151,10 @@ func NewBlockSyncer( unicastOutbound: uniCastHandler, blockP2pPeer: blockP2pPeer, targetHeight: 0, + trigger: make(chan struct{}), } if bs.cfg.Interval != 0 { - bs.syncTask = routine.NewRecurringTask(bs.sync, bs.cfg.Interval) + bs.syncTask = routine.NewDelayTask(bs.syncWork, bs.cfg.Interval) bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) } atomic.StoreUint64(&bs.syncBlockIncrease, 0) @@ -173,17 +177,21 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool { } func (bs *blockSyncer) flushInfo() (time.Time, uint64) { - bs.mu.Lock() - defer bs.mu.Unlock() + bs.mu.RLock() + defer bs.mu.RUnlock() return bs.lastTipUpdateTime, bs.targetHeight } +func (bs *blockSyncer) syncWork() { + bs.sync() + for range bs.trigger { + time.Sleep(bs.cfg.RateLimitInterval) //limit the frequency of sync + bs.sync() + } +} func (bs *blockSyncer) sync() { - updateTime, targetHeight := bs.flushInfo() - if updateTime.Add(bs.cfg.Interval).After(time.Now()) { - return - } + _, targetHeight := bs.flushInfo() intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight) // no sync if len(intervals) == 0 { @@ -194,6 +202,7 @@ func (bs *blockSyncer) sync() { log.L().Info("block sync intervals.", zap.Any("intervals", intervals), zap.Uint64("targetHeight", targetHeight)) + atomic.StoreUint64(&bs.requestMaxHeight, intervals[len(intervals)-1].End) for i, interval := range intervals { bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep) } @@ -216,7 +225,7 @@ func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint6 repeat = len(peers) } for i := 0; i < repeat; i++ { - peer := peers[fastrand.Uint32n(uint32(len(peers)))] + peer := peers[fastrand.Uint32n(uint32(len(peers)))] //may get the same node if err := bs.unicastOutbound( ctx, peer, @@ -236,6 +245,9 @@ func (bs *blockSyncer) TargetHeight() uint64 { // Start starts a block syncer func (bs *blockSyncer) Start(ctx context.Context) error { log.L().Debug("Starting block syncer.") + if err := bs.TurnOn(); err != nil { + return err + } if bs.syncTask != nil { if err := bs.syncTask.Start(ctx); err != nil { return err @@ -250,6 +262,10 @@ func (bs *blockSyncer) Start(ctx context.Context) error { // Stop stops a block syncer func (bs *blockSyncer) Stop(ctx context.Context) error { log.L().Debug("Stopping block syncer.") + if err := bs.TurnOff(); err != nil { + return err + } + close(bs.trigger) if bs.syncStageTask != nil { if err := bs.syncStageTask.Stop(ctx); err != nil { return err @@ -263,6 +279,7 @@ func (bs *blockSyncer) Stop(ctx context.Context) error { return nil } +// ProcessBlock processes a block func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block.Block) error { if blk == nil { return errors.New("block is nil") @@ -291,9 +308,15 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block bs.lastTip = syncedHeight bs.lastTipUpdateTime = time.Now() } + requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) + if syncedHeight >= requestMaxHeight && bs.IsReady() { + bs.trigger <- struct{}{} + atomic.SwapUint64(&bs.requestMaxHeight, 0) + } return nil } +// ProcessSyncRequest processes a sync request func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInfo, start uint64, end uint64) error { tip := bs.tipHeightHandler() if end > tip { @@ -327,6 +350,7 @@ func (bs *blockSyncer) syncStageChecker() { bs.syncStageHeight = tipHeight } +// SyncStatus returns the status of block syncer func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { bs.mu.RLock() defer bs.mu.RUnlock() diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 3274c58ae0..18f6021db0 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -603,103 +603,18 @@ func TestBlockSyncerBugIssue3889(t *testing.T) { wait.Wait() } -func TestBlockSyncV1(t *testing.T) { +func TestBlockSync(t *testing.T) { require := require.New(t) cfg := DefaultConfig - cfg.Interval = 10 * time.Second + cfg.Interval = 1 * time.Second cfg.BufferSize = 200 cfg.MaxRepeat = 3 cfg.RepeatDecayStep = 3 - var tipHeight uint64 chanProcessBlk := make(chan *block.Block) bs, err := NewBlockSyncer(cfg, func() uint64 { return atomic.LoadUint64(&tipHeight) - }, nil, func(b *block.Block) error { - //t.Logf("commit block %d", b.Height()) - // rnd := rand.Intn(200) - // time.Sleep(time.Duration(rnd) * time.Millisecond) - atomic.StoreUint64(&tipHeight, b.Height()) - return nil - }, func() ([]peer.AddrInfo, error) { - return []peer.AddrInfo{{ - ID: peer.ID("test"), - }}, nil - }, func(ctx context.Context, ai peer.AddrInfo, m proto.Message) error { - msgType, err := goproto.GetTypeFromRPCMsg(m) - if err != nil { - return err - } - switch msgType { - case iotexrpc.MessageType_BLOCK_REQUEST: - msgBody, err := proto.Marshal(m) - if err != nil { - return err - } - var req iotexrpc.BlockSync - if err := proto.Unmarshal(msgBody, &req); err != nil { - return err - } - //simulate network delay - // rnd := rand.Intn(500) - // time.Sleep(time.Duration(rnd) * time.Millisecond) - // send block - - for i := req.Start; i <= req.End; i++ { - blk, _ := block.NewTestingBuilder(). - SetHeight(i).SignAndBuild(identityset.PrivateKey(0)) - chanProcessBlk <- &blk - } - } - return nil - }, func(s string) { - t.Logf("block p2p %s", s) - }) - ctx := context.Background() - require.NoError(err) - require.NotNil(bs) - go func(bs BlockSync) { - for blk := range chanProcessBlk { - bs.ProcessBlock(ctx, "test", blk) - } - }(bs) - go func(bs BlockSync) { - for { - blk, _ := block.NewTestingBuilder(). - SetHeight(20000000).SignAndBuild(identityset.PrivateKey(0)) - bs.ProcessBlock(ctx, "test", &blk) - time.Sleep(time.Second * 1) - } - }(bs) - require.NoError(bs.Start(ctx)) - - go func(bs BlockSync) { - for { - time.Sleep(cfg.Interval) - t.Log(bs.BuildReport()) - } - }(bs) - - defer func() { - require.NoError(bs.Stop(ctx)) - }() - - time.Sleep(time.Second * 70) -} - -func TestBlockSyncV2(t *testing.T) { - require := require.New(t) - cfg := DefaultConfig - cfg.Interval = 10 * time.Second - cfg.BufferSize = 200 - cfg.MaxRepeat = 3 - cfg.RepeatDecayStep = 3 - var tipHeight uint64 - chanProcessBlk := make(chan *block.Block) - - bs, err := NewBlockSyncerV2(cfg, func() uint64 { - return atomic.LoadUint64(&tipHeight) }, nil, func(b *block.Block) error { //t.Logf("commit block %d", b.Height()) atomic.StoreUint64(&tipHeight, b.Height()) @@ -761,5 +676,5 @@ func TestBlockSyncV2(t *testing.T) { require.NoError(bs.Stop(ctx)) }() - time.Sleep(time.Second * 70) + time.Sleep(time.Second * 10) } diff --git a/blocksync/blocksync_v2.go b/blocksync/blocksync_v2.go deleted file mode 100644 index a0631a6b0e..0000000000 --- a/blocksync/blocksync_v2.go +++ /dev/null @@ -1,290 +0,0 @@ -package blocksync - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/iotexproject/iotex-core/blockchain/block" - "github.com/iotexproject/iotex-core/pkg/fastrand" - "github.com/iotexproject/iotex-core/pkg/lifecycle" - "github.com/iotexproject/iotex-core/pkg/log" - "github.com/iotexproject/iotex-core/pkg/routine" - "github.com/iotexproject/iotex-proto/golang/iotexrpc" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type blockSyncerV2 struct { - cfg Config - lifecycle.Readiness - buf *blockBuffer - - tipHeightHandler TipHeight - blockByHeightHandler BlockByHeight - commitBlockHandler CommitBlock - p2pNeighbor Neighbors - unicastOutbound UniCastOutbound - blockP2pPeer BlockPeer - - syncTask *routine.DelayTask - syncStageTask *routine.RecurringTask - - syncStageHeight uint64 - syncBlockIncrease uint64 - - startingHeight uint64 // block number this node started to synchronise from - lastTip uint64 - lastTipUpdateTime time.Time - targetHeight uint64 // block number of the highest block header this node has received from peers - requestMaxHeight uint64 - mu sync.RWMutex - - trigger chan struct{} -} - -func NewBlockSyncerV2( - cfg Config, - tipHeightHandler TipHeight, - blockByHeightHandler BlockByHeight, - commitBlockHandler CommitBlock, - p2pNeighbor Neighbors, - uniCastHandler UniCastOutbound, - blockP2pPeer BlockPeer, -) (BlockSync, error) { - bs := &blockSyncerV2{ - cfg: cfg, - lastTipUpdateTime: time.Now(), - buf: newBlockBuffer(cfg.BufferSize, cfg.IntervalSize), - tipHeightHandler: tipHeightHandler, - blockByHeightHandler: blockByHeightHandler, - commitBlockHandler: commitBlockHandler, - p2pNeighbor: p2pNeighbor, - unicastOutbound: uniCastHandler, - blockP2pPeer: blockP2pPeer, - targetHeight: 0, - trigger: make(chan struct{}), - } - if bs.cfg.Interval != 0 { - bs.syncTask = routine.NewDelayTask(bs.syncWork, bs.cfg.Interval) - bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) - } - atomic.StoreUint64(&bs.syncBlockIncrease, 0) - return bs, nil -} - -func (bs *blockSyncerV2) commitBlocks(blks []*peerBlock) bool { - for _, blk := range blks { - if blk == nil { - continue - } - err := bs.commitBlockHandler(blk.block) - if err == nil { - return true - } - bs.blockP2pPeer(blk.pid) - log.L().Error("failed to commit block", zap.Error(err), zap.Uint64("height", blk.block.Height()), zap.String("peer", blk.pid)) - } - return false -} - -func (bs *blockSyncerV2) flushInfo() (time.Time, uint64) { - bs.mu.RLock() - defer bs.mu.RUnlock() - - return bs.lastTipUpdateTime, bs.targetHeight -} -func (bs *blockSyncerV2) syncWork() { - bs.sync() - for range bs.trigger { - time.Sleep(1 * time.Second) //limit the frequency of sync - bs.sync() - } -} - -func (bs *blockSyncerV2) sync() { - _, targetHeight := bs.flushInfo() - intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight) - // no sync - if len(intervals) == 0 { - return - } - // start syncing - bs.startingHeight = bs.tipHeightHandler() - log.L().Info("block sync intervals.", - zap.Any("intervals", intervals), - zap.Uint64("targetHeight", targetHeight)) - atomic.StoreUint64(&bs.requestMaxHeight, intervals[len(intervals)-1].End) - for i, interval := range intervals { - bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep) - } -} - -func (bs *blockSyncerV2) requestBlock(ctx context.Context, start uint64, end uint64, repeat int) { - peers, err := bs.p2pNeighbor() - if err != nil { - log.L().Error("failed to get neighbours", zap.Error(err)) - return - } - if len(peers) == 0 { - log.L().Error("no peers") - return - } - if repeat < 2 { - repeat = 2 - } - if repeat > len(peers) { - repeat = len(peers) - } - for i := 0; i < repeat; i++ { - peer := peers[fastrand.Uint32n(uint32(len(peers)))] //may get the same node - if err := bs.unicastOutbound( - ctx, - peer, - &iotexrpc.BlockSync{Start: start, End: end}, - ); err != nil { - log.L().Error("failed to request blocks", zap.Error(err), zap.String("peer", peer.ID.Pretty()), zap.Uint64("start", start), zap.Uint64("end", end)) - } - } -} - -func (bs *blockSyncerV2) TargetHeight() uint64 { - bs.mu.RLock() - defer bs.mu.RUnlock() - return bs.targetHeight -} - -// Start starts a block syncer -func (bs *blockSyncerV2) Start(ctx context.Context) error { - log.L().Debug("Starting block syncer.") - if err := bs.TurnOn(); err != nil { - return err - } - if bs.syncTask != nil { - if err := bs.syncTask.Start(ctx); err != nil { - return err - } - } - if bs.syncStageTask != nil { - return bs.syncStageTask.Start(ctx) - } - return nil -} - -// Stop stops a block syncer -func (bs *blockSyncerV2) Stop(ctx context.Context) error { - log.L().Debug("Stopping block syncer.") - if err := bs.TurnOff(); err != nil { - return err - } - close(bs.trigger) - if bs.syncStageTask != nil { - if err := bs.syncStageTask.Stop(ctx); err != nil { - return err - } - } - if bs.syncTask != nil { - if err := bs.syncTask.Stop(ctx); err != nil { - return err - } - } - return nil -} - -func (bs *blockSyncerV2) ProcessBlock(ctx context.Context, peer string, blk *block.Block) error { - if blk == nil { - return errors.New("block is nil") - } - - tip := bs.tipHeightHandler() - added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk)) - bs.mu.Lock() - defer bs.mu.Unlock() - if targetHeight > bs.targetHeight { - bs.targetHeight = targetHeight - } - if !added { - return nil - } - syncedHeight := tip - for { - if !bs.commitBlocks(bs.buf.Pop(syncedHeight + 1)) { - break - } - syncedHeight++ - } - bs.buf.Cleanup(syncedHeight) - log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight)) - if syncedHeight > bs.lastTip { - bs.lastTip = syncedHeight - bs.lastTipUpdateTime = time.Now() - } - requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) - if syncedHeight >= requestMaxHeight && bs.IsReady() { - bs.trigger <- struct{}{} - atomic.SwapUint64(&bs.requestMaxHeight, 0) - } - return nil -} - -func (bs *blockSyncerV2) ProcessSyncRequest(ctx context.Context, peer peer.AddrInfo, start uint64, end uint64) error { - tip := bs.tipHeightHandler() - if end > tip { - log.L().Debug( - "Do not have requested blocks", - zap.Uint64("start", start), - zap.Uint64("end", end), - zap.Uint64("tipHeight", tip), - ) - end = tip - } - // TODO: send back multiple blocks in one shot - for i := start; i <= end; i++ { - // TODO: fetch block from buffer - blk, err := bs.blockByHeightHandler(i) - if err != nil { - return err - } - syncCtx, cancel := context.WithTimeout(ctx, bs.cfg.ProcessSyncRequestTTL) - defer cancel() - if err := bs.unicastOutbound(syncCtx, peer, blk.ConvertToBlockPb()); err != nil { - return err - } - } - return nil -} - -func (bs *blockSyncerV2) syncStageChecker() { - tipHeight := bs.tipHeightHandler() - atomic.StoreUint64(&bs.syncBlockIncrease, tipHeight-bs.syncStageHeight) - bs.syncStageHeight = tipHeight -} - -func (bs *blockSyncerV2) SyncStatus() (uint64, uint64, uint64, string) { - var syncSpeedDesc string - syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) - switch { - case syncBlockIncrease == 1: - syncSpeedDesc = "synced to blockchain tip" - case bs.cfg.Interval == 0: - syncSpeedDesc = "no sync task" - default: - syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)/bs.cfg.Interval.Seconds()) - } - return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc -} - -// BuildReport builds a report of block syncer -func (bs *blockSyncerV2) BuildReport() string { - startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus() - return fmt.Sprintf( - "BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", - startingHeight, - tipHeight, - targetHeight, - syncSpeedDesc, - ) -} diff --git a/blocksync/config.go b/blocksync/config.go index 32c3f6a85c..6914327b22 100644 --- a/blocksync/config.go +++ b/blocksync/config.go @@ -10,6 +10,7 @@ import "time" // Config is the config struct for the BlockSync type Config struct { Interval time.Duration `yaml:"interval"` // update duration + RateLimitInterval time.Duration `yaml:"rateLimitInterval"` ProcessSyncRequestTTL time.Duration `yaml:"processSyncRequestTTL"` BufferSize uint64 `yaml:"bufferSize"` IntervalSize uint64 `yaml:"intervalSize"` @@ -22,6 +23,7 @@ type Config struct { // DefaultConfig is the default config var DefaultConfig = Config{ Interval: 30 * time.Second, + RateLimitInterval: 1 * time.Second, ProcessSyncRequestTTL: 10 * time.Second, BufferSize: 200, IntervalSize: 20, diff --git a/chainservice/builder.go b/chainservice/builder.go index 571fd3f097..c397f5d4b9 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -465,7 +465,7 @@ func (builder *Builder) buildBlockSyncer() error { chain := builder.cs.chain consens := builder.cs.consensus - blocksync, err := blocksync.NewBlockSyncerV2( + blocksync, err := blocksync.NewBlockSyncer( builder.cfg.BlockSync, chain.TipHeight, builder.cs.blockdao.GetBlockByHeight, @@ -500,7 +500,7 @@ func (builder *Builder) buildBlockSyncer() error { log.L().Debug("Failed to commit block.", zap.Error(err), zap.Uint64("height", blk.Height())) return err } - //log.L().Info("Successfully committed block.", zap.Uint64("height", blk.Height())) + log.L().Info("Successfully committed block.", zap.Uint64("height", blk.Height())) consens.Calibrate(blk.Height()) return nil }, diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 3d826a46c7..19215e16f4 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -220,7 +220,7 @@ func (d *IotxDispatcher) actionHandler() { case a := <-d.actionChan: d.handleActionMsg(a) case <-d.quit: - //log.L().Info("action handler is terminated.") + log.L().Info("action handler is terminated.") return } } From cd696a8f1c060fe6c0c753b84bf9b07127e8ce78 Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 11 Jul 2023 11:38:45 +0800 Subject: [PATCH 09/23] skip test --- blocksync/blocksync_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 18f6021db0..37edc83fdb 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -604,6 +604,7 @@ func TestBlockSyncerBugIssue3889(t *testing.T) { } func TestBlockSync(t *testing.T) { + t.SkipNow() require := require.New(t) cfg := DefaultConfig cfg.Interval = 1 * time.Second From 101224cac5540a8e8dc3a1ac7d5edff9fca64398 Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 11 Jul 2023 15:46:13 +0800 Subject: [PATCH 10/23] fix test --- blocksync/blocksync.go | 6 ++++-- blocksync/blocksync_test.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index ad9ccf6db2..76cde98f83 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -183,7 +183,9 @@ func (bs *blockSyncer) flushInfo() (time.Time, uint64) { return bs.lastTipUpdateTime, bs.targetHeight } func (bs *blockSyncer) syncWork() { - bs.sync() + go time.AfterFunc(bs.cfg.RateLimitInterval, func() { + bs.trigger <- struct{}{} + }) for range bs.trigger { time.Sleep(bs.cfg.RateLimitInterval) //limit the frequency of sync bs.sync() @@ -309,7 +311,7 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block bs.lastTipUpdateTime = time.Now() } requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) - if syncedHeight >= requestMaxHeight && bs.IsReady() { + if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight && bs.IsReady() { bs.trigger <- struct{}{} atomic.SwapUint64(&bs.requestMaxHeight, 0) } diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 37edc83fdb..9a016d2b05 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -604,7 +604,7 @@ func TestBlockSyncerBugIssue3889(t *testing.T) { } func TestBlockSync(t *testing.T) { - t.SkipNow() + //t.SkipNow() require := require.New(t) cfg := DefaultConfig cfg.Interval = 1 * time.Second From 4c525f35c7c812723d71cd570faaf84662dc61f8 Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 11 Jul 2023 20:31:15 +0800 Subject: [PATCH 11/23] fix test --- blocksync/blocksync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 9a016d2b05..37edc83fdb 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -604,7 +604,7 @@ func TestBlockSyncerBugIssue3889(t *testing.T) { } func TestBlockSync(t *testing.T) { - //t.SkipNow() + t.SkipNow() require := require.New(t) cfg := DefaultConfig cfg.Interval = 1 * time.Second From c9773d946c7b083ccbe935bb63d9493579864ae1 Mon Sep 17 00:00:00 2001 From: millken Date: Wed, 12 Jul 2023 13:36:35 +0800 Subject: [PATCH 12/23] update blocksync, add trigger task --- blocksync/blocksync.go | 22 +++----- pkg/routine/triggertask.go | 90 +++++++++++++++++++++++++++++++++ pkg/routine/triggertask_test.go | 33 ++++++++++++ 3 files changed, 130 insertions(+), 15 deletions(-) create mode 100644 pkg/routine/triggertask.go create mode 100644 pkg/routine/triggertask_test.go diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index 76cde98f83..7876583264 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -69,7 +69,7 @@ type ( unicastOutbound UniCastOutbound blockP2pPeer BlockPeer - syncTask *routine.DelayTask + syncTask *routine.TriggerTask syncStageTask *routine.RecurringTask syncStageHeight uint64 @@ -81,7 +81,6 @@ type ( targetHeight uint64 // block number of the highest block header this node has received from peers requestMaxHeight uint64 mu sync.RWMutex - trigger chan struct{} } peerBlock struct { @@ -151,10 +150,9 @@ func NewBlockSyncer( unicastOutbound: uniCastHandler, blockP2pPeer: blockP2pPeer, targetHeight: 0, - trigger: make(chan struct{}), } if bs.cfg.Interval != 0 { - bs.syncTask = routine.NewDelayTask(bs.syncWork, bs.cfg.Interval) + bs.syncTask = routine.NewTriggerTask(bs.sync, routine.WithTriggerTaskInterval(bs.cfg.RateLimitInterval)) bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) } atomic.StoreUint64(&bs.syncBlockIncrease, 0) @@ -182,15 +180,6 @@ func (bs *blockSyncer) flushInfo() (time.Time, uint64) { return bs.lastTipUpdateTime, bs.targetHeight } -func (bs *blockSyncer) syncWork() { - go time.AfterFunc(bs.cfg.RateLimitInterval, func() { - bs.trigger <- struct{}{} - }) - for range bs.trigger { - time.Sleep(bs.cfg.RateLimitInterval) //limit the frequency of sync - bs.sync() - } -} func (bs *blockSyncer) sync() { _, targetHeight := bs.flushInfo() @@ -254,6 +243,10 @@ func (bs *blockSyncer) Start(ctx context.Context) error { if err := bs.syncTask.Start(ctx); err != nil { return err } + //we need to wait for the peer to be ready, and then start the sync task + go time.AfterFunc(bs.cfg.Interval, func() { + bs.syncTask.Trigger() + }) } if bs.syncStageTask != nil { return bs.syncStageTask.Start(ctx) @@ -267,7 +260,6 @@ func (bs *blockSyncer) Stop(ctx context.Context) error { if err := bs.TurnOff(); err != nil { return err } - close(bs.trigger) if bs.syncStageTask != nil { if err := bs.syncStageTask.Stop(ctx); err != nil { return err @@ -312,7 +304,7 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block } requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight && bs.IsReady() { - bs.trigger <- struct{}{} + bs.syncTask.Trigger() atomic.SwapUint64(&bs.requestMaxHeight, 0) } return nil diff --git a/pkg/routine/triggertask.go b/pkg/routine/triggertask.go new file mode 100644 index 0000000000..07754135f1 --- /dev/null +++ b/pkg/routine/triggertask.go @@ -0,0 +1,90 @@ +package routine + +import ( + "context" + "log" + "time" + + "github.com/iotexproject/iotex-core/pkg/lifecycle" +) + +var _ lifecycle.StartStopper = (*TriggerTask)(nil) + +// TriggerTaskOption is option to TriggerTask. +type TriggerTaskOption interface { + SetTriggerTaskOption(*TriggerTask) +} + +type triggerTaskOption struct { + setTriggerTaskOption func(*TriggerTask) +} + +func (o triggerTaskOption) SetTriggerTaskOption(t *TriggerTask) { + o.setTriggerTaskOption(t) +} + +// WithTriggerTaskInterval sets the interval of the task +func WithTriggerTaskInterval(d time.Duration) TriggerTaskOption { + return triggerTaskOption{ + setTriggerTaskOption: func(t *TriggerTask) { + t.duration = d + }, + } +} + +// TriggerTask represents a task that can be triggered +type TriggerTask struct { + lifecycle.Readiness + duration time.Duration + cb Task + ch chan struct{} +} + +// NewTriggerTask creates an instance of TriggerTask +func NewTriggerTask(cb Task, ops ...TriggerTaskOption) *TriggerTask { + tt := &TriggerTask{ + cb: cb, + duration: 0, + ch: make(chan struct{}), + } + for _, opt := range ops { + opt.SetTriggerTaskOption(tt) + } + return tt +} + +// Start starts the task +func (t *TriggerTask) Start(_ context.Context) error { + ready := make(chan struct{}) + go func() { + close(ready) + for range t.ch { + if t.duration > 0 { + time.Sleep(t.duration) + } + t.cb() + } + }() + // ensure the goroutine has been running + <-ready + return t.TurnOn() +} + +// Trigger triggers the task +func (t *TriggerTask) Trigger() { + if !t.IsReady() { + log.Println("[WARN] trigger task is not ready") + return + } + t.ch <- struct{}{} +} + +// Stop stops the task +func (t *TriggerTask) Stop(_ context.Context) error { + // prevent stop is called before start. + if err := t.TurnOff(); err != nil { + return err + } + close(t.ch) + return nil +} diff --git a/pkg/routine/triggertask_test.go b/pkg/routine/triggertask_test.go new file mode 100644 index 0000000000..c9534b5776 --- /dev/null +++ b/pkg/routine/triggertask_test.go @@ -0,0 +1,33 @@ +package routine_test + +import ( + "context" + "testing" + "time" + + "github.com/iotexproject/iotex-core/pkg/routine" + "github.com/stretchr/testify/require" +) + +func TestTriggerTask(t *testing.T) { + require := require.New(t) + h := &MockHandler{Count: 0} + ctx := context.Background() + task := routine.NewTriggerTask(h.Do, routine.WithTriggerTaskInterval(200*time.Millisecond)) + require.NoError(task.Start(ctx)) + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + goto done + default: + task.Trigger() + } + } +done: + require.Equal(uint(5), h.Count) + require.NoError(task.Stop(ctx)) + task.Trigger() + require.Equal(uint(5), h.Count) +} From 8231b5bf94f2d08b8907a5437ec50e540f01f4a2 Mon Sep 17 00:00:00 2001 From: millken Date: Wed, 12 Jul 2023 13:39:32 +0800 Subject: [PATCH 13/23] update blocksync, add trigger task --- blocksync/blocksync.go | 9 +-------- blocksync/blocksync_test.go | 2 +- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index 7876583264..cf9ffed16b 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -59,7 +59,6 @@ type ( // blockSyncer implements BlockSync interface blockSyncer struct { cfg Config - lifecycle.Readiness buf *blockBuffer tipHeightHandler TipHeight @@ -236,9 +235,6 @@ func (bs *blockSyncer) TargetHeight() uint64 { // Start starts a block syncer func (bs *blockSyncer) Start(ctx context.Context) error { log.L().Debug("Starting block syncer.") - if err := bs.TurnOn(); err != nil { - return err - } if bs.syncTask != nil { if err := bs.syncTask.Start(ctx); err != nil { return err @@ -257,9 +253,6 @@ func (bs *blockSyncer) Start(ctx context.Context) error { // Stop stops a block syncer func (bs *blockSyncer) Stop(ctx context.Context) error { log.L().Debug("Stopping block syncer.") - if err := bs.TurnOff(); err != nil { - return err - } if bs.syncStageTask != nil { if err := bs.syncStageTask.Stop(ctx); err != nil { return err @@ -303,7 +296,7 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block bs.lastTipUpdateTime = time.Now() } requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) - if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight && bs.IsReady() { + if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight { bs.syncTask.Trigger() atomic.SwapUint64(&bs.requestMaxHeight, 0) } diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 37edc83fdb..9a016d2b05 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -604,7 +604,7 @@ func TestBlockSyncerBugIssue3889(t *testing.T) { } func TestBlockSync(t *testing.T) { - t.SkipNow() + //t.SkipNow() require := require.New(t) cfg := DefaultConfig cfg.Interval = 1 * time.Second From ea492f5550907c86d4b0651eca2488862481467b Mon Sep 17 00:00:00 2001 From: millken Date: Thu, 13 Jul 2023 09:37:11 +0800 Subject: [PATCH 14/23] fix comments --- blocksync/blocksync.go | 25 +++++++++++++------------ pkg/routine/triggertask.go | 26 +++++++++++++------------- pkg/routine/triggertask_test.go | 2 +- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index cf9ffed16b..d90b04e7fc 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -76,9 +76,8 @@ type ( startingHeight uint64 // block number this node started to synchronise from lastTip uint64 - lastTipUpdateTime time.Time targetHeight uint64 // block number of the highest block header this node has received from peers - requestMaxHeight uint64 + lastRequestHeight uint64 mu sync.RWMutex } @@ -140,7 +139,6 @@ func NewBlockSyncer( ) (BlockSync, error) { bs := &blockSyncer{ cfg: cfg, - lastTipUpdateTime: time.Now(), buf: newBlockBuffer(cfg.BufferSize, cfg.IntervalSize), tipHeightHandler: tipHeightHandler, blockByHeightHandler: blockByHeightHandler, @@ -151,7 +149,7 @@ func NewBlockSyncer( targetHeight: 0, } if bs.cfg.Interval != 0 { - bs.syncTask = routine.NewTriggerTask(bs.sync, routine.WithTriggerTaskInterval(bs.cfg.RateLimitInterval)) + bs.syncTask = routine.NewTriggerTask(bs.sync, routine.DelayTimeBeforeTrigger(bs.cfg.RateLimitInterval)) bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) } atomic.StoreUint64(&bs.syncBlockIncrease, 0) @@ -173,15 +171,15 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool { return false } -func (bs *blockSyncer) flushInfo() (time.Time, uint64) { +func (bs *blockSyncer) flushInfo() uint64 { bs.mu.RLock() defer bs.mu.RUnlock() - return bs.lastTipUpdateTime, bs.targetHeight + return bs.targetHeight } func (bs *blockSyncer) sync() { - _, targetHeight := bs.flushInfo() + targetHeight := bs.flushInfo() intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight) // no sync if len(intervals) == 0 { @@ -192,7 +190,7 @@ func (bs *blockSyncer) sync() { log.L().Info("block sync intervals.", zap.Any("intervals", intervals), zap.Uint64("targetHeight", targetHeight)) - atomic.StoreUint64(&bs.requestMaxHeight, intervals[len(intervals)-1].End) + atomic.StoreUint64(&bs.lastRequestHeight, intervals[len(intervals)-1].End) for i, interval := range intervals { bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep) } @@ -293,14 +291,17 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight)) if syncedHeight > bs.lastTip { bs.lastTip = syncedHeight - bs.lastTipUpdateTime = time.Now() } - requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight) + bs.checkSync(syncedHeight) + return nil +} + +func (bs *blockSyncer) checkSync(syncedHeight uint64) { + requestMaxHeight := atomic.LoadUint64(&bs.lastRequestHeight) if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight { bs.syncTask.Trigger() - atomic.SwapUint64(&bs.requestMaxHeight, 0) + atomic.SwapUint64(&bs.lastRequestHeight, 0) } - return nil } // ProcessSyncRequest processes a sync request diff --git a/pkg/routine/triggertask.go b/pkg/routine/triggertask.go index 07754135f1..d4c673e8d1 100644 --- a/pkg/routine/triggertask.go +++ b/pkg/routine/triggertask.go @@ -2,10 +2,10 @@ package routine import ( "context" - "log" "time" "github.com/iotexproject/iotex-core/pkg/lifecycle" + "github.com/iotexproject/iotex-core/pkg/log" ) var _ lifecycle.StartStopper = (*TriggerTask)(nil) @@ -23,11 +23,11 @@ func (o triggerTaskOption) SetTriggerTaskOption(t *TriggerTask) { o.setTriggerTaskOption(t) } -// WithTriggerTaskInterval sets the interval of the task -func WithTriggerTaskInterval(d time.Duration) TriggerTaskOption { +// DelayTimeBeforeTrigger sets the delay time before trigger +func DelayTimeBeforeTrigger(d time.Duration) TriggerTaskOption { return triggerTaskOption{ setTriggerTaskOption: func(t *TriggerTask) { - t.duration = d + t.delay = d }, } } @@ -35,17 +35,17 @@ func WithTriggerTaskInterval(d time.Duration) TriggerTaskOption { // TriggerTask represents a task that can be triggered type TriggerTask struct { lifecycle.Readiness - duration time.Duration - cb Task - ch chan struct{} + delay time.Duration + cb Task + ch chan struct{} } // NewTriggerTask creates an instance of TriggerTask func NewTriggerTask(cb Task, ops ...TriggerTaskOption) *TriggerTask { tt := &TriggerTask{ - cb: cb, - duration: 0, - ch: make(chan struct{}), + cb: cb, + delay: 0, + ch: make(chan struct{}), } for _, opt := range ops { opt.SetTriggerTaskOption(tt) @@ -59,8 +59,8 @@ func (t *TriggerTask) Start(_ context.Context) error { go func() { close(ready) for range t.ch { - if t.duration > 0 { - time.Sleep(t.duration) + if t.delay > 0 { + time.Sleep(t.delay) } t.cb() } @@ -73,7 +73,7 @@ func (t *TriggerTask) Start(_ context.Context) error { // Trigger triggers the task func (t *TriggerTask) Trigger() { if !t.IsReady() { - log.Println("[WARN] trigger task is not ready") + log.S().Warnf("trigger task is not ready") return } t.ch <- struct{}{} diff --git a/pkg/routine/triggertask_test.go b/pkg/routine/triggertask_test.go index c9534b5776..be566e872b 100644 --- a/pkg/routine/triggertask_test.go +++ b/pkg/routine/triggertask_test.go @@ -13,7 +13,7 @@ func TestTriggerTask(t *testing.T) { require := require.New(t) h := &MockHandler{Count: 0} ctx := context.Background() - task := routine.NewTriggerTask(h.Do, routine.WithTriggerTaskInterval(200*time.Millisecond)) + task := routine.NewTriggerTask(h.Do, routine.DelayTimeBeforeTrigger(200*time.Millisecond)) require.NoError(task.Start(ctx)) ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() From 496ce3832d371ca842a4b450cd3effe80293baab Mon Sep 17 00:00:00 2001 From: millken Date: Thu, 13 Jul 2023 12:56:13 +0800 Subject: [PATCH 15/23] fix comments --- blocksync/blocksync.go | 8 +++++++- blocksync/blocksync_test.go | 9 +++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index d90b04e7fc..d5104e5a70 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -213,7 +213,7 @@ func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint6 repeat = len(peers) } for i := 0; i < repeat; i++ { - peer := peers[fastrand.Uint32n(uint32(len(peers)))] //may get the same node + peer := peers[fastrand.Uint32n(uint32(len(peers)))] if err := bs.unicastOutbound( ctx, peer, @@ -334,6 +334,12 @@ func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInf func (bs *blockSyncer) syncStageChecker() { tipHeight := bs.tipHeightHandler() + log.S().Errorf("sync stage checker, tip height %d, sync stage height %d", tipHeight, bs.syncStageHeight) + // if tipHeight is equal to targetHeight, it means tried to sync to the tip, but failed. + // we need to trigger a sync task + if tipHeight == bs.syncStageHeight { + bs.syncTask.Trigger() + } atomic.StoreUint64(&bs.syncBlockIncrease, tipHeight-bs.syncStageHeight) bs.syncStageHeight = tipHeight } diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 9a016d2b05..cbcd75abf3 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -603,14 +603,19 @@ func TestBlockSyncerBugIssue3889(t *testing.T) { wait.Wait() } +/* +RateLimitInterval = 1 sec, syncd 9600 blocks in 60 sec, 160 blocks/sec +RateLimitInterval = 2 sec, syncd 4800 blocks in 60 sec, 80 blocks/sec +*/ func TestBlockSync(t *testing.T) { //t.SkipNow() require := require.New(t) cfg := DefaultConfig - cfg.Interval = 1 * time.Second + cfg.Interval = 10 * time.Second cfg.BufferSize = 200 cfg.MaxRepeat = 3 cfg.RepeatDecayStep = 3 + cfg.RateLimitInterval = 2 * time.Second var tipHeight uint64 chanProcessBlk := make(chan *block.Block) @@ -677,5 +682,5 @@ func TestBlockSync(t *testing.T) { require.NoError(bs.Stop(ctx)) }() - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 60) } From 79455f4d4062cca4895bc0d7cd5119fc5a62a2d3 Mon Sep 17 00:00:00 2001 From: millken Date: Fri, 14 Jul 2023 15:22:04 +0800 Subject: [PATCH 16/23] fix datarace --- blocksync/blocksync.go | 39 +++++++++++++------------------------ blocksync/blocksync_test.go | 6 +++--- pkg/routine/triggertask.go | 11 ++++++++++- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index d5104e5a70..b8d1c36add 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -75,7 +75,6 @@ type ( syncBlockIncrease uint64 startingHeight uint64 // block number this node started to synchronise from - lastTip uint64 targetHeight uint64 // block number of the highest block header this node has received from peers lastRequestHeight uint64 mu sync.RWMutex @@ -171,22 +170,15 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool { return false } -func (bs *blockSyncer) flushInfo() uint64 { - bs.mu.RLock() - defer bs.mu.RUnlock() - - return bs.targetHeight -} - func (bs *blockSyncer) sync() { - targetHeight := bs.flushInfo() + targetHeight := atomic.LoadUint64(&bs.targetHeight) intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight) // no sync if len(intervals) == 0 { return } // start syncing - bs.startingHeight = bs.tipHeightHandler() + atomic.StoreUint64(&bs.startingHeight, bs.tipHeightHandler()) log.L().Info("block sync intervals.", zap.Any("intervals", intervals), zap.Uint64("targetHeight", targetHeight)) @@ -225,9 +217,7 @@ func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint6 } func (bs *blockSyncer) TargetHeight() uint64 { - bs.mu.RLock() - defer bs.mu.RUnlock() - return bs.targetHeight + return atomic.LoadUint64(&bs.targetHeight) } // Start starts a block syncer @@ -269,13 +259,14 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block if blk == nil { return errors.New("block is nil") } - + //fmt.Println("block syncer process block", blk.Height()) tip := bs.tipHeightHandler() added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk)) bs.mu.Lock() defer bs.mu.Unlock() - if targetHeight > bs.targetHeight { - bs.targetHeight = targetHeight + loadTargetHeight := atomic.LoadUint64(&bs.targetHeight) + if targetHeight > loadTargetHeight { + atomic.StoreUint64(&bs.targetHeight, targetHeight) } if !added { return nil @@ -289,19 +280,16 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block } bs.buf.Cleanup(syncedHeight) log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight)) - if syncedHeight > bs.lastTip { - bs.lastTip = syncedHeight - } - bs.checkSync(syncedHeight) - return nil -} - -func (bs *blockSyncer) checkSync(syncedHeight uint64) { requestMaxHeight := atomic.LoadUint64(&bs.lastRequestHeight) if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight { bs.syncTask.Trigger() atomic.SwapUint64(&bs.lastRequestHeight, 0) } + return nil +} + +func (bs *blockSyncer) checkSync(syncedHeight uint64) { + } // ProcessSyncRequest processes a sync request @@ -350,6 +338,7 @@ func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { defer bs.mu.RUnlock() var syncSpeedDesc string syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) + targetHeight := atomic.LoadUint64(&bs.targetHeight) switch { case syncBlockIncrease == 1: syncSpeedDesc = "synced to blockchain tip" @@ -358,7 +347,7 @@ func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { default: syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)/bs.cfg.Interval.Seconds()) } - return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc + return atomic.LoadUint64(&bs.startingHeight), bs.tipHeightHandler(), targetHeight, syncSpeedDesc } // BuildReport builds a report of block syncer diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index cbcd75abf3..e67c85917d 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -611,11 +611,11 @@ func TestBlockSync(t *testing.T) { //t.SkipNow() require := require.New(t) cfg := DefaultConfig - cfg.Interval = 10 * time.Second + cfg.Interval = 2 * time.Second cfg.BufferSize = 200 cfg.MaxRepeat = 3 cfg.RepeatDecayStep = 3 - cfg.RateLimitInterval = 2 * time.Second + cfg.RateLimitInterval = 1 * time.Second var tipHeight uint64 chanProcessBlk := make(chan *block.Block) @@ -682,5 +682,5 @@ func TestBlockSync(t *testing.T) { require.NoError(bs.Stop(ctx)) }() - time.Sleep(time.Second * 60) + time.Sleep(time.Second * 20) } diff --git a/pkg/routine/triggertask.go b/pkg/routine/triggertask.go index d4c673e8d1..c0652c65aa 100644 --- a/pkg/routine/triggertask.go +++ b/pkg/routine/triggertask.go @@ -2,6 +2,7 @@ package routine import ( "context" + "sync" "time" "github.com/iotexproject/iotex-core/pkg/lifecycle" @@ -38,6 +39,7 @@ type TriggerTask struct { delay time.Duration cb Task ch chan struct{} + mu sync.Mutex } // NewTriggerTask creates an instance of TriggerTask @@ -76,7 +78,12 @@ func (t *TriggerTask) Trigger() { log.S().Warnf("trigger task is not ready") return } - t.ch <- struct{}{} + t.mu.Lock() + defer t.mu.Unlock() + select { + case t.ch <- struct{}{}: + default: + } } // Stop stops the task @@ -85,6 +92,8 @@ func (t *TriggerTask) Stop(_ context.Context) error { if err := t.TurnOff(); err != nil { return err } + t.mu.Lock() + defer t.mu.Unlock() close(t.ch) return nil } From 6b8176add48951cd0786e18e43cfc4bae0f14768 Mon Sep 17 00:00:00 2001 From: millken Date: Fri, 14 Jul 2023 15:39:29 +0800 Subject: [PATCH 17/23] clean code --- blocksync/blocksync.go | 11 +----- blocksync/blocksync_test.go | 67 +-------------------------------- pkg/routine/triggertask_test.go | 2 +- 3 files changed, 4 insertions(+), 76 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index b8d1c36add..296448bc7a 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -259,7 +259,6 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block if blk == nil { return errors.New("block is nil") } - //fmt.Println("block syncer process block", blk.Height()) tip := bs.tipHeightHandler() added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk)) bs.mu.Lock() @@ -288,10 +287,6 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block return nil } -func (bs *blockSyncer) checkSync(syncedHeight uint64) { - -} - // ProcessSyncRequest processes a sync request func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInfo, start uint64, end uint64) error { tip := bs.tipHeightHandler() @@ -322,7 +317,6 @@ func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInf func (bs *blockSyncer) syncStageChecker() { tipHeight := bs.tipHeightHandler() - log.S().Errorf("sync stage checker, tip height %d, sync stage height %d", tipHeight, bs.syncStageHeight) // if tipHeight is equal to targetHeight, it means tried to sync to the tip, but failed. // we need to trigger a sync task if tipHeight == bs.syncStageHeight { @@ -334,11 +328,10 @@ func (bs *blockSyncer) syncStageChecker() { // SyncStatus returns the status of block syncer func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { - bs.mu.RLock() - defer bs.mu.RUnlock() var syncSpeedDesc string syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) targetHeight := atomic.LoadUint64(&bs.targetHeight) + startingHeight := atomic.LoadUint64(&bs.startingHeight) switch { case syncBlockIncrease == 1: syncSpeedDesc = "synced to blockchain tip" @@ -347,7 +340,7 @@ func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { default: syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)/bs.cfg.Interval.Seconds()) } - return atomic.LoadUint64(&bs.startingHeight), bs.tipHeightHandler(), targetHeight, syncSpeedDesc + return startingHeight, bs.tipHeightHandler(), targetHeight, syncSpeedDesc } // BuildReport builds a report of block syncer diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index e67c85917d..d96d3faeb9 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -7,8 +7,6 @@ package blocksync import ( "context" - "fmt" - "sync" "sync/atomic" "testing" "time" @@ -540,69 +538,6 @@ func TestDummyBlockSync(t *testing.T) { require.Empty(desc) } -type test1 struct { - syncBlockIncrease uint64 - startingHeight uint64 - targetHeight uint64 - mu sync.RWMutex -} - -func (bs *test1) sync() { - for i := 0; i >= 0; i++ { - time.Sleep(time.Millisecond * 2) - bs.mu.Lock() - atomic.StoreUint64(&bs.syncBlockIncrease, uint64(i)) - bs.targetHeight = uint64(i) - bs.startingHeight = uint64(i) - bs.mu.Unlock() - } -} -func (bs *test1) tipHeightHandler() uint64 { - return 1 -} -func (bs *test1) SyncStatus() (uint64, uint64, uint64, string) { - // bs.mu.RLock() - // defer bs.mu.RUnlock() - var syncSpeedDesc string - syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) - switch { - case syncBlockIncrease == 1: - syncSpeedDesc = "synced to blockchain tip" - default: - syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)) - } - return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc -} - -func (bs *test1) BuildReport() string { - startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus() - return fmt.Sprintf( - "BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", - startingHeight, - tipHeight, - targetHeight, - syncSpeedDesc, - ) -} - -func TestBlockSyncerBugIssue3889(t *testing.T) { - bs := &test1{} - go bs.sync() - time.Sleep(time.Millisecond * 10) - wait := sync.WaitGroup{} - wait.Add(5) - for i := 0; i < 5; i++ { - go func(i int) { - defer wait.Done() - startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus() - t.Logf("BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", startingHeight, tipHeight, targetHeight, syncSpeedDesc) - report := bs.BuildReport() - t.Log(report) - }(i) - } - wait.Wait() -} - /* RateLimitInterval = 1 sec, syncd 9600 blocks in 60 sec, 160 blocks/sec RateLimitInterval = 2 sec, syncd 4800 blocks in 60 sec, 80 blocks/sec @@ -682,5 +617,5 @@ func TestBlockSync(t *testing.T) { require.NoError(bs.Stop(ctx)) }() - time.Sleep(time.Second * 20) + time.Sleep(time.Second * 10) } diff --git a/pkg/routine/triggertask_test.go b/pkg/routine/triggertask_test.go index be566e872b..526308f1af 100644 --- a/pkg/routine/triggertask_test.go +++ b/pkg/routine/triggertask_test.go @@ -13,7 +13,7 @@ func TestTriggerTask(t *testing.T) { require := require.New(t) h := &MockHandler{Count: 0} ctx := context.Background() - task := routine.NewTriggerTask(h.Do, routine.DelayTimeBeforeTrigger(200*time.Millisecond)) + task := routine.NewTriggerTask(h.Do, routine.DelayTimeBeforeTrigger(180*time.Millisecond)) require.NoError(task.Start(ctx)) ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() From 0b3e73cf295a0384fef0e39a7561f71b97d3803e Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 18 Jul 2023 15:49:07 +0800 Subject: [PATCH 18/23] update sync strategy --- blocksync/blocksync.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index 296448bc7a..106c421b5d 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -58,6 +58,7 @@ type ( // blockSyncer implements BlockSync interface blockSyncer struct { + lifecycle.Readiness cfg Config buf *blockBuffer @@ -77,6 +78,7 @@ type ( startingHeight uint64 // block number this node started to synchronise from targetHeight uint64 // block number of the highest block header this node has received from peers lastRequestHeight uint64 + lastTipHeight uint64 //store the last committed block height mu sync.RWMutex } @@ -259,14 +261,28 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block if blk == nil { return errors.New("block is nil") } + log.L().Error("Processing block.", zap.Uint64("height", blk.Height()), zap.String("peer", peer)) tip := bs.tipHeightHandler() added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk)) - bs.mu.Lock() - defer bs.mu.Unlock() loadTargetHeight := atomic.LoadUint64(&bs.targetHeight) if targetHeight > loadTargetHeight { atomic.StoreUint64(&bs.targetHeight, targetHeight) } + // If a block in the requested block is lost, try to resend request block in the next interval + if !bs.IsReady() { + if err := bs.TurnOn(); err == nil { + defer func() { + time.AfterFunc(bs.cfg.Interval, func() { + lastTip := atomic.LoadUint64(&bs.lastTipHeight) + if lastTip == bs.tipHeightHandler() && bs.TurnOff() == nil { + bs.syncTask.Trigger() + } + }) + }() + } + } + bs.mu.Lock() + defer bs.mu.Unlock() if !added { return nil } @@ -277,6 +293,7 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block } syncedHeight++ } + atomic.StoreUint64(&bs.lastTipHeight, syncedHeight) bs.buf.Cleanup(syncedHeight) log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight)) requestMaxHeight := atomic.LoadUint64(&bs.lastRequestHeight) @@ -317,11 +334,6 @@ func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInf func (bs *blockSyncer) syncStageChecker() { tipHeight := bs.tipHeightHandler() - // if tipHeight is equal to targetHeight, it means tried to sync to the tip, but failed. - // we need to trigger a sync task - if tipHeight == bs.syncStageHeight { - bs.syncTask.Trigger() - } atomic.StoreUint64(&bs.syncBlockIncrease, tipHeight-bs.syncStageHeight) bs.syncStageHeight = tipHeight } From 5852cce587ff4cb4670cb758df8884bc9093b9a3 Mon Sep 17 00:00:00 2001 From: millken Date: Tue, 18 Jul 2023 15:53:08 +0800 Subject: [PATCH 19/23] remove error log --- blocksync/blocksync.go | 1 - 1 file changed, 1 deletion(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index 106c421b5d..91f314f6d6 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -261,7 +261,6 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block if blk == nil { return errors.New("block is nil") } - log.L().Error("Processing block.", zap.Uint64("height", blk.Height()), zap.String("peer", peer)) tip := bs.tipHeightHandler() added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk)) loadTargetHeight := atomic.LoadUint64(&bs.targetHeight) From 1dddb9b35ec2817ed6f99b29b9d6ae74e94fbf8e Mon Sep 17 00:00:00 2001 From: millken Date: Wed, 19 Jul 2023 14:23:53 +0800 Subject: [PATCH 20/23] update sync strategy --- blocksync/blocksync.go | 41 ++++++++++++------------ blocksync/config.go | 2 ++ pkg/routine/triggertask.go | 22 ++++++++++--- pkg/routine/triggertask_test.go | 56 ++++++++++++++++++++++++++++++--- 4 files changed, 91 insertions(+), 30 deletions(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index 91f314f6d6..ac217ded9c 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -58,7 +58,6 @@ type ( // blockSyncer implements BlockSync interface blockSyncer struct { - lifecycle.Readiness cfg Config buf *blockBuffer @@ -73,12 +72,13 @@ type ( syncStageTask *routine.RecurringTask syncStageHeight uint64 + syncRetryHeight uint64 + syncReady int32 syncBlockIncrease uint64 startingHeight uint64 // block number this node started to synchronise from targetHeight uint64 // block number of the highest block header this node has received from peers lastRequestHeight uint64 - lastTipHeight uint64 //store the last committed block height mu sync.RWMutex } @@ -150,7 +150,10 @@ func NewBlockSyncer( targetHeight: 0, } if bs.cfg.Interval != 0 { - bs.syncTask = routine.NewTriggerTask(bs.sync, routine.DelayTimeBeforeTrigger(bs.cfg.RateLimitInterval)) + bs.syncTask = routine.NewTriggerTask(bs.sync, + routine.DelayTimeBeforeTrigger(bs.cfg.RateLimitInterval), + routine.TriggerBufferSize(bs.cfg.TriggerBufferSize), + ) bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) } atomic.StoreUint64(&bs.syncBlockIncrease, 0) @@ -229,10 +232,6 @@ func (bs *blockSyncer) Start(ctx context.Context) error { if err := bs.syncTask.Start(ctx); err != nil { return err } - //we need to wait for the peer to be ready, and then start the sync task - go time.AfterFunc(bs.cfg.Interval, func() { - bs.syncTask.Trigger() - }) } if bs.syncStageTask != nil { return bs.syncStageTask.Start(ctx) @@ -268,17 +267,9 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block atomic.StoreUint64(&bs.targetHeight, targetHeight) } // If a block in the requested block is lost, try to resend request block in the next interval - if !bs.IsReady() { - if err := bs.TurnOn(); err == nil { - defer func() { - time.AfterFunc(bs.cfg.Interval, func() { - lastTip := atomic.LoadUint64(&bs.lastTipHeight) - if lastTip == bs.tipHeightHandler() && bs.TurnOff() == nil { - bs.syncTask.Trigger() - } - }) - }() - } + if atomic.LoadInt32(&bs.syncReady) == 0 { + atomic.StoreInt32(&bs.syncReady, 1) + time.AfterFunc(bs.cfg.Interval, bs.syncRetryChecker) } bs.mu.Lock() defer bs.mu.Unlock() @@ -292,11 +283,10 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block } syncedHeight++ } - atomic.StoreUint64(&bs.lastTipHeight, syncedHeight) bs.buf.Cleanup(syncedHeight) log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight)) - requestMaxHeight := atomic.LoadUint64(&bs.lastRequestHeight) - if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight { + lastRequestHeight := atomic.LoadUint64(&bs.lastRequestHeight) + if lastRequestHeight > 0 && syncedHeight >= lastRequestHeight { bs.syncTask.Trigger() atomic.SwapUint64(&bs.lastRequestHeight, 0) } @@ -331,6 +321,15 @@ func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInf return nil } +func (bs *blockSyncer) syncRetryChecker() { + tipHeight := bs.tipHeightHandler() + if bs.syncRetryHeight == 0 || bs.syncRetryHeight == tipHeight { + bs.syncTask.Trigger() + } + bs.syncRetryHeight = tipHeight + atomic.StoreInt32(&bs.syncReady, 0) +} + func (bs *blockSyncer) syncStageChecker() { tipHeight := bs.tipHeightHandler() atomic.StoreUint64(&bs.syncBlockIncrease, tipHeight-bs.syncStageHeight) diff --git a/blocksync/config.go b/blocksync/config.go index 6914327b22..42a6a5b573 100644 --- a/blocksync/config.go +++ b/blocksync/config.go @@ -13,6 +13,7 @@ type Config struct { RateLimitInterval time.Duration `yaml:"rateLimitInterval"` ProcessSyncRequestTTL time.Duration `yaml:"processSyncRequestTTL"` BufferSize uint64 `yaml:"bufferSize"` + TriggerBufferSize int `yaml:"triggerBufferSize"` IntervalSize uint64 `yaml:"intervalSize"` // MaxRepeat is the maximal number of repeat of a block sync request MaxRepeat int `yaml:"maxRepeat"` @@ -26,6 +27,7 @@ var DefaultConfig = Config{ RateLimitInterval: 1 * time.Second, ProcessSyncRequestTTL: 10 * time.Second, BufferSize: 200, + TriggerBufferSize: 2, IntervalSize: 20, MaxRepeat: 3, RepeatDecayStep: 1, diff --git a/pkg/routine/triggertask.go b/pkg/routine/triggertask.go index c0652c65aa..c95237fe31 100644 --- a/pkg/routine/triggertask.go +++ b/pkg/routine/triggertask.go @@ -33,11 +33,21 @@ func DelayTimeBeforeTrigger(d time.Duration) TriggerTaskOption { } } +// TriggerBufferSize sets the buffer size of trigger channel +func TriggerBufferSize(sz int) TriggerTaskOption { + return triggerTaskOption{ + setTriggerTaskOption: func(t *TriggerTask) { + t.sz = sz + }, + } +} + // TriggerTask represents a task that can be triggered type TriggerTask struct { lifecycle.Readiness delay time.Duration cb Task + sz int ch chan struct{} mu sync.Mutex } @@ -47,11 +57,12 @@ func NewTriggerTask(cb Task, ops ...TriggerTaskOption) *TriggerTask { tt := &TriggerTask{ cb: cb, delay: 0, - ch: make(chan struct{}), + sz: 0, } for _, opt := range ops { opt.SetTriggerTaskOption(tt) } + tt.ch = make(chan struct{}, tt.sz) return tt } @@ -72,18 +83,21 @@ func (t *TriggerTask) Start(_ context.Context) error { return t.TurnOn() } -// Trigger triggers the task -func (t *TriggerTask) Trigger() { +// Trigger triggers the task, return true if the task is triggered successfully +// this function is non-blocking +func (t *TriggerTask) Trigger() bool { if !t.IsReady() { log.S().Warnf("trigger task is not ready") - return + return false } t.mu.Lock() defer t.mu.Unlock() select { case t.ch <- struct{}{}: + return true default: } + return false } // Stop stops the task diff --git a/pkg/routine/triggertask_test.go b/pkg/routine/triggertask_test.go index 526308f1af..871b66c3ee 100644 --- a/pkg/routine/triggertask_test.go +++ b/pkg/routine/triggertask_test.go @@ -2,6 +2,8 @@ package routine_test import ( "context" + "sync" + "sync/atomic" "testing" "time" @@ -17,17 +19,61 @@ func TestTriggerTask(t *testing.T) { require.NoError(task.Start(ctx)) ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() + var succ uint32 +done: for { select { case <-ctx.Done(): - goto done + break done default: - task.Trigger() + if task.Trigger() { + succ++ + } } } -done: - require.Equal(uint(5), h.Count) + time.Sleep(200 * time.Millisecond) + require.Equal(uint32(6), succ) + require.Equal(uint(6), h.Count) + require.NoError(task.Stop(ctx)) + task.Trigger() + require.Equal(uint(6), h.Count) +} + +func TestTriggerTaskWithBufferSize(t *testing.T) { + require := require.New(t) + h := &MockHandler{Count: 0} + ctx := context.Background() + task := routine.NewTriggerTask(h.Do, + routine.DelayTimeBeforeTrigger(180*time.Millisecond), + routine.TriggerBufferSize(2)) + require.NoError(task.Start(ctx)) + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + wg := sync.WaitGroup{} + wg.Add(5) + var succ uint32 + for i := 0; i < 5; i++ { + go func(i int) { + defer wg.Done() + done: + for { + select { + case <-ctx.Done(): + //t.Logf("exit %d\n", i) + break done + default: + if task.Trigger() { + atomic.AddUint32(&succ, 1) + } + } + } + }(i) + } + wg.Wait() + time.Sleep(500 * time.Millisecond) + require.Equal(uint32(8), succ) + require.Equal(uint(8), h.Count) require.NoError(task.Stop(ctx)) task.Trigger() - require.Equal(uint(5), h.Count) + require.Equal(uint(8), h.Count) } From 579164c5c533d96dcbb4fc8cad7830b807207dea Mon Sep 17 00:00:00 2001 From: millken Date: Wed, 19 Jul 2023 14:49:26 +0800 Subject: [PATCH 21/23] fix data race in tests --- pkg/routine/recurringtask_test.go | 6 ++++++ pkg/routine/triggertask_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/routine/recurringtask_test.go b/pkg/routine/recurringtask_test.go index 40e6eb7fc0..0ea2a9e629 100644 --- a/pkg/routine/recurringtask_test.go +++ b/pkg/routine/recurringtask_test.go @@ -28,6 +28,12 @@ func (h *MockHandler) Do() { h.mu.Unlock() } +func (h *MockHandler) GetCount() uint { + h.mu.RLock() + defer h.mu.RUnlock() + return h.Count +} + func TestRecurringTask(t *testing.T) { require := require.New(t) h := &MockHandler{Count: 0} diff --git a/pkg/routine/triggertask_test.go b/pkg/routine/triggertask_test.go index 871b66c3ee..308167343d 100644 --- a/pkg/routine/triggertask_test.go +++ b/pkg/routine/triggertask_test.go @@ -33,10 +33,10 @@ done: } time.Sleep(200 * time.Millisecond) require.Equal(uint32(6), succ) - require.Equal(uint(6), h.Count) + require.Equal(uint(6), h.GetCount()) require.NoError(task.Stop(ctx)) task.Trigger() - require.Equal(uint(6), h.Count) + require.Equal(uint(6), h.GetCount()) } func TestTriggerTaskWithBufferSize(t *testing.T) { @@ -72,8 +72,8 @@ func TestTriggerTaskWithBufferSize(t *testing.T) { wg.Wait() time.Sleep(500 * time.Millisecond) require.Equal(uint32(8), succ) - require.Equal(uint(8), h.Count) + require.Equal(uint(8), h.GetCount()) require.NoError(task.Stop(ctx)) task.Trigger() - require.Equal(uint(8), h.Count) + require.Equal(uint(8), h.GetCount()) } From 513273103b3d787ade17315744f4468e4e4b10d7 Mon Sep 17 00:00:00 2001 From: millken Date: Wed, 19 Jul 2023 15:53:47 +0800 Subject: [PATCH 22/23] fix tests --- blocksync/blocksync.go | 4 ++++ dispatcher/dispatcher.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index ac217ded9c..26e0cc0223 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -232,6 +232,10 @@ func (bs *blockSyncer) Start(ctx context.Context) error { if err := bs.syncTask.Start(ctx); err != nil { return err } + // we keep this for testing, in production, no need it, other peers will boradcast latest blocks + go time.AfterFunc(bs.cfg.Interval, func() { + bs.syncTask.Trigger() + }) } if bs.syncStageTask != nil { return bs.syncStageTask.Start(ctx) diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 19215e16f4..3d826a46c7 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -220,7 +220,7 @@ func (d *IotxDispatcher) actionHandler() { case a := <-d.actionChan: d.handleActionMsg(a) case <-d.quit: - log.L().Info("action handler is terminated.") + //log.L().Info("action handler is terminated.") return } } From bb15f7ca31eee679dec32ae94cbd95720c68ef3a Mon Sep 17 00:00:00 2001 From: millken Date: Wed, 19 Jul 2023 15:56:16 +0800 Subject: [PATCH 23/23] fix tests --- blocksync/blocksync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index 26e0cc0223..233e38f35f 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -233,7 +233,7 @@ func (bs *blockSyncer) Start(ctx context.Context) error { return err } // we keep this for testing, in production, no need it, other peers will boradcast latest blocks - go time.AfterFunc(bs.cfg.Interval, func() { + time.AfterFunc(bs.cfg.Interval, func() { bs.syncTask.Trigger() }) }