diff --git a/dot/network/block_announce.go b/dot/network/block_announce.go index 64338154cb..8c5771b67e 100644 --- a/dot/network/block_announce.go +++ b/dot/network/block_announce.go @@ -69,11 +69,14 @@ func (bm *BlockAnnounceMessage) Decode(in []byte) error { } // Hash returns the hash of the BlockAnnounceMessage -func (bm *BlockAnnounceMessage) Hash() common.Hash { +func (bm *BlockAnnounceMessage) Hash() (common.Hash, error) { // scale encode each extrinsic - encMsg, _ := bm.Encode() - hash, _ := common.Blake2bHash(encMsg) - return hash + encMsg, err := bm.Encode() + if err != nil { + return common.Hash{}, fmt.Errorf("cannot encode message: %w", err) + } + + return common.Blake2bHash(encMsg) } // IsHandshake returns false @@ -144,9 +147,15 @@ func (*BlockAnnounceHandshake) Type() byte { return 0 } -// Hash ... -func (*BlockAnnounceHandshake) Hash() common.Hash { - return common.Hash{} +// Hash returns blake2b hash of block announce handshake. +func (hs *BlockAnnounceHandshake) Hash() (common.Hash, error) { + // scale encode each extrinsic + encMsg, err := hs.Encode() + if err != nil { + return common.Hash{}, fmt.Errorf("cannot encode handshake: %w", err) + } + + return common.Blake2bHash(encMsg) } // IsHandshake returns true @@ -174,7 +183,7 @@ func (s *Service) validateBlockAnnounceHandshake(from peer.ID, hs Handshake) err return errors.New("invalid handshake type") } - if bhs.GenesisHash != s.blockState.GenesisHash() { + if !bhs.GenesisHash.Equal(s.blockState.GenesisHash()) { s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ Value: peerset.GenesisMismatch, Reason: peerset.GenesisMismatchReason, diff --git a/dot/network/gossip.go b/dot/network/gossip.go index 5dca71cd25..d91f1c017e 100644 --- a/dot/network/gossip.go +++ b/dot/network/gossip.go @@ -4,6 +4,7 @@ package network import ( + "fmt" "sync" "github.com/ChainSafe/gossamer/internal/log" @@ -25,19 +26,25 @@ func newGossip() *gossip { } } -// hasSeen broadcasts messages that have not been seen -func (g *gossip) hasSeen(msg NotificationsMessage) bool { - // check if message has not been seen - msgHash := msg.Hash() +// hasSeen checks if we have seen the given message before. +func (g *gossip) hasSeen(msg NotificationsMessage) (bool, error) { + msgHash, err := msg.Hash() + if err != nil { + return false, fmt.Errorf("could not hash notification message: %w", err) + } + g.seenMutex.Lock() defer g.seenMutex.Unlock() + // check if message has not been seen _, ok := g.seenMap[msgHash] if !ok { // set message to has been seen - g.seenMap[msgHash] = struct{}{} - return false + if !msg.IsHandshake() { + g.seenMap[msgHash] = struct{}{} + } + return false, nil } - return true + return true, nil } diff --git a/dot/network/gossip_test.go b/dot/network/gossip_test.go index 7bad703f9d..6c80f6905b 100644 --- a/dot/network/gossip_test.go +++ b/dot/network/gossip_test.go @@ -89,12 +89,15 @@ func TestGossip(t *testing.T) { time.Sleep(TestMessageTimeout) - _, ok := nodeB.gossip.seenMap[announceMessage.Hash()] + hash, err := announceMessage.Hash() + require.NoError(t, err) + + _, ok := nodeB.gossip.seenMap[hash] require.True(t, ok, "node B did not receive block request message from node A") - _, ok = nodeC.gossip.seenMap[announceMessage.Hash()] + _, ok = nodeC.gossip.seenMap[hash] require.True(t, ok, "node C did not receive block request message from node B") - _, ok = nodeA.gossip.seenMap[announceMessage.Hash()] + _, ok = nodeA.gossip.seenMap[hash] require.True(t, ok, "node A did not receive block request message from node C") } diff --git a/dot/network/message.go b/dot/network/message.go index 798ed3157a..8c26974138 100644 --- a/dot/network/message.go +++ b/dot/network/message.go @@ -36,7 +36,7 @@ type Message interface { type NotificationsMessage interface { Message Type() byte - Hash() common.Hash + Hash() (common.Hash, error) IsHandshake() bool } @@ -389,11 +389,13 @@ func (cm *ConsensusMessage) Decode(in []byte) error { } // Hash returns the Hash of ConsensusMessage -func (cm *ConsensusMessage) Hash() common.Hash { +func (cm *ConsensusMessage) Hash() (common.Hash, error) { // scale encode each extrinsic - encMsg, _ := cm.Encode() - hash, _ := common.Blake2bHash(encMsg) - return hash + encMsg, err := cm.Encode() + if err != nil { + return common.Hash{}, fmt.Errorf("cannot encode message: %w", err) + } + return common.Blake2bHash(encMsg) } // IsHandshake returns false diff --git a/dot/network/message_cache.go b/dot/network/message_cache.go index c3a1da3692..a2182db490 100644 --- a/dot/network/message_cache.go +++ b/dot/network/message_cache.go @@ -5,6 +5,7 @@ package network import ( "errors" + "fmt" "time" "github.com/ChainSafe/gossamer/lib/common" @@ -55,6 +56,7 @@ func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error) func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool { key, err := generateCacheKey(peer, msg) if err != nil { + logger.Errorf("could not generate cache key: %s", err) return false } @@ -67,7 +69,12 @@ func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) { return nil, errors.New("cache does not support handshake messages") } - peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...)) + msgHash, err := msg.Hash() + if err != nil { + return nil, fmt.Errorf("cannot hash notification message: %w", err) + } + + peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msgHash.ToBytes()...)) if err != nil { return nil, err } diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 266ecc6018..758c97b99d 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -136,6 +136,20 @@ func (s *Service) createNotificationsMessageHandler( return fmt.Errorf("%w: expected %T but got %T", errMessageTypeNotValid, (NotificationsMessage)(nil), msg) } + hasSeen, err := s.gossip.hasSeen(msg) + if err != nil { + return fmt.Errorf("could not check if message was seen before: %w", err) + } + + if hasSeen { + // report peer if we get duplicate gossip message. + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.DuplicateGossipValue, + Reason: peerset.DuplicateGossipReason, + }, peer) + return nil + } + if msg.IsHandshake() { logger.Tracef("received handshake on notifications sub-protocol %s from peer %s, message is: %s", info.protocolID, stream.Conn().RemotePeer(), msg) @@ -207,16 +221,7 @@ func (s *Service) createNotificationsMessageHandler( return nil } - if !s.gossip.hasSeen(msg) { - s.broadcastExcluding(info, peer, msg) - return nil - } - - // report peer if we get duplicate gossip message. - s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ - Value: peerset.DuplicateGossipValue, - Reason: peerset.DuplicateGossipReason, - }, peer) + s.broadcastExcluding(info, peer, msg) return nil } } @@ -238,7 +243,13 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } - if support, err := s.host.supportsProtocol(peer, info.protocolID); err != nil || !support { + support, err := s.host.supportsProtocol(peer, info.protocolID) + if err != nil { + logger.Errorf("could not check if protocol %s is supported by peer %s: %s", info.protocolID, peer, err) + return + } + + if !support { s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ Value: peerset.BadProtocolValue, Reason: peerset.BadProtocolReason, @@ -319,7 +330,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP peer, info.protocolID, hs) stream, err := s.host.send(peer, info.protocolID, hs) if err != nil { - logger.Tracef("failed to send message to peer %s: %s", peer, err) + logger.Tracef("failed to send handshake to peer %s: %s", peer, err) // don't need to close the stream here, as it's nil! return nil, err } @@ -345,7 +356,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP } if hsResponse.err != nil { - logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err) + logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err) closeOutboundStream(info, peer, stream) return nil, hsResponse.err } diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index a62f121e4b..5670114262 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -189,7 +189,9 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T) Roles: 4, BestBlockNumber: 77, BestBlockHash: common.Hash{1}, - GenesisHash: common.Hash{2}, + // we are using a different genesis here, thus this + // handshake would be validated to be incorrect. + GenesisHash: common.Hash{2}, } err = handler(stream, testHandshake) @@ -367,28 +369,28 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { require.Len(t, txnBatch, 1) msg = &TransactionMessage{ - Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}}, } err = handler(stream, msg) require.NoError(t, err) require.Len(t, txnBatch, 2) msg = &TransactionMessage{ - Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}}, } err = handler(stream, msg) require.NoError(t, err) require.Len(t, txnBatch, 3) msg = &TransactionMessage{ - Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}}, } err = handler(stream, msg) require.NoError(t, err) require.Len(t, txnBatch, 4) msg = &TransactionMessage{ - Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}}, } err = handler(stream, msg) require.NoError(t, err) @@ -396,14 +398,14 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { // reached batch size limit, below transaction will not be included in batch. msg = &TransactionMessage{ - Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}}, } err = handler(stream, msg) require.NoError(t, err) require.Len(t, txnBatch, 5) msg = &TransactionMessage{ - Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}}, } // wait for transaction batch channel to process. time.Sleep(1300 * time.Millisecond) diff --git a/dot/network/transaction.go b/dot/network/transaction.go index b7a4f1adba..e8a41087d4 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -54,10 +54,12 @@ func (tm *TransactionMessage) Decode(in []byte) error { } // Hash returns the hash of the TransactionMessage -func (tm *TransactionMessage) Hash() common.Hash { - encMsg, _ := tm.Encode() - hash, _ := common.Blake2bHash(encMsg) - return hash +func (tm *TransactionMessage) Hash() (common.Hash, error) { + encMsg, err := tm.Encode() + if err != nil { + return common.Hash{}, fmt.Errorf("could not encode message: %w", err) + } + return common.Blake2bHash(encMsg) } // IsHandshake returns false @@ -93,8 +95,8 @@ func (*transactionHandshake) Type() byte { } // Hash ... -func (*transactionHandshake) Hash() common.Hash { - return common.Hash{} +func (*transactionHandshake) Hash() (common.Hash, error) { + return common.Hash{}, nil } // IsHandshake returns true @@ -129,6 +131,7 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur case txnMsg := <-txnBatchCh: propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) if err != nil { + logger.Warnf("could not handle transaction message: %s", err) s.host.closeProtocolStream(protocolID, txnMsg.peer) continue } @@ -137,7 +140,16 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur continue } - if !s.gossip.hasSeen(txnMsg.msg) { + // TODO: Check if s.gossip.hasSeen should be moved before handleTransactionMessage. #2445 + // That we could avoid handling the transactions again, which we would have already seen. + + hasSeen, err := s.gossip.hasSeen(txnMsg.msg) + if err != nil { + s.host.closeProtocolStream(protocolID, txnMsg.peer) + logger.Debugf("could not check if message was seen before: %s", err) + continue + } + if !hasSeen { s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg) } } diff --git a/dot/peerset/handler.go b/dot/peerset/handler.go index ab69592daa..5c90d2be1b 100644 --- a/dot/peerset/handler.go +++ b/dot/peerset/handler.go @@ -77,6 +77,10 @@ func (h *Handler) RemovePeer(setID int, peers ...peer.ID) { // ReportPeer reports ReputationChange according to the peer behaviour. func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) { + for _, pid := range peers { + logger.Debugf("reporting reputation change of %d to peer %s, reason: %s", rep.Value, pid, rep.Reason) + } + h.actionQueue <- action{ actionCall: reportPeer, reputation: rep, diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 49549243bf..5caaaf91c5 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -64,8 +64,8 @@ func (*GrandpaHandshake) Type() byte { } // Hash ... -func (*GrandpaHandshake) Hash() common.Hash { - return common.Hash{} +func (*GrandpaHandshake) Hash() (common.Hash, error) { + return common.Hash{}, nil } // IsHandshake returns true