Skip to content

Commit

Permalink
refactor: pubsub subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored and jacobheun committed Nov 4, 2019
1 parent fba0583 commit 4be379e
Show file tree
Hide file tree
Showing 12 changed files with 904 additions and 147 deletions.
14 changes: 5 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,22 +211,18 @@ class Node extends Libp2p {

**IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670.

#### Create a Node - `Libp2p.createLibp2p(options, callback)`
#### Create a Node - `Libp2p.create(options)`

> Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead
```js
const { createLibp2p } = require('libp2p')
createLibp2p(options, (err, libp2p) => {
if (err) throw err
libp2p.start((err) => {
if (err) throw err
})
})
const { create } = require('libp2p')
const libp2p = await create(options)

await libp2p.start()
```

- `options`: Object of libp2p configuration options
- `callback`: Function with signature `function (Error, Libp2p) {}`

#### Create a Node alternative - `new Libp2p(options)`

Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@
"multiaddr": "^7.1.0",
"multistream-select": "^0.15.0",
"once": "^1.4.0",
"p-map": "^3.0.0",
"p-queue": "^6.1.1",
"p-settle": "^3.1.0",
"peer-book": "^0.9.1",
"peer-id": "^0.13.3",
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
Expand Down Expand Up @@ -92,8 +92,8 @@
"libp2p-bootstrap": "^0.9.7",
"libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "~0.17.0",
"libp2p-gossipsub": "~0.0.4",
"libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async",
"libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async",
"libp2p-kad-dht": "^0.15.3",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.9.1",
Expand All @@ -105,6 +105,7 @@
"lodash.times": "^4.3.2",
"nock": "^10.0.6",
"p-defer": "^3.0.0",
"p-wait-for": "^3.1.0",
"portfinder": "^1.0.20",
"pull-goodbye": "0.0.2",
"pull-length-prefixed": "^1.3.3",
Expand Down
102 changes: 102 additions & 0 deletions src/connection-manager/topology.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
'use strict'

class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {function} props.onConnect protocol "onConnect" handler
* @param {function} props.onDisconnect protocol "onDisconnect" handler
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Registrar} registrar
* @constructor
*/
constructor ({
min = 0,
max = Infinity,
onConnect,
onDisconnect,
multicodecs,
registrar
}) {
this.multicodecs = multicodecs
this.registrar = registrar
this.min = min
this.max = max
this.peers = new Map()

// Handlers
this._onConnect = onConnect
this._onDisconnect = onDisconnect

this._onProtocolChange = this._onProtocolChange.bind(this)

// Set by the registrar
this._peerStore = null
}

/**
* Set peerstore to the topology.
* @param {PeerStore} peerStore
*/
set peerStore (peerStore) {
this._peerStore = peerStore

this._peerStore.on('change:protocols', this._onProtocolChange)
}

/**
* Try to add a connected peer to the topology, if inside the thresholds.
* @param {PeerInfo} peerInfo
* @param {Connection} connection
* @returns {void}
*/
tryToConnect (peerInfo, connection) {
// TODO: conn manager should validate if it should try to connect

this._onConnect(peerInfo, connection)

this.peers.set(peerInfo.id.toB58String(), peerInfo)
}

/**
* Notify protocol of peer disconnected.
* @param {PeerInfo} peerInfo
* @param {Error} [error]
* @returns {void}
*/
disconnect (peerInfo, error) {
if (this.peers.delete(peerInfo.id.toB58String())) {
this._onDisconnect(peerInfo, error)
}
}

/**
* Check if a new peer support the multicodecs for this topology.
* @param {Object} props
* @param {PeerInfo} props.peerInfo
* @param {Array<string>} props.protocols
*/
_onProtocolChange ({ peerInfo, protocols }) {
const existingPeer = this.peers.get(peerInfo.id.toB58String())

protocols.filter(protocol => this.multicodecs.includes(protocol))

// Not supporting the protocol anymore?
if (existingPeer && protocols.filter(protocol => this.multicodecs.includes(protocol)).length === 0) {
this._onDisconnect({
peerInfo
})
}

// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
this.tryToConnect(peerInfo, this.registrar.getPeerConnection(peerInfo))
return
}
}
}
}

module.exports = Topology
76 changes: 46 additions & 30 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
'use strict'

const FSM = require('fsm-event')
const EventEmitter = require('events').EventEmitter
const { EventEmitter } = require('events')
const debug = require('debug')
const log = debug('libp2p')
log.error = debug('libp2p:error')
const errCode = require('err-code')
const promisify = require('promisify-es6')

const each = require('async/each')
const nextTick = require('async/nextTick')

const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
Expand All @@ -29,6 +27,8 @@ const { codes } = require('./errors')
const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const Registrar = require('./registrar')
const PeerStore = require('./peer-store')

