Skip to content

Commit

Permalink
fix(dot/network): change BlockRequestMessage number from uint64 to ui…
Browse files Browse the repository at this point in the history
…nt32
  • Loading branch information
noot committed Mar 11, 2022
1 parent 226b081 commit 8105cd4
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 150 deletions.
12 changes: 7 additions & 5 deletions dot/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
)

var (
errCannotValidateHandshake = errors.New("failed to validate handshake")
errMessageTypeNotValid = errors.New("message type is not valid")
errMessageIsNotHandshake = errors.New("failed to convert message to Handshake")
errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake")
errHandshakeTimeout = errors.New("handshake timeout reached")
errCannotValidateHandshake = errors.New("failed to validate handshake")
errMessageTypeNotValid = errors.New("message type is not valid")
errMessageIsNotHandshake = errors.New("failed to convert message to Handshake")
errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake")
errHandshakeTimeout = errors.New("handshake timeout reached")
errBlockRequestFromNumberInvalid = errors.New("block request message From number is not valid")
errInvalidStartingBlockType = errors.New("invalid StartingBlock in messsage")
)
2 changes: 1 addition & 1 deletion dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream,
}
}

var starting, _ = variadic.NewUint64OrHash(uint64(1))
var starting, _ = variadic.NewUint32OrHash(uint32(1))

var one = uint32(1)

Expand Down
24 changes: 12 additions & 12 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (sd SyncDirection) String() string {
// BlockRequestMessage is sent to request some blocks from a peer
type BlockRequestMessage struct {
RequestedData byte
StartingBlock variadic.Uint64OrHash // first byte 0 = block hash (32 byte), first byte 1 = block number (int64)
StartingBlock variadic.Uint32OrHash // first byte 0 = block hash (32 byte), first byte 1 = block number (uint32)
EndBlockHash *common.Hash
Direction SyncDirection // 0 = ascending, 1 = descending
Max *uint32
Expand Down Expand Up @@ -133,14 +133,14 @@ func (bm *BlockRequestMessage) Encode() ([]byte, error) {
msg.FromBlock = &pb.BlockRequest_Hash{
Hash: hash[:],
}
} else if bm.StartingBlock.IsUint64() {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, bm.StartingBlock.Uint64())
} else if bm.StartingBlock.IsUint32() {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, bm.StartingBlock.Uint32())
msg.FromBlock = &pb.BlockRequest_Number{
Number: buf,
}
} else {
return nil, errors.New("invalid StartingBlock in messsage")
return nil, errInvalidStartingBlockType
}

return proto.Marshal(msg)
Expand All @@ -155,21 +155,21 @@ func (bm *BlockRequestMessage) Decode(in []byte) error {
}

var (
startingBlock *variadic.Uint64OrHash
startingBlock *variadic.Uint32OrHash
endBlockHash *common.Hash
max *uint32
)

