Skip to content

Commit

Permalink
fix(lib/babe): Unrestricted Loop When Building Blocks (GSR-19) (#2632)
Browse files Browse the repository at this point in the history
Co-authored-by: Quentin McGaw <quentin.mcgaw@gmail.com>
  • Loading branch information
edwardmack and qdm12 committed Oct 6, 2022
1 parent 736cbc8 commit 139ad89
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 15 deletions.
7 changes: 7 additions & 0 deletions dot/state/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package state

import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/telemetry"

Expand Down Expand Up @@ -47,6 +48,12 @@ func (s *TransactionState) Pop() *transaction.ValidTransaction {
return s.queue.Pop()
}

// PopWithTimer returns the next valid transaction from the queue.
// When the timer expires, it returns `nil`.
func (s *TransactionState) PopWithTimer(timerCh <-chan time.Time) (transaction *transaction.ValidTransaction) {
return s.queue.PopWithTimer(timerCh)
}

// Peek returns the head of the queue without removing it
func (s *TransactionState) Peek() *transaction.ValidTransaction {
return s.queue.Peek()
Expand Down
19 changes: 9 additions & 10 deletions lib/babe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,15 @@ func (b *BlockBuilder) buildBlockSeal(header *types.Header) (*types.SealDigest,
func (b *BlockBuilder) buildBlockExtrinsics(slot Slot, rt runtime.Instance) []*transaction.ValidTransaction {
var included []*transaction.ValidTransaction

for !hasSlotEnded(slot) {
txn := b.transactionState.Pop()
// Transaction queue is empty.
if txn == nil {
continue
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
timeout := time.Until(slotEnd)
slotTimer := time.NewTimer(timeout)

for {
txn := b.transactionState.PopWithTimer(slotTimer.C)
slotTimerExpired := txn == nil
if slotTimerExpired {
break
}

extrinsic := txn.Extrinsic
Expand Down Expand Up @@ -287,11 +291,6 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) {
}
}

func hasSlotEnded(slot Slot) bool {
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
return time.Since(slotEnd) >= 0
}

func extrinsicsToBody(inherents [][]byte, txs []*transaction.ValidTransaction) (types.Body, error) {
extrinsics := types.BytesArrayToExtrinsics(inherents)

Expand Down
14 changes: 14 additions & 0 deletions lib/babe/mock_state_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/babe/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type TransactionState interface {
Push(vt *transaction.ValidTransaction) (common.Hash, error)
Pop() *transaction.ValidTransaction
Peek() *transaction.ValidTransaction
PopWithTimer(timerCh <-chan time.Time) (tx *transaction.ValidTransaction)
}

// EpochState is the interface for epoch methods
Expand Down
46 changes: 41 additions & 5 deletions lib/transaction/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"container/heap"
"errors"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -77,17 +78,18 @@ func (pq *priorityQueue) Pop() interface{} {

// PriorityQueue is a thread safe wrapper over `priorityQueue`
type PriorityQueue struct {
pq priorityQueue
currOrder uint64
txs map[common.Hash]*Item
pq priorityQueue
currOrder uint64
txs map[common.Hash]*Item
pollInterval time.Duration
sync.Mutex
}

// NewPriorityQueue creates new instance of PriorityQueue
func NewPriorityQueue() *PriorityQueue {
spq := &PriorityQueue{
pq: make(priorityQueue, 0),
txs: make(map[common.Hash]*Item),
txs: make(map[common.Hash]*Item),
pollInterval: 10 * time.Millisecond,
}

heap.Init(&spq.pq)
Expand Down Expand Up @@ -139,6 +141,40 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) {
return hash, nil
}

// PopWithTimer returns the next valid transaction from the queue.
// When the timer expires, it returns `nil`.
func (spq *PriorityQueue) PopWithTimer(timerCh <-chan time.Time) (transaction *ValidTransaction) {
transaction = spq.Pop()
if transaction != nil {
return transaction
}

transactionChannel := make(chan *ValidTransaction)
go func() {
pollTicker := time.NewTicker(spq.pollInterval)
defer pollTicker.Stop()

for {
select {
case <-timerCh:
transactionChannel <- nil
return
case <-pollTicker.C:
}

transaction := spq.Pop()
if transaction == nil {
continue
}

transactionChannel <- transaction
return
}
}()

return <-transactionChannel
}

// Pop removes the transaction with has the highest priority value from the queue and returns it.
// If there are multiple transaction with same priority value then it return them in FIFO order.
func (spq *PriorityQueue) Pop() *ValidTransaction {
Expand Down
57 changes: 57 additions & 0 deletions lib/transaction/priority_queue_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

//go:build integration

package transaction

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_PopWithTimer(t *testing.T) {
pq := NewPriorityQueue()
slotTimer := time.NewTimer(time.Second)

tests := []*ValidTransaction{
{
Extrinsic: []byte("a"),
Validity: &Validity{Priority: 1},
},
{
Extrinsic: []byte("b"),
Validity: &Validity{Priority: 4},
},
{
Extrinsic: []byte("c"),
Validity: &Validity{Priority: 2},
},
{
Extrinsic: []byte("d"),
Validity: &Validity{Priority: 17},
},
{
Extrinsic: []byte("e"),
Validity: &Validity{Priority: 2},
},
}

expected := []int{3, 1, 2, 4, 0}

for _, test := range tests {
pq.Push(test)
}

counter := 0
for {
txn := pq.PopWithTimer(slotTimer.C)
if txn == nil {
break
}
assert.Equal(t, tests[expected[counter]], txn)
counter++
}
}
75 changes: 75 additions & 0 deletions lib/transaction/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPriorityQueue(t *testing.T) {
Expand Down Expand Up @@ -270,3 +272,76 @@ func TestRemoveExtrinsic(t *testing.T) {
t.Fatalf("Fail: got %v expected %v", res, tests[1])
}
}

func Test_PriorityQueue_PopWithTimer(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
queueBuilder func() *PriorityQueue
queueModifier func(queue *PriorityQueue, done chan<- struct{})
timer *time.Timer
transaction *ValidTransaction
}{
"empty queue polled once": {
// test should last 1ns
queueBuilder: NewPriorityQueue,
timer: time.NewTimer(time.Nanosecond),
},
"empty queue polled multiple times": {
// test should last 1ms
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.pollInterval = time.Nanosecond
return queue
},
timer: time.NewTimer(time.Millisecond),
},
"queue with one element polled once": {
// test should be instantaneous
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
return queue
},
timer: time.NewTimer(time.Nanosecond),
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
},
"queue polled multiple times until new element": {
// test should last 1ms
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.pollInterval = time.Nanosecond
return queue
},
queueModifier: func(queue *PriorityQueue, done chan<- struct{}) {
close(done)
time.Sleep(time.Millisecond)
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
},
timer: time.NewTimer(time.Second),
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
},
}

for name, testCase := range testCases {
testCase := testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

queue := testCase.queueBuilder()

modifyDone := make(chan struct{})
if testCase.queueModifier != nil {
// modify queue asynchronously while popping
go testCase.queueModifier(queue, modifyDone)
} else {
close(modifyDone)
}

transaction := queue.PopWithTimer(testCase.timer.C)
<-modifyDone
testCase.timer.Stop()
assert.Equal(t, testCase.transaction, transaction)
})
}
}

0 comments on commit 139ad89

Please sign in to comment.