const notStarted = (action, state) => {
return errCode(
Expand All @@ -54,25 +54,31 @@ class Libp2p extends EventEmitter {

this.datastore = this._options.datastore
this.peerInfo = this._options.peerInfo
this.peerBook = this._options.peerBook || new PeerBook()

this._modules = this._options.modules
this._config = this._options.config
this._transport = [] // Transport instances/references
this._discovery = [] // Discovery service instances/references

this.peerStore = new PeerStore()

// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)

// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)

this.peerStore.put(peerInfo)
this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)

this.registrar.onDisconnect(peerInfo)
this.emit('peer:disconnect', peerInfo)
}
})
Expand Down Expand Up @@ -106,6 +112,10 @@ class Libp2p extends EventEmitter {
transportManager: this.transportManager
})

this.registrar = new Registrar(this.peerStore)
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle

// Attach private network protector
if (this._modules.connProtector) {
this.upgrader.protector = this._modules.connProtector
Expand All @@ -124,7 +134,7 @@ class Libp2p extends EventEmitter {
}

// start pubsub
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
if (this._modules.pubsub) {
this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub)
}

Expand Down Expand Up @@ -179,7 +189,7 @@ class Libp2p extends EventEmitter {

// Once we start, emit and dial any peers we may have already discovered
this.state.on('STARTED', () => {
this.peerBook.getAllArray().forEach((peerInfo) => {
this.peerStore.getAllArray().forEach((peerInfo) => {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})
Expand Down Expand Up @@ -228,6 +238,7 @@ class Libp2p extends EventEmitter {
this.state('stop')

try {
this.pubsub && await this.pubsub.stop()
await this.transportManager.close()
await this._switch.stop()
} catch (err) {
Expand All @@ -245,7 +256,7 @@ class Libp2p extends EventEmitter {

/**
* Dials to the provided peer. If successful, the `PeerInfo` of the
* peer will be added to the nodes `PeerBook`
* peer will be added to the nodes `PeerStore`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {object} options
Expand All @@ -258,7 +269,7 @@ class Libp2p extends EventEmitter {

/**
* Dials to the provided peer and handshakes with the given protocol.
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerStore`,
* and the `Connection` will be sent in the callback
*
* @async
Expand All @@ -279,7 +290,13 @@ class Libp2p extends EventEmitter {

// If a protocol was provided, create a new stream
if (protocols) {
return connection.newStream(protocols)
const stream = await connection.newStream(protocols)
const peerInfo = getPeerInfo(connection.remotePeer)

peerInfo.protocols.add(stream.protocol)
this.peerStore.put(peerInfo)

return stream
}

return connection
Expand Down Expand Up @@ -350,10 +367,16 @@ class Libp2p extends EventEmitter {
const multiaddrs = this.peerInfo.multiaddrs.toArray()

// Start parallel tasks
const tasks = [
this.transportManager.listen(multiaddrs)
]

if (this._config.pubsub.enabled) {
this.pubsub && this.pubsub.start()
}

try {
await Promise.all([
this.transportManager.listen(multiaddrs)
])
await Promise.all(tasks)
} catch (err) {
log.error(err)
this.emit('error', err)
Expand All @@ -369,12 +392,6 @@ class Libp2p extends EventEmitter {
* the `peer:discovery` event. If auto dial is enabled for libp2p
* and the current connection count is under the low watermark, the
* peer will be dialed.
*
* TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
* it would be ideal if only new peers were emitted. Currently, with
* other modules adding peers to the `PeerBook` we have no way of knowing
* if a peer is new or not, so it has to be emitted.
*
* @private
* @param {PeerInfo} peerInfo
*/
Expand All @@ -383,7 +400,7 @@ class Libp2p extends EventEmitter {
log.error(new Error(codes.ERR_DISCOVERED_SELF))
return
}
peerInfo = this.peerBook.put(peerInfo)
peerInfo = this.peerStore.put(peerInfo)

if (!this.isStarted()) return

Expand Down Expand Up @@ -454,16 +471,15 @@ module.exports = Libp2p
* Like `new Libp2p(options)` except it will create a `PeerInfo`
* instance if one is not provided in options.
* @param {object} options Libp2p configuration options
* @param {function(Error, Libp2p)} callback
* @returns {void}
* @returns {Libp2p}
*/
module.exports.createLibp2p = promisify((options, callback) => {
module.exports.create = async (options = {}) => {
if (options.peerInfo) {
return nextTick(callback, null, new Libp2p(options))
return new Libp2p(options)
}
PeerInfo.create((err, peerInfo) => {
if (err) return callback(err)
options.peerInfo = peerInfo
callback(null, new Libp2p(options))
})
})

const peerInfo = await PeerInfo.create()

options.peerInfo = peerInfo
return new Libp2p(options)
}
Loading

0 comments on commit 4be379e

Please sign in to comment.