Skip to content

Commit

Permalink
feat(lib/trie): opportunistic parallel hashing (#2081)
Browse files Browse the repository at this point in the history
- Uses up to N goroutines where N is the number of CPU cores available
- Defaults to sequential encoding of branches if no more worker goroutines are available
- Only encode in parallel branches, encode leaves sequentially
  • Loading branch information
qdm12 committed Jan 11, 2022
1 parent cc3599a commit 790dfb5
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 39 deletions.
97 changes: 65 additions & 32 deletions internal/trie/node/branch_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash"
"io"
"runtime"

"github.com/ChainSafe/gossamer/internal/trie/codec"
"github.com/ChainSafe/gossamer/internal/trie/pools"
Expand Down Expand Up @@ -113,43 +114,72 @@ func (b *Branch) Encode(buffer Buffer) (err error) {
}
}

const parallel = false // TODO Done in pull request #2081
if parallel {
err = encodeChildrenInParallel(b.Children, buffer)
} else {
err = encodeChildrenSequentially(b.Children, buffer)
}
err = encodeChildrenOpportunisticParallel(b.Children, buffer)
if err != nil {
return fmt.Errorf("cannot encode children of branch: %w", err)
}

return nil
}

func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) {
type result struct {
index int
buffer *bytes.Buffer
err error
type encodingAsyncResult struct {
index int
buffer *bytes.Buffer
err error
}

func runEncodeChild(child Node, index int,
results chan<- encodingAsyncResult, rateLimit <-chan struct{}) {
buffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
buffer.Reset()
// buffer is put back in the pool after processing its
// data in the select block below.

err := encodeChild(child, buffer)

results <- encodingAsyncResult{
index: index,
buffer: buffer,
err: err,
}
if rateLimit != nil {
// Only run if runEncodeChild is launched
// in its own goroutine.
<-rateLimit
}
}

var parallelLimit = runtime.NumCPU()

var parallelEncodingRateLimit = make(chan struct{}, parallelLimit)

resultsCh := make(chan result)
// encodeChildrenOpportunisticParallel encodes children in parallel eventually.
// Leaves are encoded in a blocking way, and branches are encoded in separate
// goroutines IF they are less than the parallelLimit number of goroutines already
// running. This is designed to limit the total number of goroutines in order to
// avoid using too much memory on the stack.
func encodeChildrenOpportunisticParallel(children [16]Node, buffer io.Writer) (err error) {
// Buffered channels since children might be encoded in this
// goroutine or another one.
resultsCh := make(chan encodingAsyncResult, len(children))

for i, child := range children {
go func(index int, child Node) {
buffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
buffer.Reset()
// buffer is put back in the pool after processing its
// data in the select block below.

err := encodeChild(child, buffer)

resultsCh <- result{
index: index,
buffer: buffer,
err: err,
}
}(i, child)
if isNodeNil(child) || child.Type() == LeafType {
runEncodeChild(child, i, resultsCh, nil)
continue
}

// Branch child
select {
case parallelEncodingRateLimit <- struct{}{}:
// We have a goroutine available to encode
// the branch in parallel.
go runEncodeChild(child, i, resultsCh, parallelEncodingRateLimit)
default:
// we reached the maximum parallel goroutines
// so encode this branch in this goroutine
runEncodeChild(child, i, resultsCh, nil)
}
}

currentIndex := 0
Expand All @@ -166,7 +196,7 @@ func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) {
for currentIndex < len(children) &&
resultBuffers[currentIndex] != nil {
bufferSlice := resultBuffers[currentIndex].Bytes()
if len(bufferSlice) > 0 {
if err == nil && len(bufferSlice) > 0 {
// note buffer.Write copies the byte slice given as argument
_, writeErr := buffer.Write(bufferSlice)
if writeErr != nil && err == nil {
Expand Down Expand Up @@ -203,17 +233,20 @@ func encodeChildrenSequentially(children [16]Node, buffer io.Writer) (err error)
return nil
}

func encodeChild(child Node, buffer io.Writer) (err error) {
var isNil bool
switch impl := child.(type) {
func isNodeNil(n Node) (isNil bool) {
switch impl := n.(type) {
case *Branch:
isNil = impl == nil
case *Leaf:
isNil = impl == nil
default:
isNil = child == nil
isNil = n == nil
}
if isNil {
return isNil
}

func encodeChild(child Node, buffer io.Writer) (err error) {
if isNodeNil(child) {
return nil
}

Expand Down
97 changes: 90 additions & 7 deletions internal/trie/node/branch_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package node

import (
"bytes"
"io"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -251,7 +253,7 @@ func Test_Branch_Encode(t *testing.T) {
wrappedErr: errTest,
errMessage: "cannot write encoded value to buffer: test error",
},
"buffer write error for children encoded sequentially": {
"buffer write error for children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -280,10 +282,10 @@ func Test_Branch_Encode(t *testing.T) {
},
wrappedErr: errTest,
errMessage: "cannot encode children of branch: " +
"cannot encode child at index 3: " +
"failed to write child to buffer: test error",
"cannot write encoding of child at index 3: " +
"test error",
},
"success with sequential children encoding": {
"success with children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -346,7 +348,46 @@ func Test_Branch_Encode(t *testing.T) {
}
}

func Test_encodeChildrenInParallel(t *testing.T) {
// Opportunistic parallel: 13781602 ns/op 14419488 B/op 323575 allocs/op
// Sequentially: 24269268 ns/op 20126525 B/op 327668 allocs/op
func Benchmark_encodeChildrenOpportunisticParallel(b *testing.B) {
const valueBytesSize = 10
const depth = 3 // do not raise above 4

children := populateChildren(valueBytesSize, depth)

b.Run("", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = encodeChildrenOpportunisticParallel(children, io.Discard)
}
})
}

func populateChildren(valueSize, depth int) (children [16]Node) {
someValue := make([]byte, valueSize)

if depth == 0 {
for i := range children {
children[i] = &Leaf{
Key: someValue,
Value: someValue,
}
}
return children
}

for i := range children {
children[i] = &Branch{
Key: someValue,
Value: someValue,
Children: populateChildren(valueSize, depth-1),
}
}

return children
}

func Test_encodeChildrenOpportunisticParallel(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
Expand Down Expand Up @@ -393,7 +434,7 @@ func Test_encodeChildrenInParallel(t *testing.T) {
},
},
},
"encoding error": {
"leaf encoding error": {
children: [16]Node{
nil, nil, nil, nil,
nil, nil, nil, nil,
Expand All @@ -413,6 +454,23 @@ func Test_encodeChildrenInParallel(t *testing.T) {
errMessage: "cannot write encoding of child at index 11: " +
"test error",
},
"branch encoding": {
// Note this may run in parallel or not depending on other tests
// running in parallel.
children: [16]Node{
&Branch{
Key: []byte{1},
Children: [16]Node{
&Leaf{Key: []byte{1}},
},
},
},
writes: []writeCall{
{
written: []byte{32, 129, 1, 1, 0, 12, 65, 1, 0},
},
},
},
}

for name, testCase := range testCases {
Expand All @@ -434,7 +492,7 @@ func Test_encodeChildrenInParallel(t *testing.T) {
previousCall = call
}

err := encodeChildrenInParallel(testCase.children, buffer)
err := encodeChildrenOpportunisticParallel(testCase.children, buffer)

if testCase.wrappedErr != nil {
assert.ErrorIs(t, err, testCase.wrappedErr)
Expand All @@ -444,6 +502,31 @@ func Test_encodeChildrenInParallel(t *testing.T) {
}
})
}

t.Run("opportunist parallel branch encoding", func(t *testing.T) {
t.Parallel()

var children [16]Node
for i := range children {
children[i] = &Branch{}
}

buffer := bytes.NewBuffer(nil)

err := encodeChildrenOpportunisticParallel(children, buffer)

require.NoError(t, err)
expectedBytes := []byte{
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0}
assert.Equal(t, expectedBytes, buffer.Bytes())
})
}

func Test_encodeChildrenSequentially(t *testing.T) {
Expand Down

0 comments on commit 790dfb5

Please sign in to comment.