Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new metadata protocol #2062

Merged
merged 6 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type
name: "log-format" .}: logging.LogFormat

## General node config
clusterId* {.
desc: "Cluster id that the node is running in. Node in a different cluster id is disconnected."
defaultValue: 0
name: "cluster-id" }: uint32

agentString* {.
defaultValue: "nwaku",
desc: "Node agent string which is used as identifier in network"
Expand Down
11 changes: 6 additions & 5 deletions apps/wakunode2/internal_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ proc validateExtMultiAddrs*(vals: seq[string]):
return ok(multiaddrs)

proc dnsResolve*(domain: string, conf: WakuNodeConf): Future[Result[string, string]] {.async} =

# Use conf's DNS servers
var nameServers: seq[TransportAddress]
for ip in conf.dnsAddrsNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53

let dnsResolver = DnsResolver.new(nameServers)

# Resolve domain IP
Expand Down Expand Up @@ -93,18 +93,19 @@ proc networkConfiguration*(conf: WakuNodeConf,
if dns4DomainName.isSome() and extIp.isNone():
try:
let dnsRes = waitFor dnsResolve(conf.dns4DomainName, conf)

if dnsRes.isErr():
return err($dnsRes.error) # Pass error down the stack

extIp = some(ValidIpAddress.init(dnsRes.get()))
except CatchableError:
return err("Could not update extIp to resolved DNS IP: " & getCurrentExceptionMsg())

# Wrap in none because NetConfig does not have a default constructor
# TODO: We could change bindIp in NetConfig to be something less restrictive
# than ValidIpAddress, which doesn't allow default construction
let netConfigRes = NetConfig.init(
clusterId = conf.clusterId,
bindIp = conf.listenAddress,
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
extIp = extIp,
Expand Down
44 changes: 44 additions & 0 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
./testlib/testutils,
./testlib/wakucore,
Expand All @@ -38,6 +39,8 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))

let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
await sleepAsync(chronos.milliseconds(500))

check:
connOk == true
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId)
Expand All @@ -53,6 +56,8 @@ procSuite "Peer Manager":

# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
await sleepAsync(chronos.milliseconds(500))

# Check connection
check:
conn.isSome()
Expand Down Expand Up @@ -145,6 +150,7 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value
require:
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
await sleepAsync(chronos.milliseconds(500))

check:
# Cannot connect to node2
Expand All @@ -153,6 +159,8 @@ procSuite "Peer Manager":
# Successful connection
require:
(await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))

check:
# Currently connected to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
Expand Down Expand Up @@ -229,6 +237,8 @@ procSuite "Peer Manager":

require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))

check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
Expand Down Expand Up @@ -257,6 +267,36 @@ procSuite "Peer Manager":

await allFutures([node1.stop(), node2.stop(), node3.stop()])

asyncTest "Peer manager drops conections to peers on different networks":
let clusterId1 = 1.uint32
let clusterId2 = 2.uint32

let
# different network
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1)

# same network
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)

# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])

# 1->2 (fails)
let conn1 = await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)

# 1->3 (fails)
let conn2 = await node1.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)

# 2->3 (succeeds)
let conn3 = await node2.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)

check:
conn1.isNone
conn2.isNone
conn3.isSome


# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
Expand Down Expand Up @@ -370,6 +410,8 @@ procSuite "Peer Manager":
(await nodes[2].peerManager.connectRelay(peerInfos[0])) == true
(await nodes[3].peerManager.connectRelay(peerInfos[0])) == true

await sleepAsync(chronos.milliseconds(500))

check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3
Expand Down Expand Up @@ -749,13 +791,15 @@ procSuite "Peer Manager":
# 2 in connections
discard await nodes[1].peerManager.connectRelay(pInfos[0])
discard await nodes[2].peerManager.connectRelay(pInfos[0])
await sleepAsync(chronos.milliseconds(500))

# but one is pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# 2 out connections
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])
await sleepAsync(chronos.milliseconds(500))

