Skip to content

Commit

Permalink
feat: decision engine migration to CID (just missing one test)
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Dec 11, 2016
1 parent 89d922e commit fdac00f
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const whilst = require('async/whilst')
const setImmediate = require('async/setImmediate')
const each = require('async/each')
const debounce = require('lodash.debounce')
const CID = require('cids')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
Expand All @@ -16,7 +17,7 @@ const Wantlist = require('../../types/wantlist')
const PeerRequestQueue = require('./peer-request-queue')
const Ledger = require('./ledger')

module.exports = class Engine {
class DecisionEngine {
constructor (blockstore, network) {
this.blockstore = blockstore
this.network = network
Expand All @@ -35,6 +36,7 @@ module.exports = class Engine {

_sendBlock (env, cb) {
const msg = new Message(false)

msg.addBlock(env.block, (err) => {
if (err) {
return cb(err)
Expand Down Expand Up @@ -69,7 +71,7 @@ module.exports = class Engine {
log('got task')

pull(
this.blockstore.getStream(nextTask.entry.key),
this.blockstore.getStream(nextTask.entry.cid),
pull.collect((err, blocks) => {
const block = blocks[0]
if (err || !block) {
Expand All @@ -91,11 +93,12 @@ module.exports = class Engine {
}

wantlistForPeer (peerId) {
if (!this.ledgerMap.has(peerId.toB58String())) {
const peerIdStr = peerId.toB58String()
if (!this.ledgerMap.has(peerIdStr)) {
return new Map()
}

return this.ledgerMap.get(peerId.toB58String()).wantlist.sortedEntries()
return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries()
}

peers () {
Expand Down Expand Up @@ -138,41 +141,42 @@ module.exports = class Engine {
})
}

receivedBlock (key) {
this._processBlock(key)
receivedBlock (cid) {
this._processBlock(cid)
this._outbox()
}

_processBlock (key) {
_processBlock (cid) {
// Check all connected peers if they want the block we received
for (let l of this.ledgerMap.values()) {
const entry = l.wantlistContains(key)
const entry = l.wantlistContains(cid)
if (entry) {
this.peerRequestQueue.push(entry, l.partner)
}
}
}

_processWantlist (ledger, peerId, entry, cb) {
_processWantlist (ledger, peerId, entry, callback) {
const cidStr = entry.cid.toBaseEncodedString()
if (entry.cancel) {
log('cancel %s', mh.toB58String(entry.key))
ledger.cancelWant(entry.key)
this.peerRequestQueue.remove(entry.key, peerId)
setImmediate(() => cb())
log('cancel %s', cidStr)
ledger.cancelWant(entry.cid)
this.peerRequestQueue.remove(entry.cid, peerId)
setImmediate(() => callback())
} else {
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
ledger.wants(entry.key, entry.priority)
log('wants %s - %s', cidStr, entry.priority)
ledger.wants(entry.cid, entry.priority)

// If we already have the block, serve it
this.blockstore.has(entry.key, (err, exists) => {
this.blockstore.has(entry.cid, (err, exists) => {
if (err) {
log('failed existence check %s', mh.toB58String(entry.key))
log('failed existence check %s', cidStr)
} else if (exists) {
log('has want %s', mh.toB58String(entry.key))
log('has want %s', cidStr)
this.peerRequestQueue.push(entry.entry, peerId)
this._outbox()
}
cb()
callback()
})
}
}
Expand All @@ -183,10 +187,11 @@ module.exports = class Engine {
if (err) {
return cb(err)
}
log('got block %s (%s bytes)', mh.toB58String(key), block.data.length)
const cid = new CID(key)
const cidStr = cid.toBaseEncodedString()
log('got block %s (%s bytes)', cidStr, block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(key)
this.receivedBlock(cid)
cb()
})
}, callback)
Expand All @@ -201,9 +206,10 @@ module.exports = class Engine {
if (err) {
return cb(err)
}
const cid = new CID(key)

ledger.wantlist.remove(key)
this.peerRequestQueue.remove(key, peerId)
ledger.wantlist.remove(cid)
this.peerRequestQueue.remove(cid, peerId)
cb()
})
}, callback)
Expand All @@ -223,16 +229,18 @@ module.exports = class Engine {
// }
//
// TODO: figure out how to remove all other references
// in the peerrequestqueue
// in the peer request queue
}

_findOrCreate (peerId) {
if (this.ledgerMap.has(peerId.toB58String())) {
return this.ledgerMap.get(peerId.toB58String())
const peerIdStr = peerId.toB58String()
if (this.ledgerMap.has(peerIdStr)) {
return this.ledgerMap.get(peerIdStr)
}

const l = new Ledger(peerId)
this.ledgerMap.set(peerId.toB58String(), l)

this.ledgerMap.set(peerIdStr, l)

return l
}
Expand All @@ -245,3 +253,5 @@ module.exports = class Engine {
this._running = false
}
}

module.exports = DecisionEngine
File renamed without changes.
File renamed without changes.
5 changes: 0 additions & 5 deletions src/components/decision/index.js

This file was deleted.

4 changes: 2 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const Block = require('ipfs-block')
const CONSTANTS = require('./constants')
const WantManager = require('./components/want-manager')
const Network = require('./components/network')
const decision = require('./components/decision')
const DecisionEngine = require('./components/decision-engine')

class Bitswap {
constructor (id, libp2p, blockstore, peerBook) {
Expand All @@ -29,7 +29,7 @@ class Bitswap {
// local database
this.blockstore = blockstore

this.engine = new decision.Engine(blockstore, this.network)
this.engine = new DecisionEngine(blockstore, this.network)

// handle message sending
this.wm = new WantManager(this.network)
Expand Down
2 changes: 1 addition & 1 deletion test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ const repo = {
}

require('./index-test')(repo)
require('./components/decision/engine-test')(repo)
require('./components/decision-engine/index-test')(repo)
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,31 @@ const map = require('async/map')
const eachSeries = require('async/eachSeries')
const pull = require('pull-stream')
const paramap = require('pull-paramap')
const CID = require('cids')

const Message = require('../../../src/types/message')
const Engine = require('../../../src/components/decision/engine')
const DecisionEngine = require('../../../src/components/decision-engine')

const mockNetwork = require('../../utils').mockNetwork

module.exports = (repo) => {
function newEngine (id, done) {
function newEngine (path, done) {
parallel([
(cb) => repo.create(id, cb),
(cb) => repo.create(path, cb),
(cb) => PeerId.create(cb)
], (err, results) => {
if (err) {
return done(err)
}
const blockstore = results[0].blockstore
const engine = new Engine(blockstore, mockNetwork())
const engine = new DecisionEngine(blockstore, mockNetwork())
engine.start()

done(null, {
peer: results[1],
engine
})
done(null, { peer: results[1], engine })
})
}

describe.skip('Engine', () => {
describe('Engine', () => {
afterEach((done) => {
repo.remove(done)
})
Expand All @@ -61,40 +59,30 @@ module.exports = (repo) => {
}),
paramap((block, cb) => {
const m = new Message(false)
m.addBlock(block, (err) => {
block.key((err, key) => {
if (err) {
return cb(err)
}
const cid = new CID(key)
m.addBlock(cid, block)
sender.engine.messageSent(receiver.peer, m)
receiver.engine.messageReceived(sender.peer, m, cb)
})
}, 100),
pull.onEnd((err) => {
expect(err).to.not.exist

expect(
sender.engine.numBytesSentTo(receiver.peer)
).to.be.above(
0
)

expect(
sender.engine.numBytesSentTo(receiver.peer)
).to.be.eql(
receiver.engine.numBytesReceivedFrom(sender.peer)
)

expect(
receiver.engine.numBytesSentTo(sender.peer)
).to.be.eql(
0
)

expect(
sender.engine.numBytesReceivedFrom(receiver.peer)
).to.be.eql(
0
)
expect(sender.engine.numBytesSentTo(receiver.peer))
.to.be.above(0)

expect(sender.engine.numBytesSentTo(receiver.peer))
.to.eql(receiver.engine.numBytesReceivedFrom(sender.peer))

expect(receiver.engine.numBytesSentTo(sender.peer))
.to.eql(0)

expect(sender.engine.numBytesReceivedFrom(receiver.peer))
.to.eql(0)

done()
})
Expand All @@ -110,37 +98,27 @@ module.exports = (repo) => {
expect(err).to.not.exist

const sanfrancisco = res[0]
const seatlle = res[1]
const seattle = res[1]

const m = new Message(true)

sanfrancisco.engine.messageSent(seatlle.peer, m)
seatlle.engine.messageReceived(sanfrancisco.peer, m, (err) => {
sanfrancisco.engine.messageSent(seattle.peer, m)
seattle.engine.messageReceived(sanfrancisco.peer, m, (err) => {
expect(err).to.not.exist

expect(
seatlle.peer.toHexString()
).to.not.be.eql(
sanfrancisco.peer.toHexString()
)

expect(
sanfrancisco.engine.peers()
).to.include(
seatlle.peer
)

expect(
seatlle.engine.peers()
).to.include(
sanfrancisco.peer
)
expect(seattle.peer.toHexString())
.to.not.eql(sanfrancisco.peer.toHexString())

expect(sanfrancisco.engine.peers()).to.include(seattle.peer)

expect(seattle.engine.peers())
.to.include(sanfrancisco.peer)
done()
})
})
})

it('partner wants then cancels', (done) => {
it.skip('partner wants then cancels', (done) => {
const numRounds = 10
const alphabet = 'abcdefghijklmnopqrstuvwxyz'.split('')
const vowels = 'aeiou'.split('')
Expand All @@ -160,7 +138,7 @@ module.exports = (repo) => {
if (err) {
return cb(err)
}
cb(null, {data: block.data, key: key})
cb(null, { data: block.data, key: key })
})
}),
repo.blockstore.putStream(),
Expand All @@ -173,7 +151,7 @@ module.exports = (repo) => {
map(blocks, (b, cb) => b.key(cb), (err, keys) => {
expect(err).to.not.exist
blocks.forEach((b, i) => {
add.addEntry(keys[i], Math.pow(2, 32) - 1 - i)
add.addEntry(new CID(keys[i]), Math.pow(2, 32) - 1 - i)
})

e.messageReceived(p, add, cb)
Expand All @@ -185,7 +163,7 @@ module.exports = (repo) => {
const blocks = keys.map((k) => new Block(k))
map(blocks, (b, cb) => b.key(cb), (err, keys) => {
expect(err).to.not.exist
keys.forEach((k) => cancels.cancel(k))
keys.forEach((k) => cancels.cancel(new CID(k)))
e.messageReceived(p, cancels, cb)
})
}
Expand All @@ -206,11 +184,11 @@ module.exports = (repo) => {

const network = mockNetwork(keeps.length, (res) => {
const msgs = stringifyMessages(res.messages)
expect(msgs).to.be.eql(keeps)
expect(msgs).to.eql(keeps)
innerCb()
})

const e = new Engine(repo.blockstore, network)
const e = new DecisionEngine(repo.blockstore, network)
e.start()
let partner
series([
Expand All @@ -224,7 +202,7 @@ module.exports = (repo) => {
(cb) => partnerWants(e, set, partner, cb),
(cb) => partnerCancels(e, cancels, partner, cb)
], (err) => {
if (err) throw err
expect(err).to.not.exist
})
}, cb)
}, done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
const expect = require('chai').expect
const PeerId = require('peer-id')

const Ledger = require('../../../src/components/decision/ledger')
const Ledger = require('../../../src/components/decision-engine/ledger')

describe('Ledger', () => {
let peerId
Expand Down
Loading

0 comments on commit fdac00f

Please sign in to comment.