Skip to content

Commit

Permalink
wip: dht integration
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Apr 3, 2017
1 parent e6a365f commit 88b97a8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 24 deletions.
56 changes: 33 additions & 23 deletions src/components/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
const debug = require('debug')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const setImmediate = require('async/setImmediate')
const waterfall = require('async/waterfall')
const each = require('async/each')

const Message = require('../../types/message')
const CONSTANTS = require('../../constants')
Expand Down Expand Up @@ -99,24 +100,39 @@ class Network {
this.bitswap._onPeerDisconnected(peerInfo.id)
}

// Connect to the given peer
connectTo (peerId, callback) {
const done = (err) => setImmediate(() => callback(err))

if (!this._running) {
return done(new Error('No running network'))
return callback(new Error('No running network'))
}

// NOTE: For now, all this does is ensure that we are
// connected. Once we have Peer Routing, we will be able
// to find the Peer
if (this.libp2p.swarm.muxedConns[peerId.toB58String()]) {
done()
} else {
done(new Error('Could not connect to peer with peerId:', peerId.toB58String()))
}
this.libp2p.dial(peerId, (err, conn) => {
if (err) {
return callback(err)
}

pull(pull.empty, conn)
callback()
})
}

findProviders (cid, maxProviders, callback) {
this.libp2p.dht.findNProviders(cid, CONSTANTS.providerRequestTimeout, maxProviders, callback)
}

findAndConnect (cid, maxProviders, callback) {
waterfall([
(cb) => this.findProviders(cid, maxProviders, cb),
(provs, cb) => each(provs, (p, cb) => {
this.connectTo(p, cb)
})
], callback)
}

provide (cid, callback) {
this.libp2p.dht.provide()
}

// Connect to the given peer
// Send the given msg (instance of Message) to the given peer
sendMessage (peerId, msg, callback) {
if (!this._running) {
Expand All @@ -125,14 +141,8 @@ class Network {

const stringId = peerId.toB58String()
log('sendMessage to %s', stringId, msg)
let peerInfo
try {
peerInfo = this.peerBook.getByB58String(stringId)
} catch (err) {
return callback(err)
}

this._dialPeer(peerInfo, (err, conn, protocol) => {
this._dialPeer(peerId, (err, conn, protocol) => {
if (err) {
return callback(err)
}
Expand All @@ -157,14 +167,14 @@ class Network {
})
}

_dialPeer (peerInfo, callback) {
_dialPeer (peer, callback) {
// dialByPeerInfo throws if no network is there
try {
// Attempt Bitswap 1.1.0
this.libp2p.dial(peerInfo, BITSWAP110, (err, conn) => {
this.libp2p.dial(peer, BITSWAP110, (err, conn) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dial(peerInfo, BITSWAP100, (err, conn) => {
this.libp2p.dial(peer, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}
Expand Down
1 change: 1 addition & 0 deletions src/components/want-manager/msg-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ module.exports = class MsgQueue {
log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message)
return
}

log('sending message')
this.network.sendMessage(this.peerId, msg, (err) => {
if (err) {
Expand Down
20 changes: 19 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ class Bitswap {
`block:${block.cid.buffer.toString()}`,
block
)
this.network.provide(block.cid, (err) => {
if (err) {
log.error('Failed to provide: %s', err.message)
}
})

this.engine.receivedBlocks([block.cid])
callback()
})
Expand Down Expand Up @@ -198,7 +204,14 @@ class Bitswap {
}

addListener()
this.wm.wantBlocks([cid])

this.network.findAndConnect(cid, CONSTANTS.maxProvidersPerRequest, (err) => {
if (err) {
return callback(err)
}

this.wm.wantBlocks([cid])
})
})
}

Expand Down Expand Up @@ -269,6 +282,11 @@ class Bitswap {
block
)
this.engine.receivedBlocks([block.cid])
this.network.provide(block.cid, (err) => {
if (err) {
log.error('Failed to provide: %s', err.message)
}
})
})
cb()
})
Expand Down

0 comments on commit 88b97a8

Please sign in to comment.