Skip to content

Commit

Permalink
fix: write peer stream messages atomically (#484)
Browse files Browse the repository at this point in the history
* deps: update @libp2p/crypto, multiformats, uint8arrays, etc

Updates all deps to reduce duplication in web bundles.

* fix: use it-merge to allow stream to apply backpressure

* chore: deps again

* fix: guarantee single write of length prefixed data

* chore: remove double-pipe
  • Loading branch information
achingbrain committed Jan 30, 2024
1 parent 986ff6c commit cc4ff3b
Show file tree
Hide file tree
Showing 10 changed files with 4,334 additions and 5,217 deletions.
6 changes: 6 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/** @type {import('aegir').PartialOptions} */
export default {
build: {
bundlesizeMax: '85KB'
}
}
3 changes: 0 additions & 3 deletions .mocharc.yaml

This file was deleted.

9,458 changes: 4,292 additions & 5,166 deletions package-lock.json

Large diffs are not rendered by default.

50 changes: 25 additions & 25 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,42 +72,41 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/crypto": "^3.0.1",
"@libp2p/interface": "^1.0.1",
"@libp2p/interface-internal": "^1.0.1",
"@libp2p/peer-id": "^4.0.1",
"@libp2p/pubsub": "^9.0.0",
"@multiformats/multiaddr": "^12.1.3",
"@libp2p/crypto": "^4.0.1",
"@libp2p/interface": "^1.1.2",
"@libp2p/interface-internal": "^1.0.7",
"@libp2p/peer-id": "^4.0.5",
"@libp2p/pubsub": "^9.0.8",
"@multiformats/multiaddr": "^12.1.14",
"abortable-iterator": "^5.0.1",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.1",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.0",
"multiformats": "^12.0.1",
"protobufjs": "^7.2.4",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
"it-pushable": "^3.2.3",
"multiformats": "^13.0.1",
"protobufjs": "^7.2.6",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.1"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.2.4",
"@chainsafe/as-sha256": "^0.4.1",
"@dapplion/benchmark": "^0.2.4",
"@libp2p/floodsub": "^9.0.0",
"@libp2p/interface-compliance-tests": "^5.0.2",
"@libp2p/logger": "^4.0.1",
"@libp2p/peer-id-factory": "^4.0.0",
"@libp2p/peer-store": "^10.0.0",
"@types/node": "^17.0.21",
"aegir": "^41.0.0",
"datastore-core": "^9.1.1",
"@libp2p/floodsub": "^9.0.9",
"@libp2p/interface-compliance-tests": "^5.2.0",
"@libp2p/logger": "^4.0.5",
"@libp2p/peer-id-factory": "^4.0.5",
"@libp2p/peer-store": "^10.0.8",
"@types/node": "^20.11.6",
"aegir": "^42.2.2",
"datastore-core": "^9.2.7",
"delay": "^6.0.0",
"mkdirp": "^3.0.1",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-retry": "^5.1.2",
"p-retry": "^6.2.0",
"p-wait-for": "^5.0.2",
"sinon": "^15.1.2",
"sinon": "^17.0.1",
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2"
},
"engines": {
Expand Down Expand Up @@ -137,5 +136,6 @@
"Hugo Dias <hugomrdias@gmail.com>",
"Franck Royer <franck@royer.one>",
"ChainSafe <superadmin@chainsafe.io>"
]
],
"sideEffects": false
}
3 changes: 1 addition & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ import type {
ComponentLogger,
Topology
} from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
import type { ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand Down
18 changes: 5 additions & 13 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,19 @@ interface InboundStreamOpts {
}

export class OutboundStream {
private readonly pushable: Pushable<Uint8Array>
private readonly lpPushable: Pushable<Uint8ArrayList>
private readonly pushable: Pushable<Uint8Array | Uint8ArrayList>
private readonly closeController: AbortController
private readonly maxBufferSize: number

constructor (private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) {
this.pushable = pushable({ objectMode: false })
this.lpPushable = pushable({ objectMode: false })
this.pushable = pushable()
this.closeController = new AbortController()
this.maxBufferSize = opts.maxBufferSize ?? Infinity

pipe(
abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
(source) => encode(source),
this.rawStream
).catch(errCallback)

pipe(abortableSource(this.lpPushable, this.closeController.signal, { returnOnAbort: true }), this.rawStream).catch(
errCallback
)
}

get protocol (): string {
Expand All @@ -49,24 +42,23 @@ export class OutboundStream {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}

this.pushable.push(data)
this.pushable.push(encode.single(data))
}

/**
* Same to push() but this is prefixed data so no need to encode length prefixed again
*/
pushPrefixed (data: Uint8ArrayList): void {
if (this.lpPushable.readableLength > this.maxBufferSize) {
if (this.pushable.readableLength > this.maxBufferSize) {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}
this.lpPushable.push(data)
this.pushable.push(data)
}

async close (): Promise<void> {
this.closeController.abort()
// similar to pushable.end() but clear the internal buffer
await this.pushable.return()
await this.lpPushable.return()
await this.rawStream.close()
}
}
Expand Down
3 changes: 1 addition & 2 deletions test/accept-from.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import { stubInterface } from 'ts-sinon'
import { GossipSub } from '../src/index.js'
import { createPeerId } from './utils/index.js'
import { fastMsgIdFn } from './utils/msgId.js'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

const peerA = '16Uiu2HAmMkH6ZLen2tbhiuNCTZLLvrZaDgufNdT5MPjtC9Hr9YNA'

Expand Down
3 changes: 1 addition & 2 deletions test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import { GossipsubDhi } from '../src/constants.js'
import { GossipSub } from '../src/index.js'
import { connectAllPubSubNodes, createComponentsArray, type GossipSubAndComponents } from './utils/create-pubsub.js'
import type { PeerStore } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

describe('gossip', () => {
let nodes: GossipSubAndComponents[]
Expand Down
2 changes: 1 addition & 1 deletion test/utils/create-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { MemoryDatastore } from 'datastore-core'
import { stubInterface } from 'ts-sinon'
import { GossipSub, type GossipSubComponents, type GossipsubOpts } from '../../src/index.js'
import type { TypedEventTarget, Libp2pEvents, PubSub } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { ConnectionManager } from '@libp2p/interface-internal'

export interface CreateComponentsOpts {
init?: Partial<GossipsubOpts>
Expand Down
5 changes: 2 additions & 3 deletions test/utils/msgId.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import SHA256 from '@chainsafe/as-sha256'
import { digest } from '@chainsafe/as-sha256'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { messageIdToString } from '../../src/utils/messageIdToString.js'
import type { RPC } from '../../src/message/rpc.js'
Expand All @@ -16,5 +16,4 @@ export const getMsgIdStr = (msg: RPC.IMessage): string => messageIdToString(getM

export const fastMsgIdFn = (msg: RPC.IMessage): string =>
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error @chainsafe/as-sha256 types are wrong
msg.data != null ? messageIdToString(SHA256.default.digest(msg.data)) : '0'
msg.data != null ? messageIdToString(digest(msg.data)) : '0'

0 comments on commit cc4ff3b

Please sign in to comment.