# they are also prunned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1
Expand Down
50 changes: 50 additions & 0 deletions tests/test_waku_metadata.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{.used.}

import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_discv5,
../../waku/waku_metadata,
./testlib/wakucore,
./testlib/wakunode


procSuite "Waku Metadata Protocol":

# TODO: Add tests with shards when ready
asyncTest "request() returns the supported metadata of the peer":
let clusterId = 10.uint32
let
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId)

# Start nodes
await allFutures([node1.start(), node2.start()])

# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
require:
connOpt.isSome

# Request metadata
let response1 = await node2.wakuMetadata.request(connOpt.get())

# Check the response or dont even continue
require:
response1.isOk

check:
response1.get().clusterId.get() == clusterId
45 changes: 45 additions & 0 deletions tests/test_waku_protobufs.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{.used.}

import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles
import
../../waku/waku_metadata,
../../waku/waku_metadata/rpc,
./testlib/wakucore,
./testlib/wakunode


procSuite "Waku Protobufs":
# TODO: Missing test coverage in many encode/decode protobuf functions

test "WakuMetadataResponse":
let res = WakuMetadataResponse(
clusterId: some(7),
shards: @[10, 23, 33],
)

let buffer = res.encode()

let decodedBuff = WakuMetadataResponse.decode(buffer.buffer)
check:
decodedBuff.isOk()
decodedBuff.get().clusterId.get() == res.clusterId.get()
decodedBuff.get().shards == res.shards

test "WakuMetadataRequest":
let req = WakuMetadataRequest(
clusterId: some(5),
shards: @[100, 2, 0],
)

let buffer = req.encode()

let decodedBuff = WakuMetadataRequest.decode(buffer.buffer)
check:
decodedBuff.isOk()
decodedBuff.get().clusterId.get() == req.clusterId.get()
decodedBuff.get().shards == req.shards

10 changes: 6 additions & 4 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,21 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 0.uint32,
peerStoreCapacity = none(int)): WakuNode =

var resolvedExtIp = extIp

# Update extPort to default value if it's missing and there's an extIp or a DNS domain
# Update extPort to default value if it's missing and there's an extIp or a DNS domain
let extPort = if (extIp.isSome() or dns4DomainName.isSome()) and
extPort.isNone():
some(Port(60000))
else:
extPort

if dns4DomainName.isSome() and extIp.isNone():
let conf = defaultTestWakuNodeConf()
# If there's an error resolving the IP, an exception is thrown and test fails
# If there's an error resolving the IP, an exception is thrown and test fails
let dnsRes = waitFor dnsResolve(dns4DomainName.get(), conf)
if dnsRes.isErr():
raise newException(Defect, $dnsRes.error)
Expand All @@ -76,6 +77,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,

let netConfigRes = NetConfig.init(
bindIp = bindIp,
clusterId = clusterId,
bindPort = bindPort,
extIp = resolvedExtIp,
extPort = extPort,
Expand Down
3 changes: 3 additions & 0 deletions waku/node/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import

type NetConfig* = object
hostAddress*: MultiAddress
clusterId*: uint32
wsHostAddress*: Option[MultiAddress]
hostExtAddress*: Option[MultiAddress]
wsExtAddress*: Option[MultiAddress]
Expand Down Expand Up @@ -69,6 +70,7 @@ proc init*(T: type NetConfig,
wssEnabled: bool = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
clusterId: uint32 = 0,
wakuFlags = none(CapabilitiesBitfield)): NetConfigResult =
## Initialize and validate waku node network configuration

Expand Down Expand Up @@ -137,6 +139,7 @@ proc init*(T: type NetConfig,

ok(NetConfig(
hostAddress: hostAddress,
clusterId: clusterId,
wsHostAddress: wsHostAddress,
hostExtAddress: hostExtAddress,
wsExtAddress: wsExtAddress,
Expand Down
Loading
Loading