switch from := msg.FromBlock.(type) {
case *pb.BlockRequest_Hash:
startingBlock, err = variadic.NewUint64OrHash(common.BytesToHash(from.Hash))
startingBlock, err = variadic.NewUint32OrHash(common.BytesToHash(from.Hash))
case *pb.BlockRequest_Number:
// TODO: we are receiving block requests w/ 4-byte From field; this should probably be
// 4-bytes as it represents a block number which is uint32 (#1854)
if len(from.Number) != 8 {
return errors.New("invalid BlockResponseMessage.From; uint64 is not 8 bytes")
if len(from.Number) != 4 {
return fmt.Errorf("%w expected 4 bytes, got %d bytes", errBlockRequestFromNumberInvalid, len(from.Number))
}
startingBlock, err = variadic.NewUint64OrHash(binary.LittleEndian.Uint64(from.Number))

number := binary.LittleEndian.Uint32(from.Number)
startingBlock, err = variadic.NewUint32OrHash(number)
default:
err = errors.New("invalid StartingBlock")
}
Expand Down
10 changes: 5 additions & 5 deletions dot/network/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestEncodeBlockRequestMessage(t *testing.T) {
var one uint32 = 1
bm := &BlockRequestMessage{
RequestedData: 1,
StartingBlock: *variadic.NewUint64OrHashFromBytes(append([]byte{0}, genesisHash...)),
StartingBlock: *variadic.NewUint32OrHashFromBytes(append([]byte{0}, genesisHash...)),
EndBlockHash: &endBlock,
Direction: 1,
Max: &one,
Expand All @@ -51,7 +51,7 @@ func TestEncodeBlockRequestMessage_BlockHash(t *testing.T) {
var one uint32 = 1
bm := &BlockRequestMessage{
RequestedData: 1,
StartingBlock: *variadic.NewUint64OrHashFromBytes(append([]byte{0}, genesisHash...)),
StartingBlock: *variadic.NewUint32OrHashFromBytes(append([]byte{0}, genesisHash...)),
EndBlockHash: &endBlock,
Direction: 1,
Max: &one,
Expand All @@ -74,7 +74,7 @@ func TestEncodeBlockRequestMessage_BlockNumber(t *testing.T) {
var one uint32 = 1
bm := &BlockRequestMessage{
RequestedData: 1,
StartingBlock: *variadic.NewUint64OrHashFromBytes([]byte{1, 1}),
StartingBlock: *variadic.NewUint32OrHashFromBytes([]byte{1, 1}),
EndBlockHash: &endBlock,
Direction: 1,
Max: &one,
Expand All @@ -96,7 +96,7 @@ func TestBlockRequestString(t *testing.T) {

bm := &BlockRequestMessage{
RequestedData: 1,
StartingBlock: *variadic.NewUint64OrHashFromBytes(append([]byte{0}, genesisHash...)),
StartingBlock: *variadic.NewUint32OrHashFromBytes(append([]byte{0}, genesisHash...)),
EndBlockHash: nil,
Direction: 1,
Max: nil,
Expand All @@ -116,7 +116,7 @@ func TestEncodeBlockRequestMessage_NoOptionals(t *testing.T) {

bm := &BlockRequestMessage{
RequestedData: 1,
StartingBlock: *variadic.NewUint64OrHashFromBytes(append([]byte{0}, genesisHash...)),
StartingBlock: *variadic.NewUint32OrHashFromBytes(append([]byte{0}, genesisHash...)),
EndBlockHash: nil,
Direction: 1,
Max: nil,
Expand Down
4 changes: 3 additions & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) {
_, isConsensusMsg := msg.(*ConsensusMessage)

if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) && !isConsensusMsg {
logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg)
return
}
Expand Down
6 changes: 3 additions & 3 deletions dot/sync/chain_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestChainProcessor_HandleBlockResponse_ValidChain(t *testing.T) {

// syncer makes request for chain
startNum := 1
start, err := variadic.NewUint64OrHash(startNum)
start, err := variadic.NewUint32OrHash(startNum)
require.NoError(t, err)

req := &network.BlockRequestMessage{
Expand All @@ -57,7 +57,7 @@ func TestChainProcessor_HandleBlockResponse_ValidChain(t *testing.T) {

// syncer makes request for chain again (block 129+)
startNum = 129
start, err = variadic.NewUint64OrHash(startNum)
start, err = variadic.NewUint32OrHash(startNum)
require.NoError(t, err)

req = &network.BlockRequestMessage{
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestChainProcessor_HandleBlockResponse_MissingBlocks(t *testing.T) {
}

startNum := 15
start, err := variadic.NewUint64OrHash(startNum)
start, err := variadic.NewUint32OrHash(startNum)
require.NoError(t, err)

req := &network.BlockRequestMessage{
Expand Down
16 changes: 8 additions & 8 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,9 @@ func (cs *chainSync) handleReadyBlock(bd *types.BlockData) {

// determineSyncPeers returns a list of peers that likely have the blocks in the given block request.
func (cs *chainSync) determineSyncPeers(req *network.BlockRequestMessage, peersTried map[peer.ID]struct{}) []peer.ID {
var start uint64
if req.StartingBlock.IsUint64() {
start = req.StartingBlock.Uint64()
var start uint32
if req.StartingBlock.IsUint32() {
start = req.StartingBlock.Uint32()
}

cs.RLock()
Expand Down Expand Up @@ -796,7 +796,7 @@ func (cs *chainSync) determineSyncPeers(req *network.BlockRequestMessage, peersT

// if peer definitely doesn't have any blocks we want in the request,
// don't request from them
if start > 0 && uint64(state.number) < start {
if start > 0 && uint32(state.number) < start {
continue
}

Expand Down Expand Up @@ -1005,18 +1005,18 @@ func workerToRequests(w *worker) ([]*network.BlockRequestMessage, error) {
max = uint32(size)
}

var start *variadic.Uint64OrHash
var start *variadic.Uint32OrHash
if w.startHash.IsEmpty() {
// worker startHash is unspecified if we are in bootstrap mode
start = variadic.MustNewUint64OrHash(uint64(startNumber))
start = variadic.MustNewUint32OrHash(uint32(startNumber))
} else {
// in tip-syncing mode, we know the hash of the block on the fork we wish to sync
start = variadic.MustNewUint64OrHash(w.startHash)
start = variadic.MustNewUint32OrHash(w.startHash)

// if we're doing descending requests and not at the last (highest starting) request,
// then use number as start block
if w.direction == network.Descending && i != numRequests-1 {
start = variadic.MustNewUint64OrHash(startNumber)
start = variadic.MustNewUint32OrHash(startNumber)
}
}

Expand Down
30 changes: 15 additions & 15 deletions dot/sync/chain_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1),
StartingBlock: *variadic.MustNewUint32OrHash(1),
EndBlockHash: nil,
Direction: network.Ascending,
Max: &max128,
Expand All @@ -291,14 +291,14 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1),
StartingBlock: *variadic.MustNewUint32OrHash(1),
EndBlockHash: nil,
Direction: network.Ascending,
Max: &max128,
},
{
RequestedData: network.RequestedDataHeader + network.RequestedDataBody + network.RequestedDataJustification,
StartingBlock: *variadic.MustNewUint64OrHash(1 + maxResponseSize),
StartingBlock: *variadic.MustNewUint32OrHash(1 + maxResponseSize),
EndBlockHash: nil,
Direction: network.Ascending,
Max: &max128,
Expand All @@ -315,7 +315,7 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1),
StartingBlock: *variadic.MustNewUint32OrHash(1),
EndBlockHash: nil,
Direction: network.Ascending,
Max: &max128,
Expand All @@ -332,7 +332,7 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(10),
StartingBlock: *variadic.MustNewUint32OrHash(10),
EndBlockHash: nil,
Direction: network.Descending,
Max: &max9,
Expand All @@ -349,14 +349,14 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1),
StartingBlock: *variadic.MustNewUint32OrHash(1),
EndBlockHash: nil,
Direction: network.Ascending,
Max: &max128,
},
{
RequestedData: network.RequestedDataHeader + network.RequestedDataBody + network.RequestedDataJustification,
StartingBlock: *variadic.MustNewUint64OrHash(1 + maxResponseSize),
StartingBlock: *variadic.MustNewUint32OrHash(1 + maxResponseSize),
EndBlockHash: nil,
Direction: network.Ascending,
Max: &max128,
Expand All @@ -374,7 +374,7 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1),
StartingBlock: *variadic.MustNewUint32OrHash(1),
EndBlockHash: &(common.Hash{0xa}),
Direction: network.Ascending,
Max: &max128,
Expand All @@ -393,7 +393,7 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(common.Hash{0xb}),
StartingBlock: *variadic.MustNewUint32OrHash(common.Hash{0xb}),
EndBlockHash: &(common.Hash{0xc}),
Direction: network.Ascending,
Max: &max128,
Expand All @@ -410,7 +410,7 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(10),
StartingBlock: *variadic.MustNewUint32OrHash(10),
Direction: network.Ascending,
Max: &max128,
},
Expand All @@ -426,14 +426,14 @@ func TestWorkerToRequests(t *testing.T) {
expected: []*network.BlockRequestMessage{
{
RequestedData: network.RequestedDataHeader + network.RequestedDataBody + network.RequestedDataJustification,
StartingBlock: *variadic.MustNewUint64OrHash(1 + (maxResponseSize / 2)),
StartingBlock: *variadic.MustNewUint32OrHash(1 + (maxResponseSize / 2)),
EndBlockHash: nil,
Direction: network.Descending,
Max: &max64,
},
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1 + maxResponseSize + (maxResponseSize / 2)),
StartingBlock: *variadic.MustNewUint32OrHash(1 + maxResponseSize + (maxResponseSize / 2)),
EndBlockHash: nil,
Direction: network.Descending,
Max: &max128,
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestChainSync_doSync(t *testing.T) {
max := uint32(1)
req := &network.BlockRequestMessage{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1),
StartingBlock: *variadic.MustNewUint32OrHash(1),
EndBlockHash: nil,
Direction: network.Ascending,
Max: &max,
Expand Down Expand Up @@ -798,7 +798,7 @@ func TestChainSync_determineSyncPeers(t *testing.T) {
require.Equal(t, 0, len(cs.ignorePeers))

// test peer's best block below number case, shouldn't include that peer
start, err := variadic.NewUint64OrHash(130)
start, err := variadic.NewUint32OrHash(130)
require.NoError(t, err)
req.StartingBlock = *start
peers = cs.determineSyncPeers(req, peersTried)
Expand All @@ -807,7 +807,7 @@ func TestChainSync_determineSyncPeers(t *testing.T) {

// test peer tried case, should ignore peer already tried
peersTried[testPeerA] = struct{}{}
req.StartingBlock = variadic.Uint64OrHash{}
req.StartingBlock = variadic.Uint32OrHash{}
peers = cs.determineSyncPeers(req, peersTried)
require.Equal(t, 1, len(peers))
require.Equal(t, []peer.ID{testPeerB}, peers)
Expand Down
4 changes: 2 additions & 2 deletions dot/sync/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *Service) handleAscendingRequest(req *network.BlockRequestMessage) (*net
}

switch startBlock := req.StartingBlock.Value().(type) {
case uint64:
case uint32:
if startBlock == 0 {
startBlock = 1
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func (s *Service) handleDescendingRequest(req *network.BlockRequestMessage) (*ne
}

switch startBlock := req.StartingBlock.Value().(type) {
case uint64:
case uint32:
bestBlockNumber, err := s.blockState.BestBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to get best block %d for request: %w", bestBlockNumber, err)
Expand Down
Loading

0 comments on commit 8105cd4

Please sign in to comment.