Skip to content

Commit

Permalink
fix: replace node buffers with uint8arrays (#41)
Browse files Browse the repository at this point in the history
* fix: replace node streams with uint8arrays

BREAKING CHANGES:

- All deps of this module now use uint8arrays in place of node buffers

* chore: bump deps

Co-authored-by: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
achingbrain and jacobheun committed Aug 23, 2020
1 parent 62dd4cf commit cd009d5
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 69 deletions.
37 changes: 19 additions & 18 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,41 +37,42 @@
],
"license": "MIT",
"devDependencies": {
"aegir": "^22.0.0",
"aegir": "^26.0.0",
"chai": "^4.2.0",
"chai-bytes": "^0.1.2",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"mocha": "^7.0.1",
"multihashes": "^0.4.15",
"mocha": "^8.1.1",
"multihashes": "^3.0.1",
"p-defer": "^3.0.0",
"sinon": "^9.0.0"
},
"dependencies": {
"cids": "^0.8.0",
"cids": "^1.0.0",
"debug": "^4.1.1",
"it-buffer": "^0.1.1",
"it-handshake": "^1.0.1",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.0",
"libp2p": "^0.28.0",
"libp2p-bootstrap": "^0.11.0",
"libp2p-floodsub": "^0.21.3",
"libp2p-gossipsub": "^0.4.5",
"libp2p-kad-dht": "^0.19.5",
"libp2p-mplex": "^0.9.3",
"libp2p-noise": "^1.0.0",
"libp2p-secio": "^0.12.2",
"libp2p-tcp": "^0.14.3",
"libp2p-websockets": "^0.13.2",
"multiaddr": "^7.1.0",
"peer-id": "^0.13.3",
"libp2p": "libp2p/js-libp2p#16a894d7e9ffb63b3c4f7f9929f82864618e0a5c",
"libp2p-bootstrap": "^0.12.1",
"libp2p-floodsub": "^0.22.0",
"libp2p-gossipsub": "^0.5.0",
"libp2p-kad-dht": "^0.20.1",
"libp2p-mplex": "^0.10.0",
"libp2p-noise": "^2.0.0",
"libp2p-secio": "^0.13.1",
"libp2p-tcp": "^0.15.1",
"libp2p-websockets": "^0.14.0",
"multiaddr": "^8.0.0",
"peer-id": "^0.14.0",
"promisify-es6": "^1.0.3",
"protons": "^1.1.0",
"protons": "^2.0.0",
"stream-to-it": "^0.2.0",
"streaming-iterables": "^4.1.1",
"streaming-iterables": "^5.0.2",
"uint8arrays": "^1.1.0",
"yargs": "^15.0.2",
"yargs-promise": "^1.1.0"
},
Expand Down
28 changes: 14 additions & 14 deletions src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const pipe = require('it-pipe')
const pushable = require('it-pushable')
const StreamHandler = require('./stream-handler')
const { concat } = require('streaming-iterables')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const { passThroughUpgrader } = require('./util')
const {
Request,
Expand Down Expand Up @@ -80,15 +82,15 @@ class Daemon {
async openStream (request) {
const { peer, proto } = request.streamOpen

const peerId = PeerId.createFromB58String(peer.toString())
const peerId = PeerId.createFromB58String(uint8ArrayToString(peer, 'base58btc'))

const connection = this.libp2p.connectionManager.get(peerId)
const { stream, protocol } = await connection.newStream(proto)

return {
streamInfo: {
peer: peerId.toBytes(),
addr: connection.remoteAddr.buffer,
addr: connection.remoteAddr.bytes,
proto: protocol
},
connection: stream
Expand Down Expand Up @@ -119,7 +121,7 @@ class Daemon {
this.libp2p.handle(proto, ({ connection, stream, protocol }) => {
const message = StreamInfo.encode({
peer: connection.remotePeer.toBytes(),
addr: connection.remoteAddr.buffer,
addr: connection.remoteAddr.bytes,
proto: protocol
})
const encodedMessage = lp.encode.single(message)
Expand Down Expand Up @@ -226,7 +228,7 @@ class Daemon {

await daemon.libp2p.pubsub.subscribe(topic, (msg) => {
onMessage.push(PSMessage.encode({
from: msg.from && Buffer.from(msg.from),
from: msg.from && uint8ArrayFromString(msg.from),
data: msg.data,
seqno: msg.seqno,
topicIDs: msg.topicIDs,
Expand Down Expand Up @@ -276,7 +278,7 @@ class Daemon {
type: DHTResponse.Type.VALUE,
peer: {
id: peer.id.toBytes(),
addrs: peer.multiaddrs.map(m => m.buffer)
addrs: peer.multiaddrs.map(m => m.bytes)
}
}
})
Expand Down Expand Up @@ -305,7 +307,7 @@ class Daemon {
type: DHTResponse.Type.VALUE,
peer: {
id: provider.id.toBytes(),
addrs: (provider.multiaddrs || []).map(m => m.buffer)
addrs: (provider.multiaddrs || []).map(m => m.bytes)
}
})
}
Expand All @@ -330,10 +332,10 @@ class Daemon {
}
})

for await (const peerId of daemon.libp2p._dht.getClosestPeers(Buffer.from(dht.key))) {
for await (const peerId of daemon.libp2p._dht.getClosestPeers(dht.key)) {
yield DHTResponse.encode({
type: DHTResponse.Type.VALUE,
value: peerId.toB58String()
value: peerId.toBytes()
})
}

Expand All @@ -354,9 +356,7 @@ class Daemon {
},
[DHTRequest.Type.GET_VALUE]: async function * (daemon) {
try {
const value = await daemon.libp2p.contentRouting.get(
Buffer.from(dht.key)
)
const value = await daemon.libp2p.contentRouting.get(dht.key)
yield OkResponse({
dht: {
type: DHTResponse.Type.VALUE,
Expand All @@ -369,7 +369,7 @@ class Daemon {
},
[DHTRequest.Type.PUT_VALUE]: async function * (daemon) {
await daemon.libp2p.contentRouting.put(
Buffer.from(dht.key),
dht.key,
dht.value
)

Expand Down Expand Up @@ -425,7 +425,7 @@ class Daemon {
yield OkResponse({
identify: {
id: daemon.libp2p.peerId.toBytes(),
addrs: daemon.libp2p.multiaddrs.map(m => m.buffer)
addrs: daemon.libp2p.multiaddrs.map(m => m.bytes)
}
})
break
Expand All @@ -439,7 +439,7 @@ class Daemon {

return {
id: peer.id.toBytes(),
addrs: [addr ? addr.buffer : null]
addrs: [addr ? addr.bytes : null]
}
})
yield OkResponse({ peers })
Expand Down
1 change: 0 additions & 1 deletion src/stream-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class StreamHandler {
if (msg.value) {
return msg.value.slice()
}

log('read received no value, closing stream')
// End the stream, we didn't get data
this.close()
Expand Down
6 changes: 3 additions & 3 deletions test/daemon/core.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ describe('core features', () => {
const request = {
type: Request.Type.CONNECT,
connect: {
peer: Buffer.from(libp2pPeer.peerId.toBytes()),
addrs: libp2pPeer.multiaddrs.map(addr => addr.buffer)
peer: libp2pPeer.peerId.toBytes(),
addrs: libp2pPeer.multiaddrs.map(addr => addr.bytes)
},
streamOpen: null,
streamHandler: null,
Expand Down Expand Up @@ -151,7 +151,7 @@ describe('core features', () => {

expect(response.identify).to.eql({
id: daemon.libp2p.peerId.toBytes(),
addrs: daemon.libp2p.multiaddrs.map(m => m.buffer)
addrs: daemon.libp2p.multiaddrs.map(m => m.bytes)
})
streamHandler.close()
})
Expand Down
32 changes: 17 additions & 15 deletions test/daemon/dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const CID = require('cids')
const ma = require('multiaddr')
const delay = require('delay')
const PeerId = require('peer-id')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')

const StreamHandler = require('../../src/stream-handler')
const { createDaemon } = require('../../src/daemon')
Expand Down Expand Up @@ -112,7 +114,7 @@ describe('dht', () => {
type: DHTResponse.Type.VALUE,
peer: {
id: libp2pPeer.peerId.toBytes(),
addrs: libp2pPeer.multiaddrs.map(m => m.buffer)
addrs: libp2pPeer.multiaddrs.map(m => m.bytes)
},
value: null
})
Expand Down Expand Up @@ -160,7 +162,7 @@ describe('dht', () => {
streamHandler: null,
dht: {
type: DHTRequest.Type.PROVIDE,
cid: cid.buffer
cid: cid.bytes
},
disconnect: null,
pubsub: null,
Expand Down Expand Up @@ -199,7 +201,7 @@ describe('dht', () => {
streamHandler: null,
dht: {
type: DHTRequest.Type.FIND_PROVIDERS,
cid: cid.buffer,
cid: cid.bytes,
count: 1
},
disconnect: null,
Expand All @@ -224,7 +226,7 @@ describe('dht', () => {
expect(response.type).to.eql(DHTResponse.Type.VALUE)
expect(response.peer).to.eql({
id: libp2pPeer.peerId.toBytes(),
addrs: libp2pPeer.multiaddrs.map(m => m.buffer)
addrs: libp2pPeer.multiaddrs.map(m => m.bytes)
})
},
(message) => {
Expand Down Expand Up @@ -258,7 +260,7 @@ describe('dht', () => {
streamHandler: null,
dht: {
type: DHTRequest.Type.FIND_PROVIDERS,
cid: cid.buffer,
cid: cid.bytes,
count: 1
},
disconnect: null,
Expand Down Expand Up @@ -297,7 +299,7 @@ describe('dht', () => {
streamHandler: null,
dht: {
type: DHTRequest.Type.GET_CLOSEST_PEERS,
key: 'foobar'
key: uint8ArrayFromString('foobar')
},
disconnect: null,
pubsub: null,
Expand All @@ -319,7 +321,7 @@ describe('dht', () => {
(message) => {
const response = DHTResponse.decode(message)
expect(response.type).to.eql(DHTResponse.Type.VALUE)
expect(response.value.toString()).to.eql(libp2pPeer.peerId.toB58String())
expect(uint8ArrayToString(response.value, 'base58btc')).to.eql(libp2pPeer.peerId.toB58String())
},
(message) => {
const response = DHTResponse.decode(message)
Expand Down Expand Up @@ -373,7 +375,7 @@ describe('dht', () => {
const maConn = await client.connect()
const streamHandler = new StreamHandler({ stream: maConn })

await libp2pPeer.contentRouting.put(Buffer.from('/hello'), Buffer.from('world'))
await libp2pPeer.contentRouting.put(uint8ArrayFromString('/hello'), uint8ArrayFromString('world'))

const request = {
type: Request.Type.DHT,
Expand All @@ -382,7 +384,7 @@ describe('dht', () => {
streamHandler: null,
dht: {
type: DHTRequest.Type.GET_VALUE,
key: '/hello'
key: uint8ArrayFromString('/hello')
},
disconnect: null,
pubsub: null,
Expand All @@ -396,7 +398,7 @@ describe('dht', () => {
expect(response.dht).to.eql({
type: DHTResponse.Type.VALUE,
peer: null,
value: Buffer.from('world')
value: uint8ArrayFromString('world')
})
streamHandler.close()
})
Expand All @@ -414,7 +416,7 @@ describe('dht', () => {
streamHandler: null,
dht: {
type: DHTRequest.Type.GET_VALUE,
key: '/v/doesntexist'
key: uint8ArrayFromString('/v/doesntexist')
},
disconnect: null,
pubsub: null,
Expand All @@ -441,8 +443,8 @@ describe('dht', () => {
streamHandler: null,
dht: {
type: DHTRequest.Type.PUT_VALUE,
key: '/hello2',
value: Buffer.from('world2')
key: uint8ArrayFromString('/hello2'),
value: uint8ArrayFromString('world2')
},
disconnect: null,
pubsub: null,
Expand All @@ -456,7 +458,7 @@ describe('dht', () => {
expect(response.dht).to.eql(null)
streamHandler.close()

const value = await libp2pPeer.contentRouting.get(Buffer.from('/hello2'))
expect(value).to.eql(Buffer.from('world2'))
const value = await libp2pPeer.contentRouting.get(uint8ArrayFromString('/hello2'))
expect(value).to.eql(uint8ArrayFromString('world2'))
})
})
4 changes: 2 additions & 2 deletions test/daemon/peerstore.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ describe('peerstore features', () => {
type: Request.Type.PEERSTORE,
peerStore: {
type: PeerstoreRequest.Type.GET_PROTOCOLS,
id: Buffer.from(libp2pPeer.peerId.toBytes())
id: libp2pPeer.peerId.toBytes()
}
}

Expand Down Expand Up @@ -121,7 +121,7 @@ describe('peerstore features', () => {
type: Request.Type.PEERSTORE,
peerStore: {
type: PeerstoreRequest.Type.GET_PEER_INFO,
id: Buffer.from(libp2pPeer.peerId.toBytes())
id: libp2pPeer.peerId.toBytes()
}
}

Expand Down
5 changes: 3 additions & 2 deletions test/daemon/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const lp = require('it-length-prefixed')
const pDefer = require('p-defer')
const toBuffer = require('it-buffer')
const pushable = require('it-pushable')
const uint8ArrayFromString = require('uint8arrays/from-string')

const StreamHandler = require('../../src/stream-handler')
const { createDaemon } = require('../../src/daemon')
Expand Down Expand Up @@ -181,7 +182,7 @@ const testPubsub = (router) => {
this.timeout(10e3)

const topic = 'test-topic'
const data = Buffer.from('test-data')
const data = uint8ArrayFromString('test-data')
const deferred = pDefer()

client = new Client(daemonAddr)
Expand Down Expand Up @@ -231,7 +232,7 @@ const testPubsub = (router) => {

it('should be able to receive messages from subscribed topics', async function () {
const topic = 'test-topic'
const data = Buffer.from('test-data')
const data = uint8ArrayFromString('test-data')

client = new Client(daemonAddr)

Expand Down
Loading

0 comments on commit cd009d5

Please sign in to comment.