Skip to content

Commit

Permalink
chore(dot/sync): blockQueue refactored
Browse files Browse the repository at this point in the history
- Context aware `pop`
- Do not block `processReadyBlocks` forever on ctx cancel
- Change mapping to hashes set (less memory usage)
- Remove unneeded `cap int` field
- More explicit variable names in block_queue.go
  • Loading branch information
qdm12 committed May 18, 2022
1 parent 62676bb commit f52f974
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 42 deletions.
53 changes: 28 additions & 25 deletions dot/sync/block_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,52 @@
package sync

import (
"context"
"sync"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
)

type blockQueue struct {
sync.RWMutex
cap int
ch chan *types.BlockData
blocks map[common.Hash]*types.BlockData
queue chan *types.BlockData
hashesSet map[common.Hash]struct{}
hashesSetMutex sync.RWMutex
}

// newBlockQueue initialises a queue of *types.BlockData with the given capacity.
func newBlockQueue(cap int) *blockQueue {
func newBlockQueue(capacity int) *blockQueue {
return &blockQueue{
cap: cap,
ch: make(chan *types.BlockData, cap),
blocks: make(map[common.Hash]*types.BlockData),
queue: make(chan *types.BlockData, capacity),
hashesSet: make(map[common.Hash]struct{}),
}
}

// push pushes an item into the queue. it blocks if the queue is at capacity.
func (q *blockQueue) push(bd *types.BlockData) {
q.Lock()
q.blocks[bd.Hash] = bd
q.Unlock()
// push pushes an item into the queue. It blocks if the queue is at capacity.
func (bq *blockQueue) push(blockData *types.BlockData) {
bq.hashesSetMutex.Lock()
bq.hashesSet[blockData.Hash] = struct{}{}
bq.hashesSetMutex.Unlock()

q.ch <- bd
bq.queue <- blockData
}

// pop pops an item from the queue. it blocks if the queue is empty.
func (q *blockQueue) pop() *types.BlockData {
bd := <-q.ch
q.Lock()
delete(q.blocks, bd.Hash)
q.Unlock()
return bd
// pop pops an item from the queue. It blocks if the queue is empty.
func (bq *blockQueue) pop(ctx context.Context) (blockData *types.BlockData) {
select {
case <-ctx.Done():
return
case blockData = <-bq.queue:
}
bq.hashesSetMutex.Lock()
delete(bq.hashesSet, blockData.Hash)
bq.hashesSetMutex.Unlock()
return blockData
}

func (q *blockQueue) has(hash common.Hash) bool {
q.RLock()
defer q.RUnlock()
_, has := q.blocks[hash]
func (bq *blockQueue) has(blockHash common.Hash) (has bool) {
bq.hashesSetMutex.RLock()
defer bq.hashesSetMutex.RUnlock()
_, has = bq.hashesSet[blockHash]
return has
}
10 changes: 2 additions & 8 deletions dot/sync/chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,9 @@ func (s *chainProcessor) stop() {

func (s *chainProcessor) processReadyBlocks() {
for {
select {
case <-s.ctx.Done():
bd := s.readyBlocks.pop(s.ctx)
if s.ctx.Err() != nil {
return
default:
}

bd := s.readyBlocks.pop()
if bd == nil {
continue
}

if err := s.processBlockData(bd); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions dot/sync/chain_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package sync

import (
"context"
"errors"
"fmt"
"testing"
Expand Down Expand Up @@ -663,7 +664,7 @@ func TestChainSync_doSync(t *testing.T) {

workerErr = cs.doSync(req, make(map[peer.ID]struct{}))
require.Nil(t, workerErr)
bd := readyBlocks.pop()
bd := readyBlocks.pop(context.Background())
require.NotNil(t, bd)
require.Equal(t, resp.BlockData[0], bd)

Expand Down Expand Up @@ -699,11 +700,11 @@ func TestChainSync_doSync(t *testing.T) {
workerErr = cs.doSync(req, make(map[peer.ID]struct{}))
require.Nil(t, workerErr)

bd = readyBlocks.pop()
bd = readyBlocks.pop(context.Background())
require.NotNil(t, bd)
require.Equal(t, resp.BlockData[0], bd)

bd = readyBlocks.pop()
bd = readyBlocks.pop(context.Background())
require.NotNil(t, bd)
require.Equal(t, resp.BlockData[1], bd)
}
Expand Down Expand Up @@ -757,9 +758,10 @@ func TestHandleReadyBlock(t *testing.T) {
require.False(t, cs.pendingBlocks.hasBlock(header3.Hash()))
require.True(t, cs.pendingBlocks.hasBlock(header2NotDescendant.Hash()))

require.Equal(t, block1.ToBlockData(), readyBlocks.pop())
require.Equal(t, block2.ToBlockData(), readyBlocks.pop())
require.Equal(t, block3.ToBlockData(), readyBlocks.pop())
ctx := context.Background()
require.Equal(t, block1.ToBlockData(), readyBlocks.pop(ctx))
require.Equal(t, block2.ToBlockData(), readyBlocks.pop(ctx))
require.Equal(t, block3.ToBlockData(), readyBlocks.pop(ctx))
}

func TestChainSync_determineSyncPeers(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions dot/sync/tip_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package sync

import (
"context"
"testing"

"github.com/ChainSafe/gossamer/dot/network"
Expand Down Expand Up @@ -233,7 +234,8 @@ func TestTipSyncer_handleTick_case3(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []*worker(nil), w)
require.False(t, s.pendingBlocks.hasBlock(header.Hash()))
require.Equal(t, block.ToBlockData(), s.readyBlocks.pop())
readyBlockData := s.readyBlocks.pop(context.Background())
require.Equal(t, block.ToBlockData(), readyBlockData)

// add pending block w/ full block, but block is not ready as parent is unknown
bs := new(syncmocks.BlockState)
Expand Down Expand Up @@ -274,8 +276,9 @@ func TestTipSyncer_handleTick_case3(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []*worker(nil), w)
require.False(t, s.pendingBlocks.hasBlock(header.Hash()))
s.readyBlocks.pop() // first pop will remove parent
require.Equal(t, block.ToBlockData(), s.readyBlocks.pop())
_ = s.readyBlocks.pop(context.Background()) // first pop removes the parent
readyBlockData = s.readyBlocks.pop(context.Background())
require.Equal(t, block.ToBlockData(), readyBlockData)
}

func TestTipSyncer_hasCurrentWorker(t *testing.T) {
Expand Down

0 comments on commit f52f974

Please sign in to comment.