diff --git a/src/components/decision/engine.js b/src/components/decision-engine/index.js similarity index 77% rename from src/components/decision/engine.js rename to src/components/decision-engine/index.js index 4cd60d2b..9186067b 100644 --- a/src/components/decision/engine.js +++ b/src/components/decision-engine/index.js @@ -7,6 +7,7 @@ const whilst = require('async/whilst') const setImmediate = require('async/setImmediate') const each = require('async/each') const debounce = require('lodash.debounce') +const CID = require('cids') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') @@ -16,7 +17,7 @@ const Wantlist = require('../../types/wantlist') const PeerRequestQueue = require('./peer-request-queue') const Ledger = require('./ledger') -module.exports = class Engine { +class DecisionEngine { constructor (blockstore, network) { this.blockstore = blockstore this.network = network @@ -35,6 +36,7 @@ module.exports = class Engine { _sendBlock (env, cb) { const msg = new Message(false) + msg.addBlock(env.block, (err) => { if (err) { return cb(err) @@ -69,7 +71,7 @@ module.exports = class Engine { log('got task') pull( - this.blockstore.getStream(nextTask.entry.key), + this.blockstore.getStream(nextTask.entry.cid), pull.collect((err, blocks) => { const block = blocks[0] if (err || !block) { @@ -91,11 +93,12 @@ module.exports = class Engine { } wantlistForPeer (peerId) { - if (!this.ledgerMap.has(peerId.toB58String())) { + const peerIdStr = peerId.toB58String() + if (!this.ledgerMap.has(peerIdStr)) { return new Map() } - return this.ledgerMap.get(peerId.toB58String()).wantlist.sortedEntries() + return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries() } peers () { @@ -138,41 +141,42 @@ module.exports = class Engine { }) } - receivedBlock (key) { - this._processBlock(key) + receivedBlock (cid) { + this._processBlock(cid) this._outbox() } - _processBlock (key) { + _processBlock (cid) { // Check all connected peers if they want the block we received for (let l of this.ledgerMap.values()) { - const entry = l.wantlistContains(key) + const entry = l.wantlistContains(cid) if (entry) { this.peerRequestQueue.push(entry, l.partner) } } } - _processWantlist (ledger, peerId, entry, cb) { + _processWantlist (ledger, peerId, entry, callback) { + const cidStr = entry.cid.toBaseEncodedString() if (entry.cancel) { - log('cancel %s', mh.toB58String(entry.key)) - ledger.cancelWant(entry.key) - this.peerRequestQueue.remove(entry.key, peerId) - setImmediate(() => cb()) + log('cancel %s', cidStr) + ledger.cancelWant(entry.cid) + this.peerRequestQueue.remove(entry.cid, peerId) + setImmediate(() => callback()) } else { - log('wants %s - %s', mh.toB58String(entry.key), entry.priority) - ledger.wants(entry.key, entry.priority) + log('wants %s - %s', cidStr, entry.priority) + ledger.wants(entry.cid, entry.priority) // If we already have the block, serve it - this.blockstore.has(entry.key, (err, exists) => { + this.blockstore.has(entry.cid, (err, exists) => { if (err) { - log('failed existence check %s', mh.toB58String(entry.key)) + log('failed existence check %s', cidStr) } else if (exists) { - log('has want %s', mh.toB58String(entry.key)) + log('has want %s', cidStr) this.peerRequestQueue.push(entry.entry, peerId) this._outbox() } - cb() + callback() }) } } @@ -183,10 +187,11 @@ module.exports = class Engine { if (err) { return cb(err) } - log('got block %s (%s bytes)', mh.toB58String(key), block.data.length) + const cid = new CID(key) + const cidStr = cid.toBaseEncodedString() + log('got block %s (%s bytes)', cidStr, block.data.length) ledger.receivedBytes(block.data.length) - - this.receivedBlock(key) + this.receivedBlock(cid) cb() }) }, callback) @@ -201,9 +206,10 @@ module.exports = class Engine { if (err) { return cb(err) } + const cid = new CID(key) - ledger.wantlist.remove(key) - this.peerRequestQueue.remove(key, peerId) + ledger.wantlist.remove(cid) + this.peerRequestQueue.remove(cid, peerId) cb() }) }, callback) @@ -223,16 +229,18 @@ module.exports = class Engine { // } // // TODO: figure out how to remove all other references - // in the peerrequestqueue + // in the peer request queue } _findOrCreate (peerId) { - if (this.ledgerMap.has(peerId.toB58String())) { - return this.ledgerMap.get(peerId.toB58String()) + const peerIdStr = peerId.toB58String() + if (this.ledgerMap.has(peerIdStr)) { + return this.ledgerMap.get(peerIdStr) } const l = new Ledger(peerId) - this.ledgerMap.set(peerId.toB58String(), l) + + this.ledgerMap.set(peerIdStr, l) return l } @@ -245,3 +253,5 @@ module.exports = class Engine { this._running = false } } + +module.exports = DecisionEngine diff --git a/src/components/decision/ledger.js b/src/components/decision-engine/ledger.js similarity index 100% rename from src/components/decision/ledger.js rename to src/components/decision-engine/ledger.js diff --git a/src/components/decision/peer-request-queue/active-partner.js b/src/components/decision-engine/peer-request-queue/active-partner.js similarity index 100% rename from src/components/decision/peer-request-queue/active-partner.js rename to src/components/decision-engine/peer-request-queue/active-partner.js diff --git a/src/components/decision/peer-request-queue/index.js b/src/components/decision-engine/peer-request-queue/index.js similarity index 100% rename from src/components/decision/peer-request-queue/index.js rename to src/components/decision-engine/peer-request-queue/index.js diff --git a/src/components/decision/peer-request-queue/peer-request-task.js b/src/components/decision-engine/peer-request-queue/peer-request-task.js similarity index 100% rename from src/components/decision/peer-request-queue/peer-request-task.js rename to src/components/decision-engine/peer-request-queue/peer-request-task.js diff --git a/src/components/decision/peer-request-queue/util.js b/src/components/decision-engine/peer-request-queue/util.js similarity index 100% rename from src/components/decision/peer-request-queue/util.js rename to src/components/decision-engine/peer-request-queue/util.js diff --git a/src/components/decision/priority-queue.js b/src/components/decision-engine/priority-queue.js similarity index 100% rename from src/components/decision/priority-queue.js rename to src/components/decision-engine/priority-queue.js diff --git a/src/components/decision/index.js b/src/components/decision/index.js deleted file mode 100644 index 9e6bb4ad..00000000 --- a/src/components/decision/index.js +++ /dev/null @@ -1,5 +0,0 @@ -'use strict' - -const Engine = require('./engine') - -exports.Engine = Engine diff --git a/src/index.js b/src/index.js index 7a0f66a6..203f23c1 100644 --- a/src/index.js +++ b/src/index.js @@ -16,7 +16,7 @@ const Block = require('ipfs-block') const CONSTANTS = require('./constants') const WantManager = require('./components/want-manager') const Network = require('./components/network') -const decision = require('./components/decision') +const DecisionEngine = require('./components/decision-engine') class Bitswap { constructor (id, libp2p, blockstore, peerBook) { @@ -29,7 +29,7 @@ class Bitswap { // local database this.blockstore = blockstore - this.engine = new decision.Engine(blockstore, this.network) + this.engine = new DecisionEngine(blockstore, this.network) // handle message sending this.wm = new WantManager(this.network) diff --git a/test/browser.js b/test/browser.js index 1a69bf94..f5b811c0 100644 --- a/test/browser.js +++ b/test/browser.js @@ -65,4 +65,4 @@ const repo = { } require('./index-test')(repo) -require('./components/decision/engine-test')(repo) +require('./components/decision-engine/index-test')(repo) diff --git a/test/components/decision/engine-test.js b/test/components/decision-engine/index-test.js similarity index 72% rename from test/components/decision/engine-test.js rename to test/components/decision-engine/index-test.js index 37789af2..19e5bfa0 100644 --- a/test/components/decision/engine-test.js +++ b/test/components/decision-engine/index-test.js @@ -12,33 +12,31 @@ const map = require('async/map') const eachSeries = require('async/eachSeries') const pull = require('pull-stream') const paramap = require('pull-paramap') +const CID = require('cids') const Message = require('../../../src/types/message') -const Engine = require('../../../src/components/decision/engine') +const DecisionEngine = require('../../../src/components/decision-engine') const mockNetwork = require('../../utils').mockNetwork module.exports = (repo) => { - function newEngine (id, done) { + function newEngine (path, done) { parallel([ - (cb) => repo.create(id, cb), + (cb) => repo.create(path, cb), (cb) => PeerId.create(cb) ], (err, results) => { if (err) { return done(err) } const blockstore = results[0].blockstore - const engine = new Engine(blockstore, mockNetwork()) + const engine = new DecisionEngine(blockstore, mockNetwork()) engine.start() - done(null, { - peer: results[1], - engine - }) + done(null, { peer: results[1], engine }) }) } - describe.skip('Engine', () => { + describe('Engine', () => { afterEach((done) => { repo.remove(done) }) @@ -61,10 +59,12 @@ module.exports = (repo) => { }), paramap((block, cb) => { const m = new Message(false) - m.addBlock(block, (err) => { + block.key((err, key) => { if (err) { return cb(err) } + const cid = new CID(key) + m.addBlock(cid, block) sender.engine.messageSent(receiver.peer, m) receiver.engine.messageReceived(sender.peer, m, cb) }) @@ -72,29 +72,17 @@ module.exports = (repo) => { pull.onEnd((err) => { expect(err).to.not.exist - expect( - sender.engine.numBytesSentTo(receiver.peer) - ).to.be.above( - 0 - ) - - expect( - sender.engine.numBytesSentTo(receiver.peer) - ).to.be.eql( - receiver.engine.numBytesReceivedFrom(sender.peer) - ) - - expect( - receiver.engine.numBytesSentTo(sender.peer) - ).to.be.eql( - 0 - ) - - expect( - sender.engine.numBytesReceivedFrom(receiver.peer) - ).to.be.eql( - 0 - ) + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.be.above(0) + + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.eql(receiver.engine.numBytesReceivedFrom(sender.peer)) + + expect(receiver.engine.numBytesSentTo(sender.peer)) + .to.eql(0) + + expect(sender.engine.numBytesReceivedFrom(receiver.peer)) + .to.eql(0) done() }) @@ -110,37 +98,27 @@ module.exports = (repo) => { expect(err).to.not.exist const sanfrancisco = res[0] - const seatlle = res[1] + const seattle = res[1] const m = new Message(true) - sanfrancisco.engine.messageSent(seatlle.peer, m) - seatlle.engine.messageReceived(sanfrancisco.peer, m, (err) => { + sanfrancisco.engine.messageSent(seattle.peer, m) + seattle.engine.messageReceived(sanfrancisco.peer, m, (err) => { expect(err).to.not.exist - expect( - seatlle.peer.toHexString() - ).to.not.be.eql( - sanfrancisco.peer.toHexString() - ) - - expect( - sanfrancisco.engine.peers() - ).to.include( - seatlle.peer - ) - - expect( - seatlle.engine.peers() - ).to.include( - sanfrancisco.peer - ) + expect(seattle.peer.toHexString()) + .to.not.eql(sanfrancisco.peer.toHexString()) + + expect(sanfrancisco.engine.peers()).to.include(seattle.peer) + + expect(seattle.engine.peers()) + .to.include(sanfrancisco.peer) done() }) }) }) - it('partner wants then cancels', (done) => { + it.skip('partner wants then cancels', (done) => { const numRounds = 10 const alphabet = 'abcdefghijklmnopqrstuvwxyz'.split('') const vowels = 'aeiou'.split('') @@ -160,7 +138,7 @@ module.exports = (repo) => { if (err) { return cb(err) } - cb(null, {data: block.data, key: key}) + cb(null, { data: block.data, key: key }) }) }), repo.blockstore.putStream(), @@ -173,7 +151,7 @@ module.exports = (repo) => { map(blocks, (b, cb) => b.key(cb), (err, keys) => { expect(err).to.not.exist blocks.forEach((b, i) => { - add.addEntry(keys[i], Math.pow(2, 32) - 1 - i) + add.addEntry(new CID(keys[i]), Math.pow(2, 32) - 1 - i) }) e.messageReceived(p, add, cb) @@ -185,7 +163,7 @@ module.exports = (repo) => { const blocks = keys.map((k) => new Block(k)) map(blocks, (b, cb) => b.key(cb), (err, keys) => { expect(err).to.not.exist - keys.forEach((k) => cancels.cancel(k)) + keys.forEach((k) => cancels.cancel(new CID(k))) e.messageReceived(p, cancels, cb) }) } @@ -206,11 +184,11 @@ module.exports = (repo) => { const network = mockNetwork(keeps.length, (res) => { const msgs = stringifyMessages(res.messages) - expect(msgs).to.be.eql(keeps) + expect(msgs).to.eql(keeps) innerCb() }) - const e = new Engine(repo.blockstore, network) + const e = new DecisionEngine(repo.blockstore, network) e.start() let partner series([ @@ -224,7 +202,7 @@ module.exports = (repo) => { (cb) => partnerWants(e, set, partner, cb), (cb) => partnerCancels(e, cancels, partner, cb) ], (err) => { - if (err) throw err + expect(err).to.not.exist }) }, cb) }, done) diff --git a/test/components/decision/ledger.spec.js b/test/components/decision-engine/ledger.spec.js similarity index 89% rename from test/components/decision/ledger.spec.js rename to test/components/decision-engine/ledger.spec.js index a0817bc4..11c2a89d 100644 --- a/test/components/decision/ledger.spec.js +++ b/test/components/decision-engine/ledger.spec.js @@ -4,7 +4,7 @@ const expect = require('chai').expect const PeerId = require('peer-id') -const Ledger = require('../../../src/components/decision/ledger') +const Ledger = require('../../../src/components/decision-engine/ledger') describe('Ledger', () => { let peerId diff --git a/test/components/decision/peer-request-queue.spec.js b/test/components/decision-engine/peer-request-queue.spec.js similarity index 99% rename from test/components/decision/peer-request-queue.spec.js rename to test/components/decision-engine/peer-request-queue.spec.js index 4f2b6712..ac34ee14 100644 --- a/test/components/decision/peer-request-queue.spec.js +++ b/test/components/decision-engine/peer-request-queue.spec.js @@ -14,7 +14,7 @@ const parallel = require('async/parallel') const CID = require('cids') const WantlistEntry = require('../../../src/types/wantlist').Entry -const PeerRequestQueue = require('../../../src/components/decision/peer-request-queue') +const PeerRequestQueue = require('../../../src/components/decision-engine/peer-request-queue') function getBlockCID (data, callback) { const block = new Block(data) diff --git a/test/components/decision/priority-queue.spec.js b/test/components/decision-engine/priority-queue.spec.js similarity index 94% rename from test/components/decision/priority-queue.spec.js rename to test/components/decision-engine/priority-queue.spec.js index 978bb636..72c73bdf 100644 --- a/test/components/decision/priority-queue.spec.js +++ b/test/components/decision-engine/priority-queue.spec.js @@ -3,7 +3,7 @@ const expect = require('chai').expect -const PriorityQueue = require('../../../src/components/decision/priority-queue') +const PriorityQueue = require('../../../src/components/decision-engine/priority-queue') describe('PriorityQueue', () => { it('sorts with a less operator', () => { diff --git a/test/node.js b/test/node.js index 5af2fa58..91ec1c08 100644 --- a/test/node.js +++ b/test/node.js @@ -38,6 +38,6 @@ const repo = { } require('./index-test')(repo) -require('./components/decision/engine-test')(repo) +require('./components/decision-engine/index-test')(repo) require('./components/network/network.node.js') require('./components/network/gen-bitswap-network.node.js')