Skip to content

Commit

Permalink
- updated light push for autosharding
Browse files Browse the repository at this point in the history
- updated FILTER protocol to autosharding
- added tests
  • Loading branch information
SionoiS committed Aug 1, 2023
1 parent afb93e2 commit a5d1cf4
Show file tree
Hide file tree
Showing 23 changed files with 380 additions and 100 deletions.
6 changes: 3 additions & 3 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ proc writeAndPrint(c: Chat) {.async.} =
if not c.node.wakuFilter.isNil():
echo "unsubscribing from content filters..."

await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic)
await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)

echo "quitting..."

Expand Down Expand Up @@ -539,7 +539,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
let peerInfo = parsePeerInfo(conf.lightpushnode)
if peerInfo.isOk():
await mountLightPush(node)
node.mountLightPushClient()
node.mountLightPushClient()
node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec)
else:
error "LightPush not mounted. Couldn't parse conf.lightpushnode",
Expand All @@ -556,7 +556,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)

await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler)
await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)

else:
error "Filter not mounted. Couldn't parse conf.filternode",
Expand Down
5 changes: 3 additions & 2 deletions examples/v2/filter_subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## subscribe to messages without relay

import
options,
chronicles,
chronos,
stew/byteutils,
Expand All @@ -22,7 +23,7 @@ proc unsubscribe(wfc: WakuFilterClient,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic) {.async.} =
notice "unsubscribing from filter"
let unsubscribeRes = await wfc.unsubscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
let unsubscribeRes = await wfc.unsubscribe(filterPeer, some(filterPubsubTopic), @[filterContentTopic])
if unsubscribeRes.isErr:
notice "unsubscribe request failed", err=unsubscribeRes.error
else:
Expand All @@ -47,7 +48,7 @@ proc maintainSubscription(wfc: WakuFilterClient,
# No subscription found. Let's subscribe.
notice "no subscription found. Sending subscribe request"

let subscribeRes = await wfc.subscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
let subscribeRes = await wfc.subscribe(filterPeer, some(filterPubsubTopic), @[filterContentTopic])

if subscribeRes.isErr():
notice "subscribe request failed. Quitting.", err=subscribeRes.error
Expand Down
3 changes: 2 additions & 1 deletion examples/v2/lightpush_publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## use lightpush to publish messages without relay

import
std/options,
chronicles,
chronos,
stew/byteutils,
Expand All @@ -28,7 +29,7 @@ proc publishMessages(wlc: WakuLightpushClient,
ephemeral: true, # tell store nodes to not store it
timestamp: getNowInNanosecondTime()) # current timestamp

let wlpRes = await wlc.publish(lightpushPubsubTopic, message, lightpushPeer)
let wlpRes = await wlc.publish(some(lightpushPubsubTopic), message, lightpushPeer)

if wlpRes.isOk():
notice "published message using lightpush", message=message
Expand Down
10 changes: 5 additions & 5 deletions tests/v2/test_waku_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ suite "Waku Filter":
msg = fakeWakuMessage(contentTopic=contentTopic)

## When
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
require (await client.subscribe(some(pubsubTopic), contentTopic, pushHandler, peer=serverAddr)).isOk()

# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(500.milliseconds)
Expand Down Expand Up @@ -106,7 +106,7 @@ suite "Waku Filter":
msg = fakeWakuMessage(contentTopic=contentTopic)

## When
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
require (await client.subscribe(some(pubsubTopic), contentTopic, pushHandler, peer=serverAddr)).isOk()

# WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc
await sleepAsync(500.milliseconds)
Expand All @@ -118,7 +118,7 @@ suite "Waku Filter":
# Reset to test unsubscribe
pushHandlerFuture = newFuture[void]()

require (await client.unsubscribe(pubsubTopic, contentTopic, peer=serverAddr)).isOk()
require (await client.unsubscribe(some(pubsubTopic), contentTopic, peer=serverAddr)).isOk()

# WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc
await sleepAsync(500.milliseconds)
Expand Down Expand Up @@ -158,7 +158,7 @@ suite "Waku Filter":
msg = fakeWakuMessage(contentTopic=contentTopic)

## When
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
require (await client.subscribe(some(pubsubTopic), contentTopic, pushHandler, peer=serverAddr)).isOk()

# WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc
await sleepAsync(500.milliseconds)
Expand Down Expand Up @@ -223,7 +223,7 @@ suite "Waku Filter":
msg = fakeWakuMessage(contentTopic=contentTopic)

## When
require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk()
require (await client.subscribe(some(pubsubTopic), contentTopic, pushHandler, peer=serverAddr)).isOk()

# WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc
await sleepAsync(500.milliseconds)
Expand Down
11 changes: 6 additions & 5 deletions tests/v2/test_waku_lightpush.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{.used.}

import
std/options,
testutils/unittests,
chronicles,
chronos,
Expand Down Expand Up @@ -42,8 +43,8 @@ suite "Waku Lightpush":

## Given
let handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
let handler = proc(peer: PeerId, pubsubTopic: Option[PubsubTopic], message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic.get(), message))
return ok()

