From b4618f98ed88afb1db18f2f1eeb485b38a9ac3ef Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Mon, 29 Jul 2024 15:53:43 -0400 Subject: [PATCH] chore: add peer filtering by cluster for waku peer exchange (#2932) --- tests/waku_peer_exchange/test_protocol.nim | 35 ++++++++++++++++++++++ waku/factory/node_factory.nim | 2 +- waku/node/waku_node.nim | 6 ++-- waku/waku_peer_exchange/protocol.nim | 34 ++++++++++++++++----- 4 files changed, 66 insertions(+), 11 deletions(-) diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 571a1bd504..e80386d5e5 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -23,6 +23,8 @@ import waku_relay, waku_core, waku_core/message/codec, + common/enr/builder, + waku_enr/sharding, ], ../testlib/[wakucore, wakunode, simple_mock, assertions], ./utils.nim @@ -237,6 +239,39 @@ suite "Waku Peer Exchange": response.isErr response.error == "peer_not_found_failure" + asyncTest "Pool filtering": + let + key1 = generateSecp256k1Key() + key2 = generateSecp256k1Key() + cluster: Option[uint16] = some(uint16(16)) + bindIp = parseIpAddress("0.0.0.0") + nodeTcpPort = Port(64010) + nodeUdpPort = Port(9000) + + var + builder1 = EnrBuilder.init(key1) + builder2 = EnrBuilder.init(key2) + + builder1.withIpAddressAndPorts(some(bindIp), some(nodeTcpPort), some(nodeUdpPort)) + builder2.withIpAddressAndPorts(some(bindIp), some(nodeTcpPort), some(nodeUdpPort)) + builder1.withShardedTopics(@["/waku/2/rs/1/7"]).expect("valid topic") + builder2.withShardedTopics(@["/waku/2/rs/16/32"]).expect("valid topic") + + let + enr1 = builder1.build().expect("valid ENR") + enr2 = builder2.build().expect("valid ENR") + + var + peerInfo1 = enr1.toRemotePeerInfo().expect("valid PeerInfo") + peerInfo2 = enr2.toRemotePeerInfo().expect("valid PeerInfo") + + peerInfo1.origin = PeerOrigin.Discv5 + peerInfo2.origin = PeerOrigin.Discv5 + + check: + not poolFilter(cluster, peerInfo1) + poolFilter(cluster, peerInfo2) + asyncTest "Request 0 peers, with 1 peer in PeerExchange": # Given two valid nodes with PeerExchange let diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 80e59cd987..c13c1dd124 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -341,7 +341,7 @@ proc setupProtocols( # waku peer exchange setup if conf.peerExchangeNode != "" or conf.peerExchange: try: - await mountPeerExchange(node) + await mountPeerExchange(node, some(conf.clusterId)) except CatchableError: return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index b847838347..bf1ae74e20 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1101,10 +1101,12 @@ proc mountRlnRelay*( ## Waku peer-exchange -proc mountPeerExchange*(node: WakuNode) {.async: (raises: []).} = +proc mountPeerExchange*( + node: WakuNode, cluster: Option[uint16] = none(uint16) +) {.async: (raises: []).} = info "mounting waku peer exchange" - node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager) + node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, cluster) if node.started: try: diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 9c0f026045..689c8fddab 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -1,5 +1,5 @@ import - std/[options, sequtils, random], + std/[options, sequtils, random, sugar], results, chronicles, chronos, @@ -50,6 +50,7 @@ type WakuPeerExchange* = ref object of LPProtocol peerManager*: PeerManager enrCache*: seq[enr.Record] + cluster*: Option[uint16] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ proc request*( @@ -128,12 +129,25 @@ proc getEnrsFromCache( # return numPeers or less if cache is smaller return shuffledCache[0 ..< min(shuffledCache.len.int, numPeers.int)] +proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = + if peer.origin != Discv5: + trace "peer not from discv5", peer = $peer, origin = $peer.origin + return false + + if peer.enr.isNone(): + trace "peer has no ENR", peer = $peer + return false + + if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()): + trace "peer has mismatching cluster", peer = $peer + return false + + return true + proc populateEnrCache(wpx: WakuPeerExchange) = - # share only peers that i) are reachable ii) come from discv5 - let withEnr = wpx.peerManager.peerStore - .getReachablePeers() - .filterIt(it.origin == Discv5) - .filterIt(it.enr.isSome) + # share only peers that i) are reachable ii) come from discv5 iii) share cluster + let withEnr = + wpx.peerManager.peerStore.getReachablePeers().filterIt(poolFilter(wpx.cluster, it)) # either what we have or max cache size var newEnrCache = newSeq[enr.Record](0) @@ -181,8 +195,12 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = wpx.handler = handler wpx.codec = WakuPeerExchangeCodec -proc new*(T: type WakuPeerExchange, peerManager: PeerManager): T = - let wpx = WakuPeerExchange(peerManager: peerManager) +proc new*( + T: type WakuPeerExchange, + peerManager: PeerManager, + cluster: Option[uint16] = none(uint16), +): T = + let wpx = WakuPeerExchange(peerManager: peerManager, cluster: cluster) wpx.initProtocolHandler() asyncSpawn wpx.updatePxEnrCache() return wpx