Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update pubsub interfaces #1194

Merged
merged 2 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"license": "MIT",
"dependencies": {
"@libp2p/pubsub-peer-discovery": "^5.0.1",
"@libp2p/floodsub": "^1.0.5",
"@libp2p/floodsub": "^1.0.6",
"execa": "^2.1.0",
"fs-extra": "^8.1.0",
"libp2p": "../",
Expand Down
4 changes: 3 additions & 1 deletion examples/pubsub/1.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const createNode = async () => {

// node2 publishes "news" every second
setInterval(() => {
node2.pubsub.publish(topic, uint8ArrayFromString('Bird bird bird, bird is the word!'))
node2.pubsub.publish(topic, uint8ArrayFromString('Bird bird bird, bird is the word!')).catch(err => {
console.error(err)
})
}, 1000)
})()
4 changes: 3 additions & 1 deletion examples/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ await node2.pubsub.subscribe(topic)

// node2 publishes "news" every second
setInterval(() => {
node2.pubsub.publish(topic, fromString('Bird bird bird, bird is the word!'))
node2.pubsub.publish(topic, fromString('Bird bird bird, bird is the word!')).catch(err => {
console.error(err)
})
}, 1000)
```

Expand Down
4 changes: 3 additions & 1 deletion examples/pubsub/message-filtering/1.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ const createNode = async () => {
// car is not a fruit !
setInterval(() => {
console.log('############## fruit ' + myFruits[count] + ' ##############')
node1.pubsub.publish(topic, uint8ArrayFromString(myFruits[count]))
node1.pubsub.publish(topic, uint8ArrayFromString(myFruits[count])).catch(err => {
console.info(err)
})
count++
if (count == myFruits.length) {
count = 0
Expand Down
4 changes: 3 additions & 1 deletion examples/pubsub/message-filtering/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ const myFruits = ['banana', 'apple', 'car', 'orange'];

setInterval(() => {
console.log('############## fruit ' + myFruits[count] + ' ##############')
node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count]))
node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count])).catch(err => {
console.error(err)
})
count++
if (count == myFruits.length) {
count = 0
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"@achingbrain/nat-port-mapper": "^1.0.0",
"@libp2p/connection": "^1.1.5",
"@libp2p/crypto": "^0.22.11",
"@libp2p/interfaces": "^1.3.22",
"@libp2p/interfaces": "^1.3.24",
"@libp2p/logger": "^1.1.4",
"@libp2p/multistream-select": "^1.0.4",
"@libp2p/peer-id": "^1.1.10",
Expand Down Expand Up @@ -162,8 +162,8 @@
"@libp2p/daemon-server": "^1.0.2",
"@libp2p/delegated-content-routing": "^1.0.2",
"@libp2p/delegated-peer-routing": "^1.0.2",
"@libp2p/floodsub": "^1.0.5",
"@libp2p/interface-compliance-tests": "^1.1.23",
"@libp2p/floodsub": "^1.0.6",
"@libp2p/interface-compliance-tests": "^1.1.25",
"@libp2p/interop": "^1.0.3",
"@libp2p/kad-dht": "^1.0.7",
"@libp2p/mdns": "^1.0.4",
Expand Down
4 changes: 2 additions & 2 deletions src/pubsub/dummy-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from '@libp2p/interfaces'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PubSub, PubSubEvents, StrictNoSign, StrictSign } from '@libp2p/interfaces/pubsub'
import type { PublishResult, PubSub, PubSubEvents, StrictNoSign, StrictSign } from '@libp2p/interfaces/pubsub'
import errCode from 'err-code'
import { messages, codes } from '../errors.js'

Expand Down Expand Up @@ -45,7 +45,7 @@ export class DummyPubSub extends EventEmitter<PubSubEvents> implements PubSub {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}

publish (): void {
async publish (): Promise<PublishResult> {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
}
4 changes: 2 additions & 2 deletions test/configuration/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ describe('Pubsub subscription handlers adapter', () => {

pubsub.subscribe(topic)
pubsub.addEventListener('message', handler)
pubsub.publish(topic, uint8ArrayFromString('useless-data'))
await pubsub.publish(topic, uint8ArrayFromString('useless-data'))
await defer.promise

pubsub.unsubscribe(topic)
pubsub.removeEventListener('message', handler)
pubsub.publish(topic, uint8ArrayFromString('useless-data'))
await pubsub.publish(topic, uint8ArrayFromString('useless-data'))

// wait to guarantee that the handler is not called twice
await delay(100)
Expand Down
10 changes: 7 additions & 3 deletions test/configuration/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { WebSockets } from '@libp2p/websockets'
import * as filters from '@libp2p/websockets/filters'
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
import mergeOptions from 'merge-options'
import type { Message, PubSubInit, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Message, PublishResult, PubSubInit, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Libp2pInit, Libp2pOptions } from '../../src/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import * as cborg from 'cborg'
Expand Down Expand Up @@ -44,11 +44,12 @@ class MockPubSub extends PubSubBaseProtocol {
return cborg.encode(rpc)
}

async publishMessage (from: PeerId, message: Message): Promise<void> {
async publishMessage (from: PeerId, message: Message): Promise<PublishResult> {
const peers = this.getSubscribers(message.topic)
const recipients: PeerId[] = []

if (peers == null || peers.length === 0) {
return
return { recipients }
}

peers.forEach(id => {
Expand All @@ -60,8 +61,11 @@ class MockPubSub extends PubSubBaseProtocol {
return
}

recipients.push(id)
this.send(id, { messages: [message] })
})

return { recipients }
}
}

Expand Down
40 changes: 5 additions & 35 deletions test/identify/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,7 @@ describe('Identify', () => {
await localIdentify.start()
await remoteIdentify.start()

const [localToRemote] = connectionPair({
peerId: localComponents.getPeerId(),
registrar: localComponents.getRegistrar()
}, {
peerId: remoteComponents.getPeerId(),
registrar: remoteComponents.getRegistrar()
})
const [localToRemote] = connectionPair(localComponents, remoteComponents)

const localAddressBookConsumePeerRecordSpy = sinon.spy(localComponents.getPeerStore().addressBook, 'consumePeerRecord')
const localProtoBookSetSpy = sinon.spy(localComponents.getPeerStore().protoBook, 'set')
Expand Down Expand Up @@ -161,13 +155,7 @@ describe('Identify', () => {
})
await remoteIdentify.start()

const [localToRemote] = connectionPair({
peerId: localComponents.getPeerId(),
registrar: localComponents.getRegistrar()
}, {
peerId: remoteComponents.getPeerId(),
registrar: remoteComponents.getRegistrar()
})
const [localToRemote] = connectionPair(localComponents, remoteComponents)

sinon.stub(localComponents.getPeerStore().addressBook, 'consumePeerRecord').throws()

Expand All @@ -194,13 +182,7 @@ describe('Identify', () => {
await localIdentify.start()
await remoteIdentify.start()

const [localToRemote] = connectionPair({
peerId: localComponents.getPeerId(),
registrar: localComponents.getRegistrar()
}, {
peerId: remoteComponents.getPeerId(),
registrar: remoteComponents.getRegistrar()
})
const [localToRemote] = connectionPair(localComponents, remoteComponents)

// send an invalid message
await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY)
Expand Down Expand Up @@ -267,13 +249,7 @@ describe('Identify', () => {
await localIdentify.start()
await remoteIdentify.start()

const [localToRemote, remoteToLocal] = connectionPair({
peerId: localComponents.getPeerId(),
registrar: localComponents.getRegistrar()
}, {
peerId: remoteComponents.getPeerId(),
registrar: remoteComponents.getRegistrar()
})
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)

// ensure connections are registered by connection manager
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
Expand Down Expand Up @@ -353,13 +329,7 @@ describe('Identify', () => {
await localIdentify.start()
await remoteIdentify.start()

const [localToRemote, remoteToLocal] = connectionPair({
peerId: localComponents.getPeerId(),
registrar: localComponents.getRegistrar()
}, {
peerId: remoteComponents.getPeerId(),
registrar: remoteComponents.getRegistrar()
})
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)

// ensure connections are registered by connection manager
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
Expand Down