Skip to content

Commit

Permalink
wip: more floodsub tests; CLI placeholder tests; initializer
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinmcdermott committed Nov 17, 2016
1 parent 9026a88 commit 12ec5c3
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 34 deletions.
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
},
"homepage": "https://github.com/ipfs/js-ipfs#readme",
"devDependencies": {

"aegir": "9.1.2",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
Expand Down Expand Up @@ -91,7 +90,7 @@
"ipld-resolver": "^0.2.0",
"isstream": "^0.1.2",
"joi": "^9.2.0",
"libp2p-floodsub": "0.3.0",
"libp2p-floodsub": "0.3.1",
"libp2p-ipfs": "^0.15.0",
"libp2p-ipfs-browser": "^0.16.0",
"lodash.flatmap": "^4.5.0",
Expand Down Expand Up @@ -149,4 +148,4 @@
"nginnever <ginneversource@gmail.com>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
17 changes: 17 additions & 0 deletions src/cli/commands/floodsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

// NOTE: Floodsub CLI is not tested. Tests will not be run until
// https://github.com/ipfs/js-ipfs-api/pull/377
// is merged
module.exports = {
command: 'floodsub',

description: 'floodsub commands',

builder (yargs) {
return yargs
.commandDir('floodsub')
},

handler (argv) {}
}
28 changes: 28 additions & 0 deletions src/cli/commands/floodsub/pub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:floodsub')
log.error = debug('cli:floodsub:error')

module.exports = {
command: 'pub <topic> <message>',

describe: 'Publish a message to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.floodsub.pub(argv.topic, argv.message, (err) => {
if (err) {
throw err
}
})
})
}
}
30 changes: 30 additions & 0 deletions src/cli/commands/floodsub/sub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:floodsub')
log.error = debug('cli:floodsub:error')

module.exports = {
command: 'sub <topic>',

describe: 'Subscribe to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.floodsub.sub(argv.topic, (err, stream) => {
if (err) {
throw err
}

console.log(stream.toString())
})
})
}
}
30 changes: 15 additions & 15 deletions src/core/components/floodsub.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
'use strict'

// const FloodSub = require('libp2p-floodsub')
const FloodSub = require('./../../../node_modules/libp2p-floodsub/src')
const FloodSub = require('libp2p-floodsub')
const promisify = require('promisify-es6')
const Stream = require('stream')
const Readable = require('stream').Readable

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR

