Skip to content

Commit

Permalink
chore: add peer filtering by cluster for waku peer exchange (#2932)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Jul 29, 2024
1 parent e4e01fa commit b4618f9
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 11 deletions.
35 changes: 35 additions & 0 deletions tests/waku_peer_exchange/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import
waku_relay,
waku_core,
waku_core/message/codec,
common/enr/builder,
waku_enr/sharding,
],
../testlib/[wakucore, wakunode, simple_mock, assertions],
./utils.nim
Expand Down Expand Up @@ -237,6 +239,39 @@ suite "Waku Peer Exchange":
response.isErr
response.error == "peer_not_found_failure"

asyncTest "Pool filtering":
let
key1 = generateSecp256k1Key()
key2 = generateSecp256k1Key()
cluster: Option[uint16] = some(uint16(16))
bindIp = parseIpAddress("0.0.0.0")
nodeTcpPort = Port(64010)
nodeUdpPort = Port(9000)

var
builder1 = EnrBuilder.init(key1)
builder2 = EnrBuilder.init(key2)

builder1.withIpAddressAndPorts(some(bindIp), some(nodeTcpPort), some(nodeUdpPort))
builder2.withIpAddressAndPorts(some(bindIp), some(nodeTcpPort), some(nodeUdpPort))
builder1.withShardedTopics(@["/waku/2/rs/1/7"]).expect("valid topic")
builder2.withShardedTopics(@["/waku/2/rs/16/32"]).expect("valid topic")

let
enr1 = builder1.build().expect("valid ENR")
enr2 = builder2.build().expect("valid ENR")

var
peerInfo1 = enr1.toRemotePeerInfo().expect("valid PeerInfo")
peerInfo2 = enr2.toRemotePeerInfo().expect("valid PeerInfo")

peerInfo1.origin = PeerOrigin.Discv5
peerInfo2.origin = PeerOrigin.Discv5

check:
not poolFilter(cluster, peerInfo1)
poolFilter(cluster, peerInfo2)

asyncTest "Request 0 peers, with 1 peer in PeerExchange":
# Given two valid nodes with PeerExchange
let
Expand Down
2 changes: 1 addition & 1 deletion waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ proc setupProtocols(
# waku peer exchange setup
if conf.peerExchangeNode != "" or conf.peerExchange:
try:
await mountPeerExchange(node)
await mountPeerExchange(node, some(conf.clusterId))
except CatchableError:
return
err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
Expand Down
6 changes: 4 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1101,10 +1101,12 @@ proc mountRlnRelay*(

## Waku peer-exchange

proc mountPeerExchange*(node: WakuNode) {.async: (raises: []).} =
proc mountPeerExchange*(
node: WakuNode, cluster: Option[uint16] = none(uint16)
) {.async: (raises: []).} =
info "mounting waku peer exchange"

node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager)
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, cluster)

if node.started:
try:
Expand Down
34 changes: 26 additions & 8 deletions waku/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import
std/[options, sequtils, random],
std/[options, sequtils, random, sugar],
results,
chronicles,
chronos,
Expand Down Expand Up @@ -50,6 +50,7 @@ type
WakuPeerExchange* = ref object of LPProtocol
peerManager*: PeerManager
enrCache*: seq[enr.Record]
cluster*: Option[uint16]
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/

proc request*(
Expand Down Expand Up @@ -128,12 +129,25 @@ proc getEnrsFromCache(
# return numPeers or less if cache is smaller
return shuffledCache[0 ..< min(shuffledCache.len.int, numPeers.int)]

proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
if peer.origin != Discv5:
trace "peer not from discv5", peer = $peer, origin = $peer.origin
return false

if peer.enr.isNone():
trace "peer has no ENR", peer = $peer
return false

if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
trace "peer has mismatching cluster", peer = $peer
return false

return true

proc populateEnrCache(wpx: WakuPeerExchange) =
# share only peers that i) are reachable ii) come from discv5
let withEnr = wpx.peerManager.peerStore
.getReachablePeers()
.filterIt(it.origin == Discv5)
.filterIt(it.enr.isSome)
# share only peers that i) are reachable ii) come from discv5 iii) share cluster
let withEnr =
wpx.peerManager.peerStore.getReachablePeers().filterIt(poolFilter(wpx.cluster, it))

# either what we have or max cache size
var newEnrCache = newSeq[enr.Record](0)
Expand Down Expand Up @@ -181,8 +195,12 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec

proc new*(T: type WakuPeerExchange, peerManager: PeerManager): T =
let wpx = WakuPeerExchange(peerManager: peerManager)
proc new*(
T: type WakuPeerExchange,
peerManager: PeerManager,
cluster: Option[uint16] = none(uint16),
): T =
let wpx = WakuPeerExchange(peerManager: peerManager, cluster: cluster)
wpx.initProtocolHandler()
asyncSpawn wpx.updatePxEnrCache()
return wpx

0 comments on commit b4618f9

Please sign in to comment.