diff --git a/src/components/network/index.js b/src/components/network/index.js index 7521b849..2fbc684f 100644 --- a/src/components/network/index.js +++ b/src/components/network/index.js @@ -3,7 +3,8 @@ const debug = require('debug') const lp = require('pull-length-prefixed') const pull = require('pull-stream') -const setImmediate = require('async/setImmediate') +const waterfall = require('async/waterfall') +const each = require('async/each') const Message = require('../../types/message') const CONSTANTS = require('../../constants') @@ -99,24 +100,39 @@ class Network { this.bitswap._onPeerDisconnected(peerInfo.id) } - // Connect to the given peer connectTo (peerId, callback) { - const done = (err) => setImmediate(() => callback(err)) - if (!this._running) { - return done(new Error('No running network')) + return callback(new Error('No running network')) } - // NOTE: For now, all this does is ensure that we are - // connected. Once we have Peer Routing, we will be able - // to find the Peer - if (this.libp2p.swarm.muxedConns[peerId.toB58String()]) { - done() - } else { - done(new Error('Could not connect to peer with peerId:', peerId.toB58String())) - } + this.libp2p.dial(peerId, (err, conn) => { + if (err) { + return callback(err) + } + + pull(pull.empty, conn) + callback() + }) + } + + findProviders (cid, maxProviders, callback) { + this.libp2p.dht.findNProviders(cid, CONSTANTS.providerRequestTimeout, maxProviders, callback) + } + + findAndConnect (cid, maxProviders, callback) { + waterfall([ + (cb) => this.findProviders(cid, maxProviders, cb), + (provs, cb) => each(provs, (p, cb) => { + this.connectTo(p, cb) + }) + ], callback) + } + + provide (cid, callback) { + this.libp2p.dht.provide() } + // Connect to the given peer // Send the given msg (instance of Message) to the given peer sendMessage (peerId, msg, callback) { if (!this._running) { @@ -125,14 +141,8 @@ class Network { const stringId = peerId.toB58String() log('sendMessage to %s', stringId, msg) - let peerInfo - try { - peerInfo = this.peerBook.getByB58String(stringId) - } catch (err) { - return callback(err) - } - this._dialPeer(peerInfo, (err, conn, protocol) => { + this._dialPeer(peerId, (err, conn, protocol) => { if (err) { return callback(err) } @@ -157,14 +167,14 @@ class Network { }) } - _dialPeer (peerInfo, callback) { + _dialPeer (peer, callback) { // dialByPeerInfo throws if no network is there try { // Attempt Bitswap 1.1.0 - this.libp2p.dial(peerInfo, BITSWAP110, (err, conn) => { + this.libp2p.dial(peer, BITSWAP110, (err, conn) => { if (err) { // Attempt Bitswap 1.0.0 - this.libp2p.dial(peerInfo, BITSWAP100, (err, conn) => { + this.libp2p.dial(peer, BITSWAP100, (err, conn) => { if (err) { return callback(err) } diff --git a/src/components/want-manager/msg-queue.js b/src/components/want-manager/msg-queue.js index f9f25652..5af1f139 100644 --- a/src/components/want-manager/msg-queue.js +++ b/src/components/want-manager/msg-queue.js @@ -53,6 +53,7 @@ module.exports = class MsgQueue { log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) return } + log('sending message') this.network.sendMessage(this.peerId, msg, (err) => { if (err) { diff --git a/src/index.js b/src/index.js index de5a5684..0feaeaf7 100644 --- a/src/index.js +++ b/src/index.js @@ -126,6 +126,12 @@ class Bitswap { `block:${block.cid.buffer.toString()}`, block ) + this.network.provide(block.cid, (err) => { + if (err) { + log.error('Failed to provide: %s', err.message) + } + }) + this.engine.receivedBlocks([block.cid]) callback() }) @@ -198,7 +204,14 @@ class Bitswap { } addListener() - this.wm.wantBlocks([cid]) + + this.network.findAndConnect(cid, CONSTANTS.maxProvidersPerRequest, (err) => { + if (err) { + return callback(err) + } + + this.wm.wantBlocks([cid]) + }) }) } @@ -269,6 +282,11 @@ class Bitswap { block ) this.engine.receivedBlocks([block.cid]) + this.network.provide(block.cid, (err) => { + if (err) { + log.error('Failed to provide: %s', err.message) + } + }) }) cb() })