From e6954c594516aecd3097f9605f4424b9a58249dd Mon Sep 17 00:00:00 2001 From: dmitriy ryajov Date: Thu, 2 Mar 2017 10:09:25 -0800 Subject: [PATCH] feat: initial implementation of circuit relaying --- src/config.js | 11 ++++ src/dialer.js | 104 +++++++++++++++++++++++++++++++++ src/listener.js | 88 ++++++++++++++++++++++++++++ src/peer.js | 84 +++++++++++++++++++++++++++ src/relay.js | 142 +++++++++++++++++++++++++++++++++++++++++++++ test/index.spec.js | 106 +++++++++++++++++++++++++++++++++ 6 files changed, 535 insertions(+) create mode 100644 src/config.js create mode 100644 src/dialer.js create mode 100644 src/listener.js create mode 100644 src/peer.js create mode 100644 src/relay.js create mode 100644 test/index.spec.js diff --git a/src/config.js b/src/config.js new file mode 100644 index 0000000..9d72525 --- /dev/null +++ b/src/config.js @@ -0,0 +1,11 @@ +'use strict' + +const debug = require('debug') + +const log = debug('libp2p:circuit') +log.err = debug('libp2p:circuit:error') + +module.exports = { + log: log, + multicodec: '/ipfs/relay/circuit/1.0.0' +} diff --git a/src/dialer.js b/src/dialer.js new file mode 100644 index 0000000..7073bd4 --- /dev/null +++ b/src/dialer.js @@ -0,0 +1,104 @@ +'use strict' + +const config = require('./config') +const mss = require('multistream-select') +const pull = require('pull-stream') +const Peer = require('./peer') + +const log = config.log + +class Dialer { + constructor (libp2p, relayPeers) { + this.libp2p = libp2p + this.relayPeers = relayPeers || new Map() + this.peers = new Map() + + // Speed up any new peer that comes in my way + // this.libp2p.swarm.on('peer-mux-established', this._isRelayPeer) + // this.libp2p.swarm.on('peer-mux-closed', () => { + // // TODO: detach relay connection + // }) + } + + relay (peerInfo, callback) { + if (this.peers.has(peerInfo.id.toB58String())) { + return callback(null, + this.peers.get(peerInfo.id.toB58String()).conn) + } + + let next = (relayPeer) => { + if (!relayPeer) { + return callback(`no relay peers were found!`) + } + + log(`Trying relay peer ${relayPeer.id.toB58String()}`) + this._dialRelay(relayPeer, (err, conn) => { + if (err) { + if (relays.length > 0) { + return next(relays.shift()) + } + return callback(err) + } + + this._negotiateRelay(conn, peerInfo, (err, conn) => { + if (err) { + log.err(`An error has occurred negotiating the relay connection`, err) + return callback(err) + } + + this.peers.set(peerInfo.id.toB58String(), new Peer(conn)) + callback(null, conn) + }) + }) + } + + let relays = Array.from(this.relayPeers.values()) + next(relays.shift()) + } + + _negotiateRelay (conn, peerInfo, callback) { + let addr = this.libp2p.peerInfo.distinctMultiaddr() + let destAddrs = peerInfo.distinctMultiaddr() + + if (!(addr && addr.length > 0) || !(destAddrs && destAddrs.length > 0)) { + log.err(`No valid multiaddress for peer!`) + callback(`No valid multiaddress for peer!`) + } + + log(`negotiating relay for peer ${peerInfo.id.toB58String()}`) + mss.util.encode(new Buffer(`${destAddrs[0].toString()}/ipfs/${peerInfo.id.toB58String()}`), (err, encoded) => { + pull( + pull.values([encoded]), + conn, + pull.collect((err) => { + callback(err, conn) + }) + ) + }) + } + + _isRelayPeer (peerInfo) { + this._dialRelay(peerInfo, (peerInfo, conn) => { + // TODO: implement relay peer discovery here + }) + } + + _dialRelay (relayPeer, callback) { + const idB58Str = relayPeer.id.toB58String() + log('dialing %s', idB58Str) + + if (this.peers.has(idB58Str)) { + return callback(null, this.peers.get(idB58Str)) + } + + this.libp2p.dialByPeerInfo(relayPeer, config.multicodec, (err, conn) => { + if (err) { + return callback(err) + } + + callback(null, conn) + }) + } +} + +module.exports = Dialer diff --git a/src/listener.js b/src/listener.js new file mode 100644 index 0000000..74db365 --- /dev/null +++ b/src/listener.js @@ -0,0 +1,88 @@ +const config = require('./config') +const Peer = require('./peer') +const pull = require('pull-stream') +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') +const includes = require('lodash/includes') +const lp = require('pull-length-prefixed') + +const multicodec = config.multicodec + +const log = config.log + +class Listener { + constructor (libp2p) { + this.libp2p = libp2p + this.peers = new Map() + + this._onConnection = this._onConnection.bind(this) + } + + listen (cb) { + this.libp2p.handle(multicodec, this._onConnection) + cb() + } + + close (cb) { + this.libp2p.unhandle(multicodec) + cb() + } + + _onConnection (protocol, conn) { + conn.getPeerInfo((err, peerInfo) => { + if (err) { + log.err('Failed to identify incomming conn', err) + return pull(pull.empty(), conn) + } + + const idB58Str = peerInfo.id.toB58String() + let relayPeer = this.peers.get(idB58Str) + if (!relayPeer) { + log('new peer', idB58Str) + relayPeer = new Peer(peerInfo) + this.peers.set(idB58Str, relayPeer) + } + relayPeer.attachConnection(conn) + this._processConnection(relayPeer, conn) + }) + } + + _processConnection (relayPeer, conn) { + return pull( + conn, + lp.decode(), + pull.collect((err, val) => { + if (err) { + err(err) + return err + } + + let addr = multiaddr(val[0].toString()) + let sourcePeer + try { + PeerInfo.create(addr.peerId(), (err, peerId) => { + let peerInfo = new PeerInfo(peerId) + + if (includes(addr.protoNames(), 'ipfs')) { + addr = addr.decapsulate('ipfs') + } + + peerInfo.multiaddr.add(addr) + sourcePeer = new Peer(peerInfo) + sourcePeer.attachConnection(conn) // attach relay connection + + pull( + pull.values(['Hello']), + conn + ) + }) + } catch (err) { + log.err(err) + } + }) + ) + } + +} + +module.exports = Listener diff --git a/src/peer.js b/src/peer.js new file mode 100644 index 0000000..e0d6c5e --- /dev/null +++ b/src/peer.js @@ -0,0 +1,84 @@ +'use strict' + +const pull = require('pull-stream') + +/** + * The known state of a connected peer. + */ +class Peer { + /** + * @param {PeerInfo} this peer + */ + constructor (peerInfo) { + /** + * @type {PeerInfo} + */ + this.peer = peerInfo + /** + * @type {Map} map of relayed peers + */ + this.peers = new Map() + /** + * @type {Connection} + */ + this.conn = null + } + + /** + * Is the peer connected currently? + * + * @type {boolean} + */ + get isConnected () { + return Boolean(this.conn) + } + + /** + * Circuit this connection with the dest peer + * + * @param dest {Peer} the destination peer to be short circuited + */ + circuit (dest, callback) { + if (this.peers.has(dest.peer.id.toB58String())) { + callback(null) + } + + pull(this.conn, dest.conn) + this.peers.set(dest.peer.id.toB58String(), dest) + + callback(null) + } + + /** + * Attach the peer to a connection and setup a write stream + * + * @param {Connection} conn + * @returns {undefined} + */ + attachConnection (conn) { + this.conn = conn + } + + /** + * Closes the open connection to peer + * + * @param {Function} callback + * @returns {undefined} + */ + close (callback) { + if (!this.conn || !this.stream) { + // no connection to close + } + // end the pushable pull-stream + if (this.stream) { + this.stream.end() + } + setImmediate(() => { + this.conn = null + this.stream = null + callback() + }) + } +} + +module.exports = Peer diff --git a/src/relay.js b/src/relay.js new file mode 100644 index 0000000..76c74ab --- /dev/null +++ b/src/relay.js @@ -0,0 +1,142 @@ +'use strict' + +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') +const multiaddr = require('multiaddr') +const config = require('./config') +const Peer = require('./peer') +const mss = require('multistream-select') + +const multicodec = require('./config').multicodec + +const log = config.log + +class Relay { + constructor (libp2p) { + this.libp2p = libp2p + this.peers = new Map() + + this._onConnection = this._onConnection.bind(this) + this._dialPeer = this._dialPeer.bind(this) + } + + start (cb) { + this.libp2p.handle(multicodec, this._onConnection) + cb() + } + + stop (cb) { + this.libp2p.unhandle(multicodec) + cb() + } + + _dialPeer (ma, callback) { + let idB58Str + + try { + idB58Str = ma.peerId() // try to get the peerid from the multiaddr + } catch (err) { + log.err(err) + } + + if (idB58Str) { + const peer = this.peers.get(idB58Str) + if (peer && peer.isConnected()) { + return + } + } + + this.libp2p.dialByMultiaddr(ma, multicodec, (err, conn) => { + if (err) { + log.err(err) + return callback(err) + } + + conn.getPeerInfo((err, peerInfo) => { + if (err) { + err(err) + return + } + + this._onDial(peerInfo, conn, callback) + }) + }) + } + + _onDial (peerInfo, conn, callback) { + const idB58Str = peerInfo.id.toB58String() + + // If already had a dial to me, just add the conn + if (!this.peers.has(idB58Str)) { + this.peers.set(idB58Str, new Peer(peerInfo)) + } + + const peer = this.peers.get(idB58Str) + peer.attachConnection(conn) + + let srcAddrs = peerInfo.distinctMultiaddr() + + if (!(srcAddrs && srcAddrs.length > 0)) { + log.err(`No valid multiaddress for peer!`) + callback(`No valid multiaddress for peer!`) + } + + mss.util.encode(new Buffer(`${srcAddrs[0].toString()} + /ipfs/${peerInfo.id.toB58String()}`), (err, encoded) => { + pull( + pull.values([encoded]), + conn, + pull.collect((err) => { + callback(err, peer) + }) + ) + }) + } + + _onConnection (protocol, conn) { + conn.getPeerInfo((err, peerInfo) => { + if (err) { + log.err('Failed to identify incomming conn', err) + return pull(pull.empty(), conn) + } + + const idB58Str = peerInfo.id.toB58String() + let srcPeer = this.peers.get(idB58Str) + if (!srcPeer) { + log('new peer', idB58Str) + srcPeer = new Peer(peerInfo) + this.peers.set(idB58Str, srcPeer) + } + srcPeer.attachConnection(conn) + this._processConnection(srcPeer, conn) + }) + } + + _processConnection (srcPeer, conn) { + return pull( + conn, + lp.decode(), + pull.collect((err, val) => { + if (err) { + err(err) + return err + } + + let addr = multiaddr(val[0].toString()) + this._circuit(srcPeer, addr, (err) => { + if (err) { + log.err(err) + } + }) + }) + ) + } + + _circuit (srcPeer, ma, callback) { + this._dialPeer(ma, (err, destPeer) => { + srcPeer.circuit(destPeer, callback) + }) + } +} + +module.exports = Relay diff --git a/test/index.spec.js b/test/index.spec.js new file mode 100644 index 0000000..c1e2d61 --- /dev/null +++ b/test/index.spec.js @@ -0,0 +1,106 @@ +const Node = require('libp2p-ipfs-nodejs') +const PeerInfo = require('peer-info') +const series = require('async/series') +const parallel = require('async/parallel') +const pull = require('pull-stream') + +const Relay = require('../src').Relay +const Dialer = require('../src').Dialer +const Listener = require('../src').Listener + +describe(`test circuit`, () => { + let srcNode + let dstNode + let relayNode + + let srcPeer + let dstPerr + let relayPeer + + let dialer + let relayCircuit + let listener + + let srcMa + let dstMa + let relayMa + + let portBase = 9000 // TODO: randomize or mock sockets + before((done) => { + series([ + (cb) => { + PeerInfo.create((err, info) => { + srcPeer = info + srcPeer.multiaddr.add(`/ip4/0.0.0.0/tcp/${portBase++}`) + srcNode = new Node(srcPeer) + cb(err) + }) + }, + (cb) => { + PeerInfo.create((err, info) => { + dstPeer = info + dstPeer.multiaddr.add(`/ip4/0.0.0.0/tcp/${portBase++}`) + dstNode = new Node(dstPeer) + cb(err) + }) + }, + (cb) => { + PeerInfo.create((err, info) => { + relayPeer = info + relayPeer.multiaddr.add(`/ip4/0.0.0.0/tcp/${portBase++}`) + relayNode = new Node(relayPeer) + cb(err) + }) + }, + (cb) => { + let relays = new Map() + relays.set(relayPeer.id.toB58String(), relayPeer) + dialer = new Dialer(srcNode, relays) + cb() + }, + (cb) => { + relayCircuit = new Relay(relayNode) + relayCircuit.start(cb) + }, + (cb) => { + listener = new Listener(dstNode) + listener.listen(cb) + }], + (err) => done(err) + ) + }) + + beforeEach((done) => { + parallel([ + (cb) => { + srcNode.start(cb) + }, + (cb) => { + dstNode.start(cb) + }, + (cb) => { + relayNode.start(cb) + } + ], (err) => done(err)) + }) + + afterEach((done) => { + parallel([ + (cb) => { + srcNode.stop(cb) + }, + (cb) => { + dstNode.stop(cb) + }, + (cb) => { + relayNode.stop(cb) + } + ], (err) => done(err)) + }) + + it(`should connect to relay peer`, (done) => { + dialer.relay(dstPeer, (err, conn) => { + done(err) + }) + }).timeout(50000) +})