Skip to content

Commit

Permalink
feat: update FILTER and LIGHTPUSH for autoshard
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Aug 4, 2023
1 parent afb93e2 commit 93a9880
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 24 deletions.
6 changes: 5 additions & 1 deletion apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,13 @@ type
defaultValue: false
name: "keep-alive" }: bool

topics* {.
desc: "Default topic to subscribe to. Argument may be repeated. Deprecated! Please use pubsub-topic and/or content-topic instead."
defaultValue: @["/waku/2/default-waku/proto"]
name: "topic" .}: seq[string]

pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated."
defaultValue: @["/waku/2/default-waku/proto"]
name: "pubsub-topic" .}: seq[string]

contentTopics* {.
Expand Down
31 changes: 29 additions & 2 deletions waku/v2/waku_core/topics/sharding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import
./pubsub_topic

## For indices allocation and other magic numbers refer to RFC 51
const ClusterIndex* = 49152
const GenerationZeroShardsCount* = 5
const ClusterIndex* = 1
const GenerationZeroShardsCount* = 8

type ShardsPriority = seq[tuple[topic: NsPubsubTopic, value: float64]]

Expand Down Expand Up @@ -103,3 +103,30 @@ proc singleHighestWeigthShard*(topic: NsContentTopic): Result[NsPubsubTopic, str
let (pubsub, _) = list[list.len - 1]

ok(pubsub)

proc optional_sharding*(pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic): Result[(NsPubsubTopic, NsContentTopic), string] =
let parseRes = NsContentTopic.parse(contentTopic)

let content =
if parseRes.isErr():
return err("Cannot parse content topic: " & $parseRes.error)
else: parseRes.get()

if pubsubTopic.isSome():
let parseRes = NsPubsubTopic.parse(pubsubTopic.get())

let pubsub =
if parseRes.isErr():
return err("Cannot parse pubsub topic: " & $parseRes.error)
else: parseRes.get()

return ok((pubsub, content))

let shardsRes = singleHighestWeigthShard(content)

let pubsub =
if shardsRes.isErr():
return err("Cannot autoshard content topic: " & $shardsRes.error)
else: shardsRes.get()

ok((pubsub, content))
87 changes: 68 additions & 19 deletions waku/v2/waku_filter_v2/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ else:

import
std/options,
std/tables,
std/sequtils,
std/sugar,
chronicles,
chronos,
libp2p/protocols/protocol,
Expand Down Expand Up @@ -77,25 +80,71 @@ proc ping*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSub

return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)

proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.subscribe(
requestId = requestId,
pubsubTopic = pubsubTopic,
contentTopics = contentTopics
)

return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)

proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
requestId = requestId,
pubsubTopic = pubsubTopic,
contentTopics = contentTopics
)

return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): Result[seq[Future[FilterSubscribeResult]], FilterSubscribeError] =
var topicMap = initTable[PubsubTopic, seq[ContentTopic]]()
for contentTopic in contentTopics:
let res = optional_sharding(pubsubTopic, contentTopic)

let (pubsub, content) =
if res.isErr():
return err(FilterSubscribeError.badRequest($res.error))
else: res.get()

if not topicMap.hasKey(pubsub):
topicMap[pubsub] = @[]

try:
topicMap[pubsub].add(content)
except KeyError:
#That can't happen anyway
return err(FilterSubscribeError.badRequest($getCurrentExceptionMsg()))

var futures = collect(newSeq):
for pubsubTopic, contentTopics in topicMap.pairs:
let requestId = generateRequestId(wfc.rng)

let filterSubscribeRequest = FilterSubscribeRequest.subscribe(
requestId = requestId,
pubsubTopic = pubsubTopic,
contentTopics = contentTopics
)

wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)

ok(futures)

proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): Result[seq[Future[FilterSubscribeResult]], FilterSubscribeError] =
var topicMap = initTable[PubsubTopic, seq[ContentTopic]]()
for contentTopic in contentTopics:
let res = optional_sharding(pubsubTopic, contentTopic)

let (pubsub, content) =
if res.isErr():
return err(FilterSubscribeError.badRequest($res.error))
else: res.get()

if not topicMap.hasKey(pubsub):
topicMap[pubsub] = @[]

try:
topicMap[pubsub].add(content)
except KeyError:
#That can't happen anyway
return err(FilterSubscribeError.badRequest($getCurrentExceptionMsg()))

var futures = collect(newSeq):
for pubsubTopic, contentTopics in topicMap.pairs:
let requestId = generateRequestId(wfc.rng)

let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
requestId = requestId,
pubsubTopic = pubsubTopic,
contentTopics = contentTopics
)

wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)

ok(futures)

proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} =
let requestId = generateRequestId(wfc.rng)
Expand Down
12 changes: 10 additions & 2 deletions waku/v2/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem

return ok()

proc publish*(wl: WakuLightPushClient, pubsubTopic: PubsubTopic, message: WakuMessage, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let pushRequest = PushRequest(pubsubTopic: pubsubTopic, message: message)
proc publish*(wl: WakuLightPushClient, pubsubTopic: Option[PubsubTopic], message: WakuMessage, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let res = optional_sharding(pubsubTopic, message.contentTopic)

let (pubsub, _) =
if res.isErr():
return Future(err($res.error))
else: res.get()

let pushRequest = PushRequest(pubsub, message)

return await wl.sendPushRequest(pushRequest, peer)

0 comments on commit 93a9880

Please sign in to comment.