Skip to content

Commit

Permalink
feat: make start and stop be async, fix a bunch of bugs in between, d…
Browse files Browse the repository at this point in the history
…etect a couple of others
  • Loading branch information
daviddias committed Aug 22, 2017
1 parent 108432e commit 84db9ce
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 220 deletions.
8 changes: 6 additions & 2 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const debug = require('debug')
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const waterfall = require('async/waterfall')
const setImmediate = require('async/setImmediate')

const map = require('async/map')
const debounce = require('lodash.debounce')
const uniqWith = require('lodash.uniqwith')
Expand Down Expand Up @@ -271,12 +273,14 @@ class DecisionEngine {
return l
}

start () {
start (callback) {
this._running = true
setImmediate(() => callback())
}

stop () {
stop (callback) {
this._running = false
setImmediate(() => callback())
}
}

Expand Down
22 changes: 13 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -386,21 +386,25 @@ class Bitswap {
*
* @returns {void}
*/
start () {
this.wm.run()
this.network.start()
this.engine.start()
start (callback) {
series([

This comment has been minimized.

Copy link
@dignifiedquire

dignifiedquire Aug 22, 2017

Member

why not parallel?

This comment has been minimized.

Copy link
@daviddias

daviddias Aug 22, 2017

Author Member

True. I assumed they had to be sequencial since they were all 'sync before'.

This comment has been minimized.

Copy link
@daviddias

daviddias Aug 24, 2017

Author Member

Tried with parallel and it failed. There are dependencies between these pieces.

This comment has been minimized.

Copy link
@dignifiedquire

dignifiedquire Aug 24, 2017

Member

Interesting, we should know what those are

(cb) => this.wm.start(cb),
(cb) => this.network.start(cb),
(cb) => this.engine.start(cb)
], callback)
}

/**
* Stoop the bitswap node.
* Stop the bitswap node.
*
* @returns {void}
*/
stop () {
this.wm.stop()
this.network.stop()
this.engine.stop()
stop (callback) {
series([
(cb) => this.wm.stop(cb),
(cb) => this.network.stop(cb),
(cb) => this.engine.stop(cb)
], callback)
}
}

Expand Down
60 changes: 29 additions & 31 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const waterfall = require('async/waterfall')
const each = require('async/each')
const setImmediate = require('async/setImmediate')

const Message = require('./types/message')
const CONSTANTS = require('./constants')
Expand All @@ -27,7 +28,7 @@ class Network {
// this.libp2p.swarm.setMaxListeners(CONSTANTS.maxListeners)
}

start () {
start (callback) {
this._running = true
// bind event listeners
this._onPeerConnect = this._onPeerConnect.bind(this)
Expand All @@ -41,18 +42,24 @@ class Network {
this.libp2p.on('peer:disconnect', this._onPeerDisconnect)

// All existing connections are like new ones for us
this.libp2p.peerBook.getAllArray().filter((peer) => peer.isConnected())
.forEach((peer) => this._onPeerConnect((peer)))
this.libp2p.peerBook
.getAllArray()
.filter((peer) => peer.isConnected())
.forEach((peer) => this._onPeerConnect((peer)))

setImmediate(() => callback())
}

stop () {
stop (callback) {
this._running = false

this.libp2p.unhandle(BITSWAP100)
if (!this.b100Only) { this.libp2p.unhandle(BITSWAP110) }

this.libp2p.removeListener('peer:connect', this._onPeerConnect)
this.libp2p.removeListener('peer:disconnect', this._onPeerDisconnect)

setImmediate(() => callback())
}

// Handles both types of bitswap messgages
Expand All @@ -66,18 +73,15 @@ class Network {
pull.asyncMap((data, cb) => Message.deserialize(data, cb)),
pull.asyncMap((msg, cb) => {
conn.getPeerInfo((err, peerInfo) => {
if (err) {
return cb(err)
}
if (err) { return cb(err) }

// log('data from', peerInfo.id.toB58String())
this.bitswap._receiveMessage(peerInfo.id, msg, cb)
})
}),
pull.onEnd((err) => {
log('ending connection')
if (err) {
return this.bitswap._receiveError(err)
}
if (err) { return this.bitswap._receiveError(err) }
})
)
}
Expand Down Expand Up @@ -147,27 +151,21 @@ class Network {

// Dial to the peer and try to use the most recent Bitswap
_dialPeer (peer, callback) {
// dialByPeerInfo throws if no network is there
try {
// Attempt Bitswap 1.1.0
this.libp2p.dial(peer, BITSWAP110, (err, conn) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dial(peer, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}

callback(null, conn, BITSWAP100)
})
return
}

callback(null, conn, BITSWAP110)
})
} catch (err) {
return callback(err)
}
// Attempt Bitswap 1.1.0
this.libp2p.dial(peer, BITSWAP110, (err, conn) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dial(peer, BITSWAP100, (err, conn) => {
if (err) { return callback(err) }

callback(null, conn, BITSWAP100)
})

return
}

callback(null, conn, BITSWAP110)
})
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/types/wantlist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class Wantlist {
}
}

forEach (fn) {
return this.set.forEach(fn)
}

entries () {
return this.set.entries()
}
Expand Down
23 changes: 12 additions & 11 deletions src/want-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const Message = require('../types/message')
const Wantlist = require('../types/wantlist')
const CONSTANTS = require('../constants')
const MsgQueue = require('./msg-queue')
const setImmediate = require('async/setImmediate')

const log = debug('bitswap:wantmanager')
log.error = debug('bitswap:wantmanager:error')
Expand Down Expand Up @@ -111,24 +112,24 @@ module.exports = class WantManager {
this._stopPeerHandler(peerId)
}

run () {
start (callback) {
// resend entire wantlist every so often
this.timer = setInterval(() => {
// resend entirew wantlist every so often
const fullwantlist = new Message(true)
for (let entry of this.wantlist.entries()) {
this.wantlist.forEach((entry) => {
fullwantlist.addEntry(entry[1].cid, entry[1].priority)
}

this.peers.forEach((p) => {
p.addMessage(fullwantlist)
})

this.peers.forEach((p) => p.addMessage(fullwantlist))
}, 10 * 1000)

setImmediate(() => callback())
}

stop () {
for (let mq of this.peers.values()) {
this.disconnected(mq.peerId)
}
stop (callback) {
this.peers.forEach((mq) => this.disconnected(mq.peerId))

clearInterval(this.timer)
setImmediate(() => callback())
}
}
Loading

0 comments on commit 84db9ce

Please sign in to comment.