diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index f4e3f1f845..233e38f35f 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -68,16 +68,17 @@ type ( unicastOutbound UniCastOutbound blockP2pPeer BlockPeer - syncTask *routine.RecurringTask + syncTask *routine.TriggerTask syncStageTask *routine.RecurringTask syncStageHeight uint64 + syncRetryHeight uint64 + syncReady int32 syncBlockIncrease uint64 startingHeight uint64 // block number this node started to synchronise from - lastTip uint64 - lastTipUpdateTime time.Time targetHeight uint64 // block number of the highest block header this node has received from peers + lastRequestHeight uint64 mu sync.RWMutex } @@ -139,7 +140,6 @@ func NewBlockSyncer( ) (BlockSync, error) { bs := &blockSyncer{ cfg: cfg, - lastTipUpdateTime: time.Now(), buf: newBlockBuffer(cfg.BufferSize, cfg.IntervalSize), tipHeightHandler: tipHeightHandler, blockByHeightHandler: blockByHeightHandler, @@ -150,7 +150,10 @@ func NewBlockSyncer( targetHeight: 0, } if bs.cfg.Interval != 0 { - bs.syncTask = routine.NewRecurringTask(bs.sync, bs.cfg.Interval) + bs.syncTask = routine.NewTriggerTask(bs.sync, + routine.DelayTimeBeforeTrigger(bs.cfg.RateLimitInterval), + routine.TriggerBufferSize(bs.cfg.TriggerBufferSize), + ) bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval) } atomic.StoreUint64(&bs.syncBlockIncrease, 0) @@ -172,28 +175,19 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool { return false } -func (bs *blockSyncer) flushInfo() (time.Time, uint64) { - bs.mu.Lock() - defer bs.mu.Unlock() - - return bs.lastTipUpdateTime, bs.targetHeight -} - func (bs *blockSyncer) sync() { - updateTime, targetHeight := bs.flushInfo() - if updateTime.Add(bs.cfg.Interval).After(time.Now()) { - return - } + targetHeight := atomic.LoadUint64(&bs.targetHeight) intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight) // no sync if len(intervals) == 0 { return } // start syncing - bs.startingHeight = bs.tipHeightHandler() + atomic.StoreUint64(&bs.startingHeight, bs.tipHeightHandler()) log.L().Info("block sync intervals.", zap.Any("intervals", intervals), zap.Uint64("targetHeight", targetHeight)) + atomic.StoreUint64(&bs.lastRequestHeight, intervals[len(intervals)-1].End) for i, interval := range intervals { bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep) } @@ -228,9 +222,7 @@ func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint6 } func (bs *blockSyncer) TargetHeight() uint64 { - bs.mu.RLock() - defer bs.mu.RUnlock() - return bs.targetHeight + return atomic.LoadUint64(&bs.targetHeight) } // Start starts a block syncer @@ -240,6 +232,10 @@ func (bs *blockSyncer) Start(ctx context.Context) error { if err := bs.syncTask.Start(ctx); err != nil { return err } + // we keep this for testing, in production, no need it, other peers will boradcast latest blocks + time.AfterFunc(bs.cfg.Interval, func() { + bs.syncTask.Trigger() + }) } if bs.syncStageTask != nil { return bs.syncStageTask.Start(ctx) @@ -263,18 +259,24 @@ func (bs *blockSyncer) Stop(ctx context.Context) error { return nil } +// ProcessBlock processes a block func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block.Block) error { if blk == nil { return errors.New("block is nil") } - tip := bs.tipHeightHandler() added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk)) + loadTargetHeight := atomic.LoadUint64(&bs.targetHeight) + if targetHeight > loadTargetHeight { + atomic.StoreUint64(&bs.targetHeight, targetHeight) + } + // If a block in the requested block is lost, try to resend request block in the next interval + if atomic.LoadInt32(&bs.syncReady) == 0 { + atomic.StoreInt32(&bs.syncReady, 1) + time.AfterFunc(bs.cfg.Interval, bs.syncRetryChecker) + } bs.mu.Lock() defer bs.mu.Unlock() - if targetHeight > bs.targetHeight { - bs.targetHeight = targetHeight - } if !added { return nil } @@ -287,13 +289,15 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block } bs.buf.Cleanup(syncedHeight) log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight)) - if syncedHeight > bs.lastTip { - bs.lastTip = syncedHeight - bs.lastTipUpdateTime = time.Now() + lastRequestHeight := atomic.LoadUint64(&bs.lastRequestHeight) + if lastRequestHeight > 0 && syncedHeight >= lastRequestHeight { + bs.syncTask.Trigger() + atomic.SwapUint64(&bs.lastRequestHeight, 0) } return nil } +// ProcessSyncRequest processes a sync request func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInfo, start uint64, end uint64) error { tip := bs.tipHeightHandler() if end > tip { @@ -321,15 +325,27 @@ func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInf return nil } +func (bs *blockSyncer) syncRetryChecker() { + tipHeight := bs.tipHeightHandler() + if bs.syncRetryHeight == 0 || bs.syncRetryHeight == tipHeight { + bs.syncTask.Trigger() + } + bs.syncRetryHeight = tipHeight + atomic.StoreInt32(&bs.syncReady, 0) +} + func (bs *blockSyncer) syncStageChecker() { tipHeight := bs.tipHeightHandler() atomic.StoreUint64(&bs.syncBlockIncrease, tipHeight-bs.syncStageHeight) bs.syncStageHeight = tipHeight } +// SyncStatus returns the status of block syncer func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { var syncSpeedDesc string syncBlockIncrease := atomic.LoadUint64(&bs.syncBlockIncrease) + targetHeight := atomic.LoadUint64(&bs.targetHeight) + startingHeight := atomic.LoadUint64(&bs.startingHeight) switch { case syncBlockIncrease == 1: syncSpeedDesc = "synced to blockchain tip" @@ -338,7 +354,7 @@ func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { default: syncSpeedDesc = fmt.Sprintf("sync in progress at %.1f blocks/sec", float64(syncBlockIncrease)/bs.cfg.Interval.Seconds()) } - return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc + return startingHeight, bs.tipHeightHandler(), targetHeight, syncSpeedDesc } // BuildReport builds a report of block syncer diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go index 2a7a73e5ef..d96d3faeb9 100644 --- a/blocksync/blocksync_test.go +++ b/blocksync/blocksync_test.go @@ -7,17 +7,12 @@ package blocksync import ( "context" + "sync/atomic" "testing" "time" "github.com/golang/mock/gomock" "github.com/iotexproject/go-pkgs/hash" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - "github.com/iotexproject/iotex-core/action/protocol" "github.com/iotexproject/iotex-core/action/protocol/account" accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util" @@ -37,6 +32,13 @@ import ( "github.com/iotexproject/iotex-core/test/mock/mock_blocksync" "github.com/iotexproject/iotex-core/test/mock/mock_consensus" "github.com/iotexproject/iotex-core/testutil" + goproto "github.com/iotexproject/iotex-proto/golang" + "github.com/iotexproject/iotex-proto/golang/iotexrpc" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" ) type testConfig struct { @@ -535,3 +537,85 @@ func TestDummyBlockSync(t *testing.T) { require.Zero(targetHeight) require.Empty(desc) } + +/* +RateLimitInterval = 1 sec, syncd 9600 blocks in 60 sec, 160 blocks/sec +RateLimitInterval = 2 sec, syncd 4800 blocks in 60 sec, 80 blocks/sec +*/ +func TestBlockSync(t *testing.T) { + //t.SkipNow() + require := require.New(t) + cfg := DefaultConfig + cfg.Interval = 2 * time.Second + cfg.BufferSize = 200 + cfg.MaxRepeat = 3 + cfg.RepeatDecayStep = 3 + cfg.RateLimitInterval = 1 * time.Second + var tipHeight uint64 + chanProcessBlk := make(chan *block.Block) + + bs, err := NewBlockSyncer(cfg, func() uint64 { + return atomic.LoadUint64(&tipHeight) + }, nil, func(b *block.Block) error { + //t.Logf("commit block %d", b.Height()) + atomic.StoreUint64(&tipHeight, b.Height()) + return nil + }, func() ([]peer.AddrInfo, error) { + return []peer.AddrInfo{{ + ID: peer.ID("test"), + }}, nil + }, func(ctx context.Context, ai peer.AddrInfo, m proto.Message) error { + msgType, err := goproto.GetTypeFromRPCMsg(m) + if err != nil { + return err + } + switch msgType { + case iotexrpc.MessageType_BLOCK_REQUEST: + msgBody, err := proto.Marshal(m) + if err != nil { + return err + } + var req iotexrpc.BlockSync + if err := proto.Unmarshal(msgBody, &req); err != nil { + return err + } + // send block + for i := req.Start; i <= req.End; i++ { + blk, _ := block.NewTestingBuilder(). + SetHeight(i).SignAndBuild(identityset.PrivateKey(0)) + chanProcessBlk <- &blk + } + } + return nil + }, func(s string) { + t.Logf("block p2p %s", s) + }) + ctx := context.Background() + require.NoError(err) + require.NotNil(bs) + go func(bs BlockSync) { + for blk := range chanProcessBlk { + bs.ProcessBlock(ctx, "test", blk) + } + }(bs) + go func(bs BlockSync) { + for { + blk, _ := block.NewTestingBuilder(). + SetHeight(20000000).SignAndBuild(identityset.PrivateKey(0)) + bs.ProcessBlock(ctx, "test", &blk) + time.Sleep(time.Second * 1) + } + }(bs) + require.NoError(bs.Start(ctx)) + go func(bs BlockSync) { + for { + time.Sleep(cfg.Interval) + t.Log(bs.BuildReport()) + } + }(bs) + defer func() { + require.NoError(bs.Stop(ctx)) + }() + + time.Sleep(time.Second * 10) +} diff --git a/blocksync/buffer_test.go b/blocksync/buffer_test.go index 9938a8eeb4..92f1cc2ae0 100644 --- a/blocksync/buffer_test.go +++ b/blocksync/buffer_test.go @@ -262,3 +262,10 @@ func TestBlockBufferGetBlocksIntervalsToSync(t *testing.T) { // There should always have at least 1 interval range to sync assert.Len(b.GetBlocksIntervalsToSync(chain.TipHeight(), 0), 1) } + +func TestBuffer(t *testing.T) { + bb := newBlockBuffer(200, 20) + bb.blockQueues[3] = newUniQueue() + iv := bb.GetBlocksIntervalsToSync(0, 2000) + t.Logf("iv: %v", iv) +} diff --git a/blocksync/config.go b/blocksync/config.go index 32c3f6a85c..42a6a5b573 100644 --- a/blocksync/config.go +++ b/blocksync/config.go @@ -10,8 +10,10 @@ import "time" // Config is the config struct for the BlockSync type Config struct { Interval time.Duration `yaml:"interval"` // update duration + RateLimitInterval time.Duration `yaml:"rateLimitInterval"` ProcessSyncRequestTTL time.Duration `yaml:"processSyncRequestTTL"` BufferSize uint64 `yaml:"bufferSize"` + TriggerBufferSize int `yaml:"triggerBufferSize"` IntervalSize uint64 `yaml:"intervalSize"` // MaxRepeat is the maximal number of repeat of a block sync request MaxRepeat int `yaml:"maxRepeat"` @@ -22,8 +24,10 @@ type Config struct { // DefaultConfig is the default config var DefaultConfig = Config{ Interval: 30 * time.Second, + RateLimitInterval: 1 * time.Second, ProcessSyncRequestTTL: 10 * time.Second, BufferSize: 200, + TriggerBufferSize: 2, IntervalSize: 20, MaxRepeat: 3, RepeatDecayStep: 1, diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 19215e16f4..3d826a46c7 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -220,7 +220,7 @@ func (d *IotxDispatcher) actionHandler() { case a := <-d.actionChan: d.handleActionMsg(a) case <-d.quit: - log.L().Info("action handler is terminated.") + //log.L().Info("action handler is terminated.") return } } diff --git a/pkg/routine/recurringtask_test.go b/pkg/routine/recurringtask_test.go index 40e6eb7fc0..0ea2a9e629 100644 --- a/pkg/routine/recurringtask_test.go +++ b/pkg/routine/recurringtask_test.go @@ -28,6 +28,12 @@ func (h *MockHandler) Do() { h.mu.Unlock() } +func (h *MockHandler) GetCount() uint { + h.mu.RLock() + defer h.mu.RUnlock() + return h.Count +} + func TestRecurringTask(t *testing.T) { require := require.New(t) h := &MockHandler{Count: 0} diff --git a/pkg/routine/triggertask.go b/pkg/routine/triggertask.go new file mode 100644 index 0000000000..c95237fe31 --- /dev/null +++ b/pkg/routine/triggertask.go @@ -0,0 +1,113 @@ +package routine + +import ( + "context" + "sync" + "time" + + "github.com/iotexproject/iotex-core/pkg/lifecycle" + "github.com/iotexproject/iotex-core/pkg/log" +) + +var _ lifecycle.StartStopper = (*TriggerTask)(nil) + +// TriggerTaskOption is option to TriggerTask. +type TriggerTaskOption interface { + SetTriggerTaskOption(*TriggerTask) +} + +type triggerTaskOption struct { + setTriggerTaskOption func(*TriggerTask) +} + +func (o triggerTaskOption) SetTriggerTaskOption(t *TriggerTask) { + o.setTriggerTaskOption(t) +} + +// DelayTimeBeforeTrigger sets the delay time before trigger +func DelayTimeBeforeTrigger(d time.Duration) TriggerTaskOption { + return triggerTaskOption{ + setTriggerTaskOption: func(t *TriggerTask) { + t.delay = d + }, + } +} + +// TriggerBufferSize sets the buffer size of trigger channel +func TriggerBufferSize(sz int) TriggerTaskOption { + return triggerTaskOption{ + setTriggerTaskOption: func(t *TriggerTask) { + t.sz = sz + }, + } +} + +// TriggerTask represents a task that can be triggered +type TriggerTask struct { + lifecycle.Readiness + delay time.Duration + cb Task + sz int + ch chan struct{} + mu sync.Mutex +} + +// NewTriggerTask creates an instance of TriggerTask +func NewTriggerTask(cb Task, ops ...TriggerTaskOption) *TriggerTask { + tt := &TriggerTask{ + cb: cb, + delay: 0, + sz: 0, + } + for _, opt := range ops { + opt.SetTriggerTaskOption(tt) + } + tt.ch = make(chan struct{}, tt.sz) + return tt +} + +// Start starts the task +func (t *TriggerTask) Start(_ context.Context) error { + ready := make(chan struct{}) + go func() { + close(ready) + for range t.ch { + if t.delay > 0 { + time.Sleep(t.delay) + } + t.cb() + } + }() + // ensure the goroutine has been running + <-ready + return t.TurnOn() +} + +// Trigger triggers the task, return true if the task is triggered successfully +// this function is non-blocking +func (t *TriggerTask) Trigger() bool { + if !t.IsReady() { + log.S().Warnf("trigger task is not ready") + return false + } + t.mu.Lock() + defer t.mu.Unlock() + select { + case t.ch <- struct{}{}: + return true + default: + } + return false +} + +// Stop stops the task +func (t *TriggerTask) Stop(_ context.Context) error { + // prevent stop is called before start. + if err := t.TurnOff(); err != nil { + return err + } + t.mu.Lock() + defer t.mu.Unlock() + close(t.ch) + return nil +} diff --git a/pkg/routine/triggertask_test.go b/pkg/routine/triggertask_test.go new file mode 100644 index 0000000000..308167343d --- /dev/null +++ b/pkg/routine/triggertask_test.go @@ -0,0 +1,79 @@ +package routine_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/iotexproject/iotex-core/pkg/routine" + "github.com/stretchr/testify/require" +) + +func TestTriggerTask(t *testing.T) { + require := require.New(t) + h := &MockHandler{Count: 0} + ctx := context.Background() + task := routine.NewTriggerTask(h.Do, routine.DelayTimeBeforeTrigger(180*time.Millisecond)) + require.NoError(task.Start(ctx)) + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + var succ uint32 +done: + for { + select { + case <-ctx.Done(): + break done + default: + if task.Trigger() { + succ++ + } + } + } + time.Sleep(200 * time.Millisecond) + require.Equal(uint32(6), succ) + require.Equal(uint(6), h.GetCount()) + require.NoError(task.Stop(ctx)) + task.Trigger() + require.Equal(uint(6), h.GetCount()) +} + +func TestTriggerTaskWithBufferSize(t *testing.T) { + require := require.New(t) + h := &MockHandler{Count: 0} + ctx := context.Background() + task := routine.NewTriggerTask(h.Do, + routine.DelayTimeBeforeTrigger(180*time.Millisecond), + routine.TriggerBufferSize(2)) + require.NoError(task.Start(ctx)) + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + wg := sync.WaitGroup{} + wg.Add(5) + var succ uint32 + for i := 0; i < 5; i++ { + go func(i int) { + defer wg.Done() + done: + for { + select { + case <-ctx.Done(): + //t.Logf("exit %d\n", i) + break done + default: + if task.Trigger() { + atomic.AddUint32(&succ, 1) + } + } + } + }(i) + } + wg.Wait() + time.Sleep(500 * time.Millisecond) + require.Equal(uint32(8), succ) + require.Equal(uint(8), h.GetCount()) + require.NoError(task.Stop(ctx)) + task.Trigger() + require.Equal(uint(8), h.GetCount()) +}