Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update various protocols to autoshard #1857

Merged
merged 3 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ proc publish(c: Chat, line: string) =

if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message)
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else:
asyncSpawn c.node.publish(DefaultPubsubTopic, message)

Expand Down Expand Up @@ -266,7 +266,7 @@ proc writeAndPrint(c: Chat) {.async.} =
if not c.node.wakuFilter.isNil():
echo "unsubscribing from content filters..."

await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic)
await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)

echo "quitting..."

Expand Down Expand Up @@ -462,7 +462,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)

await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler)
await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)

else:
error "Filter not mounted. Couldn't parse conf.filternode",
Expand All @@ -477,7 +477,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
node.subscribe(topic, handler)
await node.subscribe(some(topic), @[ContentTopic("")], handler)

when defined(rln):
if conf.rlnRelay:
Expand Down
4 changes: 2 additions & 2 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
error "failed to parse content topic", error=res.error
quit(QuitFailure)

let shardsRes = contentTopicsRes.mapIt(singleHighestWeigthShard(it.get()))
let shardsRes = contentTopicsRes.mapIt(getShard(it.get()))

for res in shardsRes:
if res.isErr():
Expand Down Expand Up @@ -363,7 +363,7 @@ proc setupProtocols(node: WakuNode,
# TODO autoshard content topics only once.
# Already checked for errors in app.init
let contentTopics = conf.contentTopics.mapIt(NsContentTopic.parse(it).expect("Parsing"))
let shards = contentTopics.mapIt($(singleHighestWeigthShard(it).expect("Sharding")))
let shards = contentTopics.mapIt($(getShard(it).expect("Sharding")))

let pubsubTopics = conf.topics & conf.pubsubTopics & shards
try:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_waku_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ suite "Waku Lightpush":
requestError == error

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
await allFutures(clientSwitch.stop(), serverSwitch.stop())
3 changes: 2 additions & 1 deletion tests/test_wakunode_filter.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{.used.}

import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
Expand Down Expand Up @@ -43,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))

## When
await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)

# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
Expand Down
3 changes: 2 additions & 1 deletion tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{.used.}

import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
Expand Down Expand Up @@ -54,7 +55,7 @@ suite "WakuNode - Lightpush":
await sleepAsync(100.millis)

## When
await lightNode.lightpushPublish(DefaultPubsubTopic, message)
await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)

## Then
check await completionFutRelay.withTimeout(5.seconds)
Expand Down
24 changes: 3 additions & 21 deletions tests/waku_core/test_namespaced_topics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ suite "Waku Message - Content topics namespacing":
## Given
var ns = NsContentTopic()
ns.generation = none(int)
ns.bias = Unbiased
ns.application = "toychat"
ns.version = "2"
ns.name = "huilong"
Expand All @@ -39,15 +38,14 @@ suite "Waku Message - Content topics namespacing":
let ns = nsRes.get()
check:
ns.generation == none(int)
ns.bias == Unbiased
ns.application == "toychat"
ns.version == "2"
ns.name == "huilong"
ns.encoding == "proto"

test "Parse content topic string - Valid string with sharding":
## Given
let topic = "/0/lower20/toychat/2/huilong/proto"
let topic = "/0/toychat/2/huilong/proto"

## When
let nsRes = NsContentTopic.parse(topic)
Expand All @@ -58,7 +56,6 @@ suite "Waku Message - Content topics namespacing":
let ns = nsRes.get()
check:
ns.generation == some(0)
ns.bias == Lower20
ns.application == "toychat"
ns.version == "2"
ns.name == "huilong"
Expand Down Expand Up @@ -122,11 +119,11 @@ suite "Waku Message - Content topics namespacing":
let err = ns.tryError()
check:
err.kind == ParsingErrorKind.InvalidFormat
err.cause == "invalid topic structure"
err.cause == "generation should be a numeric value"

test "Parse content topic string - Invalid string: non numeric generation":
## Given
let topic = "/first/unbiased/toychat/2/huilong/proto"
let topic = "/first/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic)
Expand All @@ -139,21 +136,6 @@ suite "Waku Message - Content topics namespacing":
err.kind == ParsingErrorKind.InvalidFormat
err.cause == "generation should be a numeric value"

test "Parse content topic string - Invalid string: invalid bias":
## Given
let topic = "/0/no/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic)

## Then
assert ns.isErr(), $ns.get()

let err = ns.tryError()
check:
err.kind == ParsingErrorKind.InvalidFormat
err.cause == "bias should be one of; unbiased, lower20 or higher80"

suite "Waku Message - Pub-sub topics namespacing":

test "Stringify named sharding pub-sub topic":
Expand Down
75 changes: 21 additions & 54 deletions tests/waku_core/test_sharding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import
std/options,
std/strutils,
std/sugar,
std/algorithm,
std/random,
stew/results,
testutils/unittests
Expand Down Expand Up @@ -34,88 +33,60 @@ suite "Waku Sharding":

let enc = "cbor"

NsContentTopic.init(none(int), Unbiased, app, version, name, enc)
NsContentTopic.init(none(int), app, version, name, enc)

test "Implicit content topic generation":
## Given
let topic = "/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic).expect("Parsing")

let paramRes = shardCount(ns)
let parseRes = NsContentTopic.parse(topic)

