Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
feat: initial implementation of circuit relaying
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitriy ryajov authored and dmitriy ryajov committed Mar 2, 2017
1 parent 0225da3 commit e6954c5
Show file tree
Hide file tree
Showing 6 changed files with 535 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'use strict'

const debug = require('debug')

const log = debug('libp2p:circuit')
log.err = debug('libp2p:circuit:error')

module.exports = {
log: log,
multicodec: '/ipfs/relay/circuit/1.0.0'
}
104 changes: 104 additions & 0 deletions src/dialer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
'use strict'

const config = require('./config')
const mss = require('multistream-select')
const pull = require('pull-stream')
const Peer = require('./peer')

const log = config.log

class Dialer {
constructor (libp2p, relayPeers) {
this.libp2p = libp2p
this.relayPeers = relayPeers || new Map()
this.peers = new Map()

// Speed up any new peer that comes in my way
// this.libp2p.swarm.on('peer-mux-established', this._isRelayPeer)
// this.libp2p.swarm.on('peer-mux-closed', () => {
// // TODO: detach relay connection
// })
}

relay (peerInfo, callback) {
if (this.peers.has(peerInfo.id.toB58String())) {
return callback(null,
this.peers.get(peerInfo.id.toB58String()).conn)
}

let next = (relayPeer) => {
if (!relayPeer) {
return callback(`no relay peers were found!`)
}

log(`Trying relay peer ${relayPeer.id.toB58String()}`)
this._dialRelay(relayPeer, (err, conn) => {
if (err) {
if (relays.length > 0) {
return next(relays.shift())
}
return callback(err)
}

this._negotiateRelay(conn, peerInfo, (err, conn) => {
if (err) {
log.err(`An error has occurred negotiating the relay connection`, err)
return callback(err)
}

this.peers.set(peerInfo.id.toB58String(), new Peer(conn))
callback(null, conn)
})
})
}

let relays = Array.from(this.relayPeers.values())
next(relays.shift())
}

_negotiateRelay (conn, peerInfo, callback) {
let addr = this.libp2p.peerInfo.distinctMultiaddr()
let destAddrs = peerInfo.distinctMultiaddr()

if (!(addr && addr.length > 0) || !(destAddrs && destAddrs.length > 0)) {
log.err(`No valid multiaddress for peer!`)
callback(`No valid multiaddress for peer!`)
}

log(`negotiating relay for peer ${peerInfo.id.toB58String()}`)
mss.util.encode(new Buffer(`${destAddrs[0].toString()}/ipfs/${peerInfo.id.toB58String()}`), (err, encoded) => {
pull(
pull.values([encoded]),
conn,
pull.collect((err) => {
callback(err, conn)
})
)
})
}

_isRelayPeer (peerInfo) {
this._dialRelay(peerInfo, (peerInfo, conn) => {
// TODO: implement relay peer discovery here
})
}

_dialRelay (relayPeer, callback) {
const idB58Str = relayPeer.id.toB58String()
log('dialing %s', idB58Str)

if (this.peers.has(idB58Str)) {
return callback(null, this.peers.get(idB58Str))
}

this.libp2p.dialByPeerInfo(relayPeer, config.multicodec, (err, conn) => {
if (err) {
return callback(err)
}

callback(null, conn)
})
}
}

module.exports = Dialer
88 changes: 88 additions & 0 deletions src/listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
const config = require('./config')
const Peer = require('./peer')
const pull = require('pull-stream')
const multiaddr = require('multiaddr')
const PeerInfo = require('peer-info')
const includes = require('lodash/includes')
const lp = require('pull-length-prefixed')

const multicodec = config.multicodec

const log = config.log

class Listener {
constructor (libp2p) {
this.libp2p = libp2p
this.peers = new Map()

this._onConnection = this._onConnection.bind(this)
}

listen (cb) {
this.libp2p.handle(multicodec, this._onConnection)
cb()
}

close (cb) {
this.libp2p.unhandle(multicodec)
cb()
}

_onConnection (protocol, conn) {
conn.getPeerInfo((err, peerInfo) => {
if (err) {
log.err('Failed to identify incomming conn', err)
return pull(pull.empty(), conn)
}

const idB58Str = peerInfo.id.toB58String()
let relayPeer = this.peers.get(idB58Str)
if (!relayPeer) {
log('new peer', idB58Str)
relayPeer = new Peer(peerInfo)
this.peers.set(idB58Str, relayPeer)
}
relayPeer.attachConnection(conn)
this._processConnection(relayPeer, conn)
})
}

_processConnection (relayPeer, conn) {
return pull(
conn,
lp.decode(),
pull.collect((err, val) => {
if (err) {
err(err)
return err
}

let addr = multiaddr(val[0].toString())
let sourcePeer
try {
PeerInfo.create(addr.peerId(), (err, peerId) => {
let peerInfo = new PeerInfo(peerId)

if (includes(addr.protoNames(), 'ipfs')) {
addr = addr.decapsulate('ipfs')
}

peerInfo.multiaddr.add(addr)
sourcePeer = new Peer(peerInfo)
sourcePeer.attachConnection(conn) // attach relay connection

pull(
pull.values(['Hello']),
conn
)
})
} catch (err) {
log.err(err)
}
})
)
}

}

module.exports = Listener
84 changes: 84 additions & 0 deletions src/peer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict'

const pull = require('pull-stream')

/**
* The known state of a connected peer.
*/
class Peer {
/**
* @param {PeerInfo} this peer
*/
constructor (peerInfo) {
/**
* @type {PeerInfo}
*/
this.peer = peerInfo
/**
* @type {Map<Peer>} map of relayed peers
*/
this.peers = new Map()
/**
* @type {Connection}
*/
this.conn = null
}

/**
* Is the peer connected currently?
*
* @type {boolean}
*/
get isConnected () {
return Boolean(this.conn)
}

/**
* Circuit this connection with the dest peer
*
* @param dest {Peer} the destination peer to be short circuited
*/
circuit (dest, callback) {
if (this.peers.has(dest.peer.id.toB58String())) {
callback(null)
}

pull(this.conn, dest.conn)
this.peers.set(dest.peer.id.toB58String(), dest)

callback(null)
}

/**
* Attach the peer to a connection and setup a write stream
*
* @param {Connection} conn
* @returns {undefined}
*/
attachConnection (conn) {
this.conn = conn
}

/**
* Closes the open connection to peer
*
* @param {Function} callback
* @returns {undefined}
*/
close (callback) {
if (!this.conn || !this.stream) {
// no connection to close
}
// end the pushable pull-stream
if (this.stream) {
this.stream.end()
}
setImmediate(() => {
this.conn = null
this.stream = null
callback()
})
}
}

module.exports = Peer
Loading

0 comments on commit e6954c5

Please sign in to comment.