module.exports = function floodsub (self) {
let fsub

return {
start: promisify(() => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

fsub = new FloodSub(self._libp2pNode)
return self._libp2pNode
}),
function init () {
fsub = new FloodSub(self._libp2pNode)
}

return {
sub: promisify((topic, options, callback) => {
// TODO: Clarify with @diasdavid what to do with the `options.discover` param

This comment has been minimized.

Copy link
@daviddias

daviddias Nov 18, 2016

not much really, we don't have the DHT right now

// Ref: https://github.com/ipfs/js-ipfs-api/pull/377/files#diff-f0c61c06fd5dc36b6f760b7ea97b1862R50
Expand All @@ -32,9 +26,11 @@ module.exports = function floodsub (self) {
throw OFFLINE_ERROR
}

let rs = new Stream()
rs.readable = true
rs._read = () => {}
if (!fsub) {
init()
}

let rs = new Readable()
rs.cancel = () => fsub.unsubscribe(topic)

fsub.on(topic, (data) => {
Expand All @@ -58,6 +54,10 @@ module.exports = function floodsub (self) {
throw OFFLINE_ERROR
}

if (!fsub) {
init()
}

const buf = Buffer.isBuffer(data) ? data : new Buffer(data)

try {
Expand Down
1 change: 0 additions & 1 deletion src/core/components/go-online.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ module.exports = function goOnline (self) {
return cb(err)
}

self.floodsub.start()
self._bitswap = new Bitswap(
self._libp2pNode.peerInfo,
self._libp2pNode,
Expand Down
12 changes: 10 additions & 2 deletions src/http-api/resources/floodsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ exports = module.exports

exports.sub = {
handler: (request, reply) => {
const discover = request.query.discover
const discover = request.query.discover || null
const topic = request.params.topic

request.server.app.ipfs.floodsub.sub(topic, { discover }, (err, stream) => {
Expand All @@ -20,6 +20,14 @@ exports.sub = {
}).code(500)
}

// hapi is not very clever and throws if no
// - _read method
// - _readableState object
// are there :(
if (!stream._read) {
stream._read = () => {}
stream._readableState = {}
}
return reply(stream)
})
}
Expand All @@ -39,7 +47,7 @@ exports.pub = {
}).code(500)
}

return reply(true)
return reply()
})
}
}
2 changes: 1 addition & 1 deletion src/http-api/routes/floodsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module.exports = (server) => {

api.route({
method: '*',
path: '/api/v0/floodsub/pub',
path: '/api/v0/pubsub/pub',
config: {
handler: resources.floodsub.pub.handler,
validate: {
Expand Down
10 changes: 8 additions & 2 deletions test/cli/test-commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ const ipfsBase = require('../utils/ipfs-exec')
const ipfs = ipfsBase(repoPath)
const describeOnlineAndOffline = require('../utils/on-and-off')

// NOTE: Floodsub CLI tests will not be run until
// https://github.com/ipfs/js-ipfs-api/pull/377
// is merged. But we have the files ready to go
// so the command count is bumped from 56 to 59
const commandCount = 59

describe('commands', () => {
describeOnlineAndOffline(repoPath, () => {
it('list the commands', () => {
return ipfs('commands').then((out) => {
expect(out.split('\n')).to.have.length(56)
expect(out.split('\n')).to.have.length(commandCount)
})
})
})
Expand All @@ -20,7 +26,7 @@ describe('commands', () => {
return ipfsBase(repoPath, {
cwd: '/tmp'
})('commands').then((out) => {
expect(out.split('\n').length).to.equal(56)
expect(out.split('\n').length).to.equal(commandCount)
})
})
})
67 changes: 67 additions & 0 deletions test/cli/test-floodsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* eslint max-nested-callbacks: ["error", 8] */
/* eslint-env mocha */
'use strict'

const expect = require('chai').expect
const HttpAPI = require('../../src/http-api')
const createTempNode = require('../utils/temp-node')
const repoPath = require('./index').repoPath
const ipfs = require('../utils/ipfs-exec')(repoPath)

// NOTE: Floodsub CLI tests will not be run until
// https://github.com/ipfs/js-ipfs-api/pull/377
// is merged
describe.skip('floodsub', function () {
this.timeout(30 * 1000)
let node

const topic = 'nonscents'
const message = new Buffer('Some non cents.')

before((done) => {
createTempNode(1, (err, _node) => {
expect(err).to.not.exist
node = _node
node.goOnline((err) => {
expect(err).to.not.exist
done()
})
})
})

after((done) => {
node.goOffline(done)
})

describe('api running', () => {
let httpAPI
const called = true

before((done) => {
httpAPI = new HttpAPI(repoPath)
httpAPI.start((err) => {
expect(err).to.not.exist
done()
})
})

after((done) => {
httpAPI.stop((err) => {
expect(err).to.not.exist
done()
})
})

it('pub', () => {
return ipfs('floodsub', 'pub', topic, message).then((out) => {
expect(called).to.eql(true)
})
})

it('sub', () => {
return ipfs('floodsub', 'sub', topic).then((out) => {
expect(out).to.have.length.above(0)
})
})
})
})
10 changes: 0 additions & 10 deletions test/core/both/test-floodsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,6 @@ describe('floodsub', () => {
})

describe('Floodsub API', () => {
describe('start', () => {
it('throws if offline', () => {
expect(() => nodeOffline.floodsub.start()).to.throw
})

it('success', () => {
expect(nodeA.floodsub.start()).to.exist
})
})

describe('sub', () => {
it('throws if offline', () => {
expect(() => nodeOffline.floodsub.sub()).to.throw
Expand Down
Loading

3 comments on commit 12ec5c3

@gavinmcdermott
Copy link
Owner Author

@gavinmcdermott gavinmcdermott commented on 12ec5c3 Nov 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @diasdavid

These commits are the initial implementation of floodsub into js-ipfs core :) I would love to get a code review here to start tightening things up!

I was building against this unmerged api change, so there are a few test placeholders in my code for the CLI and js-ipfs-api tests.

Here are the relevant commits—sorry I can't squash them due to syncing with the upstream (let me know if you have any suggestions to clean this up...)

Most recent listed first...


Questions:

  • I ended up not adding an explicit floodsub.start method that we could call from the core's goOnline function. The way floodsub connects to peers was causing issues with ports in use and the like. In order to avoid pubsub-specific-implementation-issues affecting the core (which I do not think they should), I initialize FloodSub from within the module itself. Are you ok with this? Any thoughts?

  • In the floodsub.sub call, what should I do with the options.discover param? (I saw it passed in the API call I was building against)
    -- Where it manifests in my code
    -- Where I saw this in the request

Thanks a ton David! The feedback and ideas are greatly appreciated!

@daviddias
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Hey @gavinmcdermott :D Off to a great start I see :)

It would be great if this communication happened within a PR to js-ipfs, it helps to keep track of things and give review.

Have you went through the design thinking process of handling unsubscribes? Namely: ipfs#580 (comment)

I ended up not adding an explicit floodsub.start method that we could call from the core's goOnline function. The way floodsub connects to peers was causing issues with ports in use and the like. In order to avoid pubsub-specific-implementation-issues affecting the core (which I do not think they should), I initialize FloodSub from within the module itself. Are you ok with this? Any thoughts?

Sounds good for now, although we will want something that offers a more fine grained control. Related discussion: ipfs#556

In the floodsub.sub call, what should I do with the options.discover param? (I saw it passed in the API call I was building against)
-- Where it manifests in my code
-- Where I saw this in the request

Unfortunately nothing right now, we don't have the DHT.

@gavinmcdermott
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid: Thank you for the comments.

It would be great if this communication happened within a PR to js-ipfs

My goal is to open this PR later today :)

Have you went through the design thinking process of handling unsubscribes?

Yes. unsubscribe is happening this morning.

Sounds good for now, although we will want something that offers a more fine grained control.

I had a sense of this....I'll add back in the start method that can be called to initialize FloodSub.

Thanks again for the feedback; it cultivates a great community, D!

Please sign in to comment.