let
Expand All @@ -57,7 +58,7 @@ suite "Waku Lightpush":
message = fakeWakuMessage()

## When
let requestRes = await client.publish(topic, message, peer=serverPeerId)
let requestRes = await client.publish(some(topic), message, peer=serverPeerId)

require await handlerFuture.withTimeout(100.millis)

Expand Down Expand Up @@ -86,7 +87,7 @@ suite "Waku Lightpush":
let error = "test_failure"

let handlerFuture = newFuture[void]()
let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
let handler = proc(peer: PeerId, pubsubTopic: Option[PubsubTopic], message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete()
return err(error)

Expand All @@ -101,7 +102,7 @@ suite "Waku Lightpush":
message = fakeWakuMessage()

## When
let requestRes = await client.publish(topic, message, peer=serverPeerId)
let requestRes = await client.publish(some(topic), message, peer=serverPeerId)

require await handlerFuture.withTimeout(100.millis)

Expand Down
3 changes: 2 additions & 1 deletion tests/v2/test_wakunode_filter.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{.used.}

import
options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
Expand Down Expand Up @@ -43,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))

## When
await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)

# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
Expand Down
51 changes: 51 additions & 0 deletions tests/v2/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{.used.}

import
std/options,
stew/results,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
Expand Down Expand Up @@ -61,3 +63,52 @@ suite "WakuNode - Lightpush":

## Cleanup
await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())

asyncTest "Lightpush message return success with autosharding":
## Setup
let
lightNodeKey = generateSecp256k1Key()
lightNode = newTestWakuNode(lightNodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
bridgeNodeKey = generateSecp256k1Key()
bridgeNode = newTestWakuNode(bridgeNodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
destNodeKey = generateSecp256k1Key()
destNode = newTestWakuNode(destNodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
shard = singleHighestWeigthShard(NsContentTopic.parse(DefaultContentTopic).expect("Valid Topic")).expect("Valid shard")

await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())

await destNode.mountRelay(@[$shard])
await bridgeNode.mountRelay(@[$shard])
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()

let bridgeRemotePeerInfo = bridgeNode.peerInfo.toRemotePeerInfo()

discard await lightNode.peerManager.dialPeer(bridgeRemotePeerInfo, WakuLightPushCodec)
await sleepAsync(100.milliseconds)
await destNode.connectToNodes(@[bridgeRemotePeerInfo])

## Given
let message = fakeWakuMessage()

var completionFutRelay = newFuture[bool]()
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == shard
msg == message
completionFutRelay.complete(true)
destNode.subscribe(shard, relayHandler)

# Wait for subscription to take effect
await sleepAsync(100.millis)

## When
let res = await lightNode.lightpushPublish(none(PubsubTopic), message, bridgeRemotePeerInfo)

## Then
check await completionFutRelay.withTimeout(5.seconds)

assert res.isOk(), $res.error

## Cleanup
await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())
14 changes: 7 additions & 7 deletions tests/v2/waku_filter_v2/test_waku_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ suite "Waku Filter - end to end":
response.error().kind == FilterSubscribeErrorKind.NOT_FOUND

# When
let response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), DefaultPubsubTopic, @[DefaultContentTopic])
let response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), some(DefaultPubsubTopic), @[DefaultContentTopic])

require response2.isOk()

Expand All @@ -84,7 +84,7 @@ suite "Waku Filter - end to end":
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
pubsubTopic = some(DefaultPubsubTopic)
contentTopics = @[DefaultContentTopic]

# When
Expand Down Expand Up @@ -142,7 +142,7 @@ suite "Waku Filter - end to end":
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
pubsubTopic = some(DefaultPubsubTopic)
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
contentTopics = @[DefaultContentTopic, contentTopic2]

Expand Down Expand Up @@ -227,7 +227,7 @@ suite "Waku Filter - end to end":
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
pubsubTopic = some(DefaultPubsubTopic)
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
contentTopics = @[DefaultContentTopic, contentTopic2]

Expand Down Expand Up @@ -302,7 +302,7 @@ suite "Waku Filter - end to end":
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
pubsubTopic = some(DefaultPubsubTopic)
pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto")
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
contentTopics = @[DefaultContentTopic, contentTopic2]
Expand All @@ -313,7 +313,7 @@ suite "Waku Filter - end to end":
await allFutures(serverSwitch.start(), clientSwitch.start())
let
response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, contentTopics)
response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), some(pubsubTopic2), contentTopics)

# Then
check:
Expand Down Expand Up @@ -352,7 +352,7 @@ suite "Waku Filter - end to end":
## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s)

# When
let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, @[contentTopic2])
let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), some(pubsubTopic2), @[contentTopic2])
require response3.isOk()

let msg3 = fakeWakuMessage(contentTopic=contentTopic2)
Expand Down
Loading

0 comments on commit a5d1cf4

Please sign in to comment.