From 3303ad0dfa72572a06051d64a14a9d305c15ed7c Mon Sep 17 00:00:00 2001 From: dirkmc Date: Thu, 6 Dec 2018 05:29:24 -0500 Subject: [PATCH] fix: prevent double dialing same peer (#63) --- package.json | 1 + src/base.js | 24 ++++++++++ test/2-nodes.js | 120 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+) diff --git a/package.json b/package.json index f9158cfd89..0fc1760da1 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ "aegir": "^17.1.1", "benchmark": "^2.1.4", "chai": "^4.2.0", + "chai-spies": "^1.0.0", "dirty-chai": "^2.0.1", "libp2p": "~0.24.1", "libp2p-secio": "~0.10.1", diff --git a/src/base.js b/src/base.js index c6b5ba3b26..3f73d59a76 100644 --- a/src/base.js +++ b/src/base.js @@ -38,6 +38,9 @@ class BaseProtocol extends EventEmitter { */ this.peers = new Map() + // Dials that are currently in progress + this._dials = new Set() + this._onConnection = this._onConnection.bind(this) this._dialPeer = this._dialPeer.bind(this) } @@ -88,13 +91,29 @@ class BaseProtocol extends EventEmitter { return setImmediate(() => callback()) } + // If already dialing this peer, ignore + if (this._dials.has(idB58Str)) { + this.log('already dialing %s, ignoring dial attempt', idB58Str) + return setImmediate(() => callback()) + } + this._dials.add(idB58Str) + this.log('dialing %s', idB58Str) this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { + this.log('dial to %s complete', idB58Str) if (err) { this.log.err(err) return callback() } + // If the dial is not in the set, it means that floodsub has been + // stopped, so we should just bail out + if (!this._dials.has(idB58Str)) { + this.log('floodsub was stopped, not processing dial to %s', idB58Str) + return callback() + } + this._dials.delete(idB58Str) + this._onDial(peerInfo, conn, callback) }) } @@ -149,6 +168,7 @@ class BaseProtocol extends EventEmitter { if (this.started) { return setImmediate(() => callback(new Error('already started'))) } + this.log('starting') this.libp2p.handle(this.multicodec, this._onConnection) @@ -160,6 +180,7 @@ class BaseProtocol extends EventEmitter { asyncEach(peerInfos, (peer, cb) => this._dialPeer(peer, cb), (err) => { setImmediate(() => { + this.log('started') this.started = true callback(err) }) @@ -181,6 +202,9 @@ class BaseProtocol extends EventEmitter { this.libp2p.unhandle(this.multicodec) this.libp2p.removeListener('peer:connect', this._dialPeer) + // Prevent any dials that are in flight from being processed + this._dials = new Set() + this.log('stopping') asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => { if (err) { diff --git a/test/2-nodes.js b/test/2-nodes.js index 15b767762d..82e78f36b2 100644 --- a/test/2-nodes.js +++ b/test/2-nodes.js @@ -4,6 +4,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) const expect = chai.expect const parallel = require('async/parallel') const series = require('async/series') @@ -390,6 +391,125 @@ describe('basics between 2 nodes', () => { ], done) }) }) + + describe('prevent concurrent dials', () => { + let sandbox + let nodeA + let nodeB + let fsA + let fsB + + before((done) => { + sandbox = chai.spy.sandbox() + + series([ + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (err, nodes) => { + if (err) return done(err) + + nodeA = nodes[0] + nodeB = nodes[1] + + // Put node B in node A's peer book + nodeA.peerBook.put(nodeB.peerInfo) + + fsA = new FloodSub(nodeA) + fsB = new FloodSub(nodeB) + + fsB.start(done) + }) + }) + + after((done) => { + sandbox.restore() + + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], (ignoreErr) => { + done() + }) + }) + + it('does not dial twice to same peer', (done) => { + sandbox.on(fsA, ['_onDial']) + + // When node A starts, it will dial all peers in its peer book, which + // is just peer B + fsA.start(startComplete) + + // Simulate a connection coming in from peer B at the same time. This + // causes floodsub to dial peer B + nodeA.emit('peer:connect', nodeB.peerInfo) + + function startComplete () { + // Check that only one dial was made + setTimeout(() => { + expect(fsA._onDial).to.have.been.called.once() + done() + }, 1000) + } + }) + }) + + describe('prevent processing dial after stop', () => { + let sandbox + let nodeA + let nodeB + let fsA + let fsB + + before((done) => { + sandbox = chai.spy.sandbox() + + series([ + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (err, nodes) => { + if (err) return done(err) + + nodeA = nodes[0] + nodeB = nodes[1] + + fsA = new FloodSub(nodeA) + fsB = new FloodSub(nodeB) + + parallel([ + (cb) => fsA.start(cb), + (cb) => fsB.start(cb) + ], done) + }) + }) + + after((done) => { + sandbox.restore() + + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], (ignoreErr) => { + done() + }) + }) + + it('does not process dial after stop', (done) => { + sandbox.on(fsA, ['_onDial']) + + // Simulate a connection coming in from peer B at the same time. This + // causes floodsub to dial peer B + nodeA.emit('peer:connect', nodeB.peerInfo) + + // Stop floodsub before the dial can complete + fsA.stop(() => { + // Check that the dial was not processed + setTimeout(() => { + expect(fsA._onDial).to.not.have.been.called() + done() + }, 1000) + }) + }) + }) }) function shouldNotHappen (msg) {