Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): remove maxReads limitation to read stream #3287

Merged
merged 5 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dot/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ var (
errInvalidStartingBlockType = errors.New("invalid StartingBlock in messsage")
errInboundHanshakeExists = errors.New("an inbound handshake already exists for given peer")
errInvalidRole = errors.New("invalid role")
ErrFailedToReadEntireMessage = errors.New("failed to read entire message")
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
ErrNilStream = errors.New("nil stream")
ErrInvalidLEB128EncodedData = errors.New("invalid LEB128 encoded data")
ErrGreaterThanMaxSize = errors.New("greater than maximum size")
)
249 changes: 249 additions & 0 deletions dot/network/mock_stream_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dot/network/mocks_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ package network
//go:generate mockgen -destination=mock_syncer_test.go -package $GOPACKAGE . Syncer
//go:generate mockgen -destination=mock_block_state_test.go -package $GOPACKAGE . BlockState
//go:generate mockgen -destination=mock_transaction_handler_test.go -package $GOPACKAGE . TransactionHandler
//go:generate mockgen -destination=mock_stream_test.go -package $GOPACKAGE github.com/libp2p/go-libp2p/core/network Stream
3 changes: 1 addition & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ const (
)

var (
logger = log.NewFromGlobal(log.AddContext("pkg", "network"))
maxReads = 256
logger = log.NewFromGlobal(log.AddContext("pkg", "network"))

peerCountGauge = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "gossamer_network_node",
Expand Down
42 changes: 16 additions & 26 deletions dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package network
import (
crand "crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
mrand "math/rand"
Expand Down Expand Up @@ -150,34 +149,32 @@ func uint64ToLEB128(in uint64) []byte {
return out
}

func readLEB128ToUint64(r io.Reader, buf []byte) (uint64, int, error) {
if len(buf) == 0 {
return 0, 0, errors.New("buffer has length 0")
}

func readLEB128ToUint64(r io.Reader) (uint64, int, error) {
var out uint64
var shift uint

maxSize := 10 // Max bytes in LEB128 encoding of uint64 is 10.
bytesRead := 0

for {
n, err := r.Read(buf[:1])
// read a sinlge byte
singleByte := []byte{0}
n, err := r.Read(singleByte)
if err != nil {
return 0, bytesRead, err
}

bytesRead += n

b := buf[0]
b := singleByte[0]
out |= uint64(0x7F&b) << shift
if b&0x80 == 0 {
break
}

maxSize--
if maxSize == 0 {
return 0, bytesRead, fmt.Errorf("invalid LEB128 encoded data")
return 0, bytesRead, ErrInvalidLEB128EncodedData
}

shift += 7
Expand All @@ -186,17 +183,12 @@ func readLEB128ToUint64(r io.Reader, buf []byte) (uint64, int, error) {
}

// readStream reads from the stream into the given buffer, returning the number of bytes read
func readStream(stream libp2pnetwork.Stream, bufPointer *[]byte, maxSize uint64) (int, error) {
func readStream(stream libp2pnetwork.Stream, bufPointer *[]byte, maxSize uint64) (tot int, err error) {
if stream == nil {
return 0, errors.New("stream is nil")
return 0, ErrNilStream
}

var (
tot int
)

buf := *bufPointer
length, bytesRead, err := readLEB128ToUint64(stream, buf[:1])
length, bytesRead, err := readLEB128ToUint64(stream)
if err != nil {
return bytesRead, fmt.Errorf("failed to read length: %w", err)
}
Expand All @@ -205,32 +197,30 @@ func readStream(stream libp2pnetwork.Stream, bufPointer *[]byte, maxSize uint64)
return 0, nil // msg length of 0 is allowed, for example transactions handshake
}

buf := *bufPointer
if length > uint64(len(buf)) {
extraBytes := int(length) - len(buf)
*bufPointer = append(buf, make([]byte, extraBytes)...) // TODO #2288 use bytes.Buffer instead
logger.Warnf("received message with size %d greater than allocated message buffer size %d", length, len(buf))
extraBytes := int(length) - len(buf)
*bufPointer = append(buf, make([]byte, extraBytes)...)
buf = *bufPointer
}

if length > maxSize {
logger.Warnf("received message with size %d greater than max size %d, closing stream", length, maxSize)
return 0, fmt.Errorf("message size greater than maximum: got %d", length)
return 0, fmt.Errorf("%w: max %d, got %d", ErrGreaterThanMaxSize, maxSize, length)
}

tot = 0
for i := 0; i < maxReads; i++ {
for tot < int(length) {
n, err := stream.Read(buf[tot:])
if err != nil {
return n + tot, err
}

tot += n
if tot == int(length) {
break
}
}

if tot != int(length) {
return tot, fmt.Errorf("failed to read entire message: expected %d bytes, received %d bytes", length, tot)
return tot, fmt.Errorf("%w: expected %d bytes, received %d bytes", ErrFailedToReadEntireMessage, length, tot)
}

return tot, nil
Expand Down
Loading