## Then
assert paramRes.isOk(), paramRes.error
assert parseRes.isOk(), $parseRes.error

let count = paramRes.get()
let nsTopic = parseRes.get()
check:
count == GenerationZeroShardsCount
ns.bias == Unbiased
nsTopic.generation == none(int)

test "Valid content topic":
## Given
let topic = "/0/lower20/toychat/2/huilong/proto"
let topic = "/0/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic).expect("Parsing")

let paramRes = shardCount(ns)
let parseRes = NsContentTopic.parse(topic)

## Then
assert paramRes.isOk(), paramRes.error
assert parseRes.isOk(), $parseRes.error

let count = paramRes.get()
let nsTopic = parseRes.get()
check:
count == GenerationZeroShardsCount
ns.bias == Lower20
nsTopic.generation.get() == 0

test "Invalid content topic generation":
## Given
let topic = "/1/unbiased/toychat/2/huilong/proto"
let topic = "/1/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic).expect("Parsing")

let paramRes = shardCount(ns)
let shardRes = getShard(ns)

## Then
assert paramRes.isErr(), $paramRes.get()
assert shardRes.isErr(), $shardRes.get()

let err = paramRes.error
let err = shardRes.error
check:
err == "Generation > 0 are not supported yet"

test "Weigths bias":
## Given
let count = 5

## When
let anonWeigths = biasedWeights(count, ShardingBias.Lower20)
let speedWeigths = biasedWeights(count, ShardingBias.Higher80)

## Then
check:
anonWeigths[0] == 2.0
anonWeigths[1] == 1.0
anonWeigths[2] == 1.0
anonWeigths[3] == 1.0
anonWeigths[4] == 1.0

speedWeigths[0] == 1.0
speedWeigths[1] == 2.0
speedWeigths[2] == 2.0
speedWeigths[3] == 2.0
speedWeigths[4] == 2.0

test "Sorted shard list":
#[ test "Sorted shard list":
## Given
let topic = "/0/unbiased/toychat/2/huilong/proto"
let topic = "/0/toychat/2/huilong/proto"

## When
let contentTopic = NsContentTopic.parse(topic).expect("Parsing")
let count = shardCount(contentTopic).expect("Valid parameters")
let weights = biasedWeights(count, contentTopic.bias)
let weights = repeat(1.0, count)

let shardsRes = weightedShardList(contentTopic, count, weights)

Expand All @@ -125,7 +96,7 @@ suite "Waku Sharding":
let shards = shardsRes.get()
check:
shards.len == count
isSorted(shards, hashOrder)
isSorted(shards, hashOrder) ]#

test "Shard Choice Reproducibility":
## Given
Expand All @@ -134,15 +105,11 @@ suite "Waku Sharding":
## When
let contentTopic = NsContentTopic.parse(topic).expect("Parsing")

let res = singleHighestWeigthShard(contentTopic)
let pubsub = getGenZeroShard(contentTopic, GenerationZeroShardsCount)

## Then
assert res.isOk(), res.error

let pubsubTopic = res.get()

check:
pubsubTopic == NsPubsubTopic.staticSharding(ClusterIndex, 3)
pubsub == NsPubsubTopic.staticSharding(ClusterIndex, 3)

test "Shard Choice Simulation":
## Given
Expand All @@ -154,7 +121,7 @@ suite "Waku Sharding":

## When
for topic in topics:
let pubsub = singleHighestWeigthShard(topic).expect("Valid Topic")
let pubsub = getShard(topic).expect("Valid Topic")
counts[pubsub.shard] += 1

## Then
Expand Down
1 change: 0 additions & 1 deletion tests/waku_filter_v2/test_waku_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import
../../../waku/node/peer_manager,
../../../waku/waku_filter_v2,
../../../waku/waku_filter_v2/client,
../../../waku/waku_filter_v2/rpc,
../../../waku/waku_core,
../testlib/common,
../testlib/wakucore
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_store/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ suite "Waku Store - query handler":
error.kind == HistoryErrorKind.BAD_REQUEST

## Cleanup
await allFutures(serverSwitch.stop(), clientSwitch.stop())
await allFutures(serverSwitch.stop(), clientSwitch.stop())
2 changes: 1 addition & 1 deletion tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ procSuite "WakuNode - Store":
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
filterFut.complete((pubsubTopic, msg))

waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)
waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer)

waitFor sleepAsync(100.millis)

Expand Down
6 changes: 3 additions & 3 deletions tests/wakunode_jsonrpc/test_jsonrpc_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ procSuite "Waku v2 JSON-RPC API - Filter":

let contentFilters = @[
ContentFilter(contentTopic: DefaultContentTopic),
ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content2/proto")),
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content3/proto")),
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content4/proto")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
Expand Down
6 changes: 3 additions & 3 deletions tests/wakunode_rest/test_rest_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ suite "Waku v2 Rest API - Filter":
]

let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)

# Then
Expand All @@ -106,7 +106,7 @@ suite "Waku v2 Rest API - Filter":
restFilterTest.messageCache.isSubscribed("4")

# When - error case
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "")
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string))
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)

check:
Expand Down Expand Up @@ -137,7 +137,7 @@ suite "Waku v2 Rest API - Filter":

# When
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)

# Then
Expand Down
Loading