From f1f46dcb5346c7e0bc51471bbef74486ba2f1600 Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Mon, 31 Oct 2022 23:06:03 +0530 Subject: [PATCH 1/5] Add message length --- package.json | 4 +- src/stream.ts | 147 ++++++++++++++++++++++----------- src/transport.ts | 2 +- test/transport.browser.spec.ts | 1 + 4 files changed, 106 insertions(+), 48 deletions(-) diff --git a/package.json b/package.json index ee1c7281a7..f818c65fe9 100644 --- a/package.json +++ b/package.json @@ -63,6 +63,7 @@ "@libp2p/interface-mocks": "^7.0.2", "@libp2p/peer-id-factory": "^1.0.19", "@types/uuid": "^8.3.4", + "@types/varint": "^6.0.0", "@typescript-eslint/parser": "^5.32.0", "aegir": "^37.4.6", "chai-bytes": "^0.1.2", @@ -97,6 +98,7 @@ "p-defer": "^4.0.0", "socket.io-client": "^4.1.2", "timeout-abort-controller": "^3.0.0", - "uuid": "^9.0.0" + "uuid": "^9.0.0", + "varint": "^6.0.0" } } diff --git a/src/stream.ts b/src/stream.ts index bd8cde476b..935ac33006 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,13 +1,14 @@ -import { Stream, StreamStat, Direction } from '@libp2p/interface-connection'; -import { Source } from 'it-stream-types'; -import { Sink } from 'it-stream-types'; -import { pushable, Pushable } from 'it-pushable'; -import defer, { DeferredPromise } from 'p-defer'; +import {Stream, StreamStat, Direction} from '@libp2p/interface-connection'; +import {Source} from 'it-stream-types'; +import {Sink} from 'it-stream-types'; +import {pushable, Pushable} from 'it-pushable'; +import defer, {DeferredPromise} from 'p-defer'; import merge from 'it-merge'; -import { Uint8ArrayList } from 'uint8arraylist'; -import { fromString } from 'uint8arrays/from-string'; -import { logger } from '@libp2p/logger'; +import {Uint8ArrayList} from 'uint8arraylist'; +import {logger} from '@libp2p/logger'; import * as pb from '../proto_ts/message.js'; +import {concat} from 'uint8arrays/concat'; +import * as varint from 'varint'; const log = logger('libp2p:webrtc:stream'); @@ -56,6 +57,9 @@ export class WebRTCStream implements Stream { closed: boolean = false; closeCb?: (stream: WebRTCStream) => void | undefined + // read state + messageState: MessageState = new MessageState(); + // testing constructor(opts: StreamInitOpts) { @@ -88,39 +92,34 @@ export class WebRTCStream implements Stream { this.opened.resolve(); }; - this.channel.onmessage = async ({ data }) => { + this.channel.onmessage = async ({data}) => { - let res: Uint8Array; - if (typeof data == 'string') { - res = fromString(data); - } else { - res = new Uint8Array(data as ArrayBuffer); - } - log.trace(`[stream:${this.id}][${this.stat.direction}] received message: length: ${res.length} ${res}`); - let m = pb.Message.fromBinary(res); - log(`[stream:${this.id}][${this.stat.direction}] received pb.Message: ${Object.entries(m)}`); - switch (m.flag) { - case undefined: - break; //regular message only - case pb.Message_Flag.STOP_SENDING: - log.trace('Remote has indicated, with "STOP_SENDING" flag, that it will discard any messages we send.'); - this.closeWrite(); - break; - case pb.Message_Flag.FIN: - log.trace('Remote has indicated, with "FIN" flag, that it will not send any further messages.'); - this.closeRead(); - break; - case pb.Message_Flag.RESET: - log.trace('Remote abruptly stopped sending, indicated with "RESET" flag.'); - this.closeRead(); + let offset = 0; + const res = new Uint8Array(data as ArrayBuffer); + if (res.length == 0) { + return } - if (this.readClosed || this.closed) { - return; - } - if (m.message) { - log.trace('%s incoming message %s', this.id, m.message); - (this._src as Pushable).push(new Uint8ArrayList(m.message)); + + // start reading bytes + // + while (offset < res.length) { + + // check if reading prefix length is required + if (this.messageState.messageSize == 0) { + const messageSize = varint.decode(res, offset); + this.messageState.messageSize = messageSize; + offset += varint.decode.bytes; + } else { + const end = Math.min(offset + this.messageState.bytesRemaining(), res.length); + this.messageState.write(res.subarray(offset, end)); + offset = end; + if (this.messageState.hasMessage()) { + this.processIncomingProtobuf(this.messageState.buffer); + this.messageState.clear(); + } + } } + }; this.channel.onclose = (_evt) => { @@ -131,6 +130,7 @@ export class WebRTCStream implements Stream { let err = (evt as RTCErrorEvent).error; this.abort(err); }; + } // If user attempts to set a new source @@ -148,8 +148,8 @@ export class WebRTCStream implements Stream { return; } - let self = this; - let closeWriteIterable = { + const self = this; + const closeWriteIterable = { async *[Symbol.asyncIterator]() { await self.closeWritePromise.promise; yield new Uint8Array(0); @@ -160,10 +160,39 @@ export class WebRTCStream implements Stream { if (closed || this.writeClosed) { break; } - let res = buf.subarray(); - let send_buf = pb.Message.toBinary({ message: buf.subarray() }); - log.trace(`[stream:${this.id}][${this.stat.direction}] sending message: length: ${res.length} ${res}, encoded through pb as ${send_buf}`); - this.channel.send(send_buf); + const res = buf.subarray(); + const msgbuf = pb.Message.toBinary({message: buf.subarray()}); + const prefix = varint.encode(msgbuf.length); + log.trace(`[stream:${this.id}][${this.stat.direction}] sending message: length: ${res.length} ${res}, encoded through pb as ${msgbuf}`); + this.channel.send(concat([prefix, msgbuf])); + } + } + + processIncomingProtobuf(buffer: Uint8Array): void { + log.trace(`[stream:${this.id}][${this.stat.direction}] received message: length: ${buffer.length} ${buffer}`); + const m = pb.Message.fromBinary(buffer); + log.trace(`[stream:${this.id}][${this.stat.direction}] received pb.Message: ${Object.entries(m)}`); + switch (m.flag) { + case undefined: + break; //regular message only + case pb.Message_Flag.STOP_SENDING: + log.trace('Remote has indicated, with "STOP_SENDING" flag, that it will discard any messages we send.'); + this.closeWrite(); + break; + case pb.Message_Flag.FIN: + log.trace('Remote has indicated, with "FIN" flag, that it will not send any further messages.'); + this.closeRead(); + break; + case pb.Message_Flag.RESET: + log.trace('Remote abruptly stopped sending, indicated with "RESET" flag.'); + this.closeRead(); + } + if (this.readClosed || this.closed) { + return; + } + if (m.message) { + log.trace('%s incoming message %s', this.id, m.message); + (this._src as Pushable).push(new Uint8ArrayList(m.message)); } } @@ -231,10 +260,36 @@ export class WebRTCStream implements Stream { private _sendFlag(flag: pb.Message_Flag): void { try { - log('Sending flag: %s', flag.toString()); - this.channel.send(pb.Message.toBinary({ flag: flag })); + log.trace('Sending flag: %s', flag.toString()); + const msgbuf = pb.Message.toBinary({flag: flag}); + const prefix = varint.encode(msgbuf.length); + this.channel.send(concat([prefix, msgbuf])); } catch (e) { log.error(`Exception while sending flag ${flag}: ${e}`); } } } + +class MessageState { + public buffer: Uint8Array = new Uint8Array() + public messageSize: number = 0; + + public bytesRemaining(): number { + return this.messageSize - this.buffer.length; + } + + public hasMessage(): boolean { + return this.messageSize != 0 && this.buffer.length == this.messageSize; + } + + public write(b: Uint8Array) { + this.buffer = concat([this.buffer, b]); + } + + public clear() { + this.buffer = new Uint8Array(); + this.messageSize = 0; + } + + +} diff --git a/src/transport.ts b/src/transport.ts index 06afeda501..3ef8478b47 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -90,7 +90,7 @@ export class WebRTCTransport implements Transport { // create offer sdp let offerSdp = await peerConnection.createOffer(); // generate random string for ufrag - const ufrag = genUuid().replaceAll('-', ''); + const ufrag = "libp2p+webrtc+v1/" + genUuid().replaceAll('-', ''); // munge sdp with ufrag = pwd offerSdp = sdp.munge(offerSdp, ufrag); // set local description diff --git a/test/transport.browser.spec.ts b/test/transport.browser.spec.ts index 873a1195c2..39034f8634 100644 --- a/test/transport.browser.spec.ts +++ b/test/transport.browser.spec.ts @@ -118,6 +118,7 @@ describe('Transport interoperability tests', () => { let data = 'dataToBeEchoedBackToMe\n'; let response = await pipe([uint8arrayFromString(data)], stream, async (source) => await first(source)); expect(response?.subarray()).to.equalBytes(uint8arrayFromString(data)); + await node.stop(); }); }); From 9d29cb022af187dcf05329cb607569b68e96b028 Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Tue, 1 Nov 2022 00:18:32 +0530 Subject: [PATCH 2/5] use it-length-prefixed --- package.json | 23 +++++++-------- src/stream.ts | 79 +++++++++++++++++++++++---------------------------- 2 files changed, 45 insertions(+), 57 deletions(-) diff --git a/package.json b/package.json index f818c65fe9..eef6e8b6f6 100644 --- a/package.json +++ b/package.json @@ -62,43 +62,40 @@ "devDependencies": { "@libp2p/interface-mocks": "^7.0.2", "@libp2p/peer-id-factory": "^1.0.19", + "@protobuf-ts/plugin": "^2.8.0", + "@protobuf-ts/protoc": "^2.8.0", "@types/uuid": "^8.3.4", - "@types/varint": "^6.0.0", "@typescript-eslint/parser": "^5.32.0", "aegir": "^37.4.6", + "chai": "^4.3.6", "chai-bytes": "^0.1.2", - "it-all": "^2.0.0", "it-first": "^2.0.0", "libp2p": "^0.40.0", "npm-run-all": "^4.1.5", "prettier": "^2.7.1", "typescript": "^4.7.4", - "uint8arrays": "^4.0.2", "wait-on": "^6.0.1" }, "dependencies": { "@chainsafe/libp2p-noise": "^9.0.0", "@libp2p/interface-connection": "^3.0.2", - "@libp2p/interface-registrar": "^2.0.3", "@libp2p/interface-stream-muxer": "^3.0.0", "@libp2p/interface-transport": "^2.0.0", - "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", - "@libp2p/multistream-select": "^3.0.0", "@libp2p/peer-id": "^1.1.15", "@multiformats/multiaddr": "^11.0.3", - "@protobuf-ts/plugin": "^2.8.0", - "@protobuf-ts/protoc": "^2.8.0", "@protobuf-ts/runtime": "^2.8.0", - "abortable-iterator": "^4.0.2", "err-code": "^3.0.1", + "it-length-prefixed": "^8.0.3", "it-merge": "^2.0.0", + "it-pipe": "^2.0.4", + "it-pushable": "^3.1.0", + "it-stream-types": "^1.0.4", "multiformats": "^10.0.0", "multihashes": "^4.0.3", "p-defer": "^4.0.0", - "socket.io-client": "^4.1.2", - "timeout-abort-controller": "^3.0.0", - "uuid": "^9.0.0", - "varint": "^6.0.0" + "uint8arraylist": "^2.3.3", + "uint8arrays": "^4.0.2", + "uuid": "^9.0.0" } } diff --git a/src/stream.ts b/src/stream.ts index 935ac33006..a27bf8fedf 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,14 +1,15 @@ import {Stream, StreamStat, Direction} from '@libp2p/interface-connection'; import {Source} from 'it-stream-types'; import {Sink} from 'it-stream-types'; -import {pushable, Pushable} from 'it-pushable'; +import {pushable} from 'it-pushable'; +import * as lp from 'it-length-prefixed'; +import { pipe } from 'it-pipe'; import defer, {DeferredPromise} from 'p-defer'; import merge from 'it-merge'; import {Uint8ArrayList} from 'uint8arraylist'; import {logger} from '@libp2p/logger'; import * as pb from '../proto_ts/message.js'; import {concat} from 'uint8arrays/concat'; -import * as varint from 'varint'; const log = logger('libp2p:webrtc:stream'); @@ -46,7 +47,8 @@ export class WebRTCStream implements Stream { metadata: Record; private readonly channel: RTCDataChannel; - _src: Source = pushable(); + private readonly _src: Source; + _innersrc = pushable(); sink: Sink>; // promises @@ -92,36 +94,6 @@ export class WebRTCStream implements Stream { this.opened.resolve(); }; - this.channel.onmessage = async ({data}) => { - - let offset = 0; - const res = new Uint8Array(data as ArrayBuffer); - if (res.length == 0) { - return - } - - // start reading bytes - // - while (offset < res.length) { - - // check if reading prefix length is required - if (this.messageState.messageSize == 0) { - const messageSize = varint.decode(res, offset); - this.messageState.messageSize = messageSize; - offset += varint.decode.bytes; - } else { - const end = Math.min(offset + this.messageState.bytesRemaining(), res.length); - this.messageState.write(res.subarray(offset, end)); - offset = end; - if (this.messageState.hasMessage()) { - this.processIncomingProtobuf(this.messageState.buffer); - this.messageState.clear(); - } - } - } - - }; - this.channel.onclose = (_evt) => { this.close(); }; @@ -131,6 +103,29 @@ export class WebRTCStream implements Stream { this.abort(err); }; + const self = this; + // reader pipe + this.channel.onmessage = async ({data}) => { + const res = new Uint8Array(data as ArrayBuffer); + if (res.length == 0) { + return + } + this._innersrc.push(res) + }; + + this._src = pipe( + this._innersrc, + lp.decode(), + (source) => (async function * () { + for await (const buf of source) { + const { data } = self.processIncomingProtobuf(buf.subarray()); + if (data) { + yield new Uint8ArrayList(data); + } + } + })(), + ) + } // If user attempts to set a new source @@ -162,13 +157,13 @@ export class WebRTCStream implements Stream { } const res = buf.subarray(); const msgbuf = pb.Message.toBinary({message: buf.subarray()}); - const prefix = varint.encode(msgbuf.length); + const sendbuf = lp.encode.single(msgbuf) log.trace(`[stream:${this.id}][${this.stat.direction}] sending message: length: ${res.length} ${res}, encoded through pb as ${msgbuf}`); - this.channel.send(concat([prefix, msgbuf])); + this.channel.send(sendbuf.subarray()) } } - processIncomingProtobuf(buffer: Uint8Array): void { + processIncomingProtobuf(buffer: Uint8Array): { data: Uint8Array | undefined } { log.trace(`[stream:${this.id}][${this.stat.direction}] received message: length: ${buffer.length} ${buffer}`); const m = pb.Message.fromBinary(buffer); log.trace(`[stream:${this.id}][${this.stat.direction}] received pb.Message: ${Object.entries(m)}`); @@ -188,12 +183,9 @@ export class WebRTCStream implements Stream { this.closeRead(); } if (this.readClosed || this.closed) { - return; - } - if (m.message) { - log.trace('%s incoming message %s', this.id, m.message); - (this._src as Pushable).push(new Uint8ArrayList(m.message)); + return { data: undefined }; } + return { data: m.message } } /** @@ -219,7 +211,7 @@ export class WebRTCStream implements Stream { closeRead(): void { this._sendFlag(pb.Message_Flag.STOP_SENDING); this.readClosed = true; - (this.source as Pushable).end(); + (this._innersrc).end(); if (this.readClosed && this.writeClosed) { this.close(); } @@ -262,8 +254,7 @@ export class WebRTCStream implements Stream { try { log.trace('Sending flag: %s', flag.toString()); const msgbuf = pb.Message.toBinary({flag: flag}); - const prefix = varint.encode(msgbuf.length); - this.channel.send(concat([prefix, msgbuf])); + this.channel.send(lp.encode.single(msgbuf).subarray()); } catch (e) { log.error(`Exception while sending flag ${flag}: ${e}`); } From 43a88208ed2c545c237b96e0ab85ac0dc5138ecd Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Tue, 1 Nov 2022 09:05:18 +0530 Subject: [PATCH 3/5] stream transitions --- src/stream.ts | 193 ++++++++++++++++++++++-------------- test/stream.browser.spec.ts | 90 ++++++----------- 2 files changed, 150 insertions(+), 133 deletions(-) diff --git a/src/stream.ts b/src/stream.ts index a27bf8fedf..4ced9c8957 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -2,14 +2,13 @@ import {Stream, StreamStat, Direction} from '@libp2p/interface-connection'; import {Source} from 'it-stream-types'; import {Sink} from 'it-stream-types'; import {pushable} from 'it-pushable'; -import * as lp from 'it-length-prefixed'; -import { pipe } from 'it-pipe'; +import * as lengthPrefixed from 'it-length-prefixed'; +import {pipe} from 'it-pipe'; import defer, {DeferredPromise} from 'p-defer'; import merge from 'it-merge'; import {Uint8ArrayList} from 'uint8arraylist'; import {logger} from '@libp2p/logger'; import * as pb from '../proto_ts/message.js'; -import {concat} from 'uint8arrays/concat'; const log = logger('libp2p:webrtc:stream'); @@ -46,22 +45,18 @@ export class WebRTCStream implements Stream { */ metadata: Record; private readonly channel: RTCDataChannel; + streamState = new StreamState(); private readonly _src: Source; - _innersrc = pushable(); + private readonly _innersrc = pushable(); sink: Sink>; // promises opened: DeferredPromise = defer(); closeWritePromise: DeferredPromise = defer(); - writeClosed: boolean = false; - readClosed: boolean = false; closed: boolean = false; closeCb?: (stream: WebRTCStream) => void | undefined - // read state - messageState: MessageState = new MessageState(); - // testing constructor(opts: StreamInitOpts) { @@ -106,21 +101,24 @@ export class WebRTCStream implements Stream { const self = this; // reader pipe this.channel.onmessage = async ({data}) => { - const res = new Uint8Array(data as ArrayBuffer); - if (res.length == 0) { - return + if (data.length == 0 || !data) { + return; } - this._innersrc.push(res) + this._innersrc.push(new Uint8Array(data as ArrayBufferLike)) }; + // pipe framed protobuf messages through + // a length prefixed decoder, and surface + // data from the `Message.message` field + // through a source. this._src = pipe( this._innersrc, - lp.decode(), - (source) => (async function * () { + lengthPrefixed.decode(), + (source) => (async function* () { for await (const buf of source) { - const { data } = self.processIncomingProtobuf(buf.subarray()); - if (data) { - yield new Uint8ArrayList(data); + const {message} = self.processIncomingProtobuf(buf.subarray()); + if (message) { + yield new Uint8ArrayList(message); } } })(), @@ -139,7 +137,7 @@ export class WebRTCStream implements Stream { private async _sinkFn(src: Source): Promise { await this.opened.promise; - if (closed || this.writeClosed) { + if (this.streamState.state == StreamStates.CLOSED || this.streamState.state == StreamStates.WRITE_CLOSED) { return; } @@ -152,40 +150,34 @@ export class WebRTCStream implements Stream { }; for await (const buf of merge(closeWriteIterable, src)) { - if (closed || this.writeClosed) { - break; + const state = self.streamState.state; + if (state == StreamStates.CLOSED || state == StreamStates.WRITE_CLOSED) { + return; } - const res = buf.subarray(); const msgbuf = pb.Message.toBinary({message: buf.subarray()}); - const sendbuf = lp.encode.single(msgbuf) - log.trace(`[stream:${this.id}][${this.stat.direction}] sending message: length: ${res.length} ${res}, encoded through pb as ${msgbuf}`); + const sendbuf = lengthPrefixed.encode.single(msgbuf) this.channel.send(sendbuf.subarray()) } } - processIncomingProtobuf(buffer: Uint8Array): { data: Uint8Array | undefined } { - log.trace(`[stream:${this.id}][${this.stat.direction}] received message: length: ${buffer.length} ${buffer}`); + processIncomingProtobuf(buffer: Uint8Array): pb.Message { const m = pb.Message.fromBinary(buffer); - log.trace(`[stream:${this.id}][${this.stat.direction}] received pb.Message: ${Object.entries(m)}`); - switch (m.flag) { - case undefined: - break; //regular message only - case pb.Message_Flag.STOP_SENDING: - log.trace('Remote has indicated, with "STOP_SENDING" flag, that it will discard any messages we send.'); - this.closeWrite(); - break; - case pb.Message_Flag.FIN: - log.trace('Remote has indicated, with "FIN" flag, that it will not send any further messages.'); - this.closeRead(); - break; - case pb.Message_Flag.RESET: - log.trace('Remote abruptly stopped sending, indicated with "RESET" flag.'); - this.closeRead(); - } - if (this.readClosed || this.closed) { - return { data: undefined }; + if (m.flag) { + const [currentState, nextState] = this.streamState.transition({direction: 'inbound', flag: m.flag!}); + if (currentState != nextState) { + switch (nextState) { + case StreamStates.READ_CLOSED: + this._innersrc.end(); + break; + case StreamStates.WRITE_CLOSED: + this.closeWritePromise.resolve(); + break; + case StreamStates.CLOSED: + this.close(); + } + } } - return { data: m.message } + return m; } /** @@ -196,9 +188,9 @@ export class WebRTCStream implements Stream { return; } this.stat.timeline.close = new Date().getTime(); - this.closed = true; - this.readClosed = true; - this.writeClosed = true; + this.streamState.state = StreamStates.CLOSED; + this._innersrc.end(); + this.closeWritePromise.resolve(); this.channel.close(); if (this.closeCb) { this.closeCb(this) @@ -209,10 +201,12 @@ export class WebRTCStream implements Stream { * Close a stream for reading only */ closeRead(): void { - this._sendFlag(pb.Message_Flag.STOP_SENDING); - this.readClosed = true; - (this._innersrc).end(); - if (this.readClosed && this.writeClosed) { + const [currentState, nextState] = this.streamState.transition({direction: 'outbound', flag: pb.Message_Flag.STOP_SENDING}); + if (currentState == StreamStates.OPEN || currentState == StreamStates.WRITE_CLOSED) { + this._sendFlag(pb.Message_Flag.STOP_SENDING); + (this._innersrc).end(); + } + if (currentState != nextState && nextState == StreamStates.CLOSED) { this.close(); } } @@ -221,10 +215,12 @@ export class WebRTCStream implements Stream { * Close a stream for writing only */ closeWrite(): void { - this._sendFlag(pb.Message_Flag.FIN); - this.writeClosed = true; - this.closeWritePromise.resolve(); - if (this.readClosed && this.writeClosed) { + const [currentState, nextState] = this.streamState.transition({direction: 'outbound', flag: pb.Message_Flag.FIN}); + if (currentState == StreamStates.OPEN || currentState == StreamStates.READ_CLOSED) { + this._sendFlag(pb.Message_Flag.FIN); + this.closeWritePromise.resolve(); + } + if (currentState != nextState && nextState == StreamStates.CLOSED) { this.close(); } } @@ -243,9 +239,8 @@ export class WebRTCStream implements Stream { reset(): void { this.stat = defaultStat(this.stat.direction); this._sendFlag(pb.Message_Flag.RESET); - this.writeClosed = true; - this.closeWritePromise.resolve(); - if (this.readClosed && this.writeClosed) { + const [currentState, nextState] = this.streamState.transition({direction: 'outbound', flag: pb.Message_Flag.RESET}); + if (currentState != nextState) { this.close(); } } @@ -254,33 +249,79 @@ export class WebRTCStream implements Stream { try { log.trace('Sending flag: %s', flag.toString()); const msgbuf = pb.Message.toBinary({flag: flag}); - this.channel.send(lp.encode.single(msgbuf).subarray()); + this.channel.send(lengthPrefixed.encode.single(msgbuf).subarray()); } catch (e) { log.error(`Exception while sending flag ${flag}: ${e}`); } } } -class MessageState { - public buffer: Uint8Array = new Uint8Array() - public messageSize: number = 0; +/* + * State transitions for a stream + */ +type StreamStateInput = { + direction: 'inbound' | 'outbound', + flag: pb.Message_Flag, +}; - public bytesRemaining(): number { - return this.messageSize - this.buffer.length; - } +export enum StreamStates { + OPEN, + READ_CLOSED, + WRITE_CLOSED, + CLOSED, +} - public hasMessage(): boolean { - return this.messageSize != 0 && this.buffer.length == this.messageSize; - } +class StreamState { + state: StreamStates = StreamStates.OPEN - public write(b: Uint8Array) { - this.buffer = concat([this.buffer, b]); - } + transition({direction, flag}: StreamStateInput): [StreamStates, StreamStates] { + let prev = this.state; + if (this.state == StreamStates.CLOSED) { + return [prev, StreamStates.CLOSED]; + } + if (direction == 'inbound') { + switch (flag) { + case pb.Message_Flag.FIN: + if (this.state == StreamStates.OPEN) { + this.state = StreamStates.READ_CLOSED; + } else if (this.state == StreamStates.WRITE_CLOSED) { + this.state = StreamStates.CLOSED; + } + break; - public clear() { - this.buffer = new Uint8Array(); - this.messageSize = 0; - } + case pb.Message_Flag.STOP_SENDING: + if (this.state == StreamStates.OPEN) { + this.state = StreamStates.WRITE_CLOSED; + } else if (this.state == StreamStates.READ_CLOSED) { + this.state = StreamStates.CLOSED; + } + break; + case pb.Message_Flag.RESET: + this.state = StreamStates.CLOSED; + } + } else { + switch (flag) { + case pb.Message_Flag.FIN: + if (this.state == StreamStates.OPEN) { + this.state = StreamStates.WRITE_CLOSED; + } else if (this.state == StreamStates.READ_CLOSED) { + this.state = StreamStates.CLOSED; + } + break; + + case pb.Message_Flag.STOP_SENDING: + if (this.state == StreamStates.OPEN) { + this.state = StreamStates.READ_CLOSED; + } else if (this.state == StreamStates.WRITE_CLOSED) { + this.state = StreamStates.CLOSED; + } + break; + case pb.Message_Flag.RESET: + this.state = StreamStates.CLOSED; + } + } + return [prev, this.state]; + } } diff --git a/test/stream.browser.spec.ts b/test/stream.browser.spec.ts index ab7dc17f43..c4121dc6cf 100644 --- a/test/stream.browser.spec.ts +++ b/test/stream.browser.spec.ts @@ -1,98 +1,74 @@ -import * as underTest from '../src/stream.js'; -import { expect, assert } from 'chai' +import * as underTest from '../src/stream'; +import {expect, assert} from 'chai' describe('stream stats', () => { it('can construct', () => { let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + let dc = pc.createDataChannel('whatever', {negotiated: true, id: 91}); + let s = new underTest.WebRTCStream({channel: dc, stat: underTest.defaultStat('outbound')}); // expect(s.stat.timeline.close).to.not.exist(); assert.notExists(s.stat.timeline.close); }); it('close marks it closed', () => { let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(false); - // expect(s.stat.timeline.close).to.not.exist(); + let dc = pc.createDataChannel('whatever', {negotiated: true, id: 91}); + let s = new underTest.WebRTCStream({channel: dc, stat: underTest.defaultStat('outbound')}); + + expect(s.streamState.state).to.equal(underTest.StreamStates.OPEN); s.close(); - expect(s.closed).to.equal(true); - expect(s.readClosed).to.equal(true); - expect(s.writeClosed).to.equal(true); - // expect(s.stat.timeline.close).to.exist(); + expect(s.streamState.state).to.equal(underTest.StreamStates.CLOSED); }); it('closeRead marks it read-closed only', () => { let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(false); + let dc = pc.createDataChannel('whatever', {negotiated: true, id: 91}); + let s = new underTest.WebRTCStream({channel: dc, stat: underTest.defaultStat('outbound')}); + expect(s.streamState.state).to.equal(underTest.StreamStates.OPEN); s.closeRead(); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(true); - expect(s.writeClosed).to.equal(false); + expect(s.streamState.state).to.equal(underTest.StreamStates.READ_CLOSED); }); it('closeWrite marks it write-closed only', () => { let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(false); + let dc = pc.createDataChannel('whatever', {negotiated: true, id: 91}); + let s = new underTest.WebRTCStream({channel: dc, stat: underTest.defaultStat('outbound')}); + expect(s.streamState.state).to.equal(underTest.StreamStates.OPEN); s.closeWrite(); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(true); + expect(s.streamState.state).to.equal(underTest.StreamStates.WRITE_CLOSED); }); it('closeWrite AND closeRead = close', () => { let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(false); - s.closeRead(); + let dc = pc.createDataChannel('whatever', {negotiated: true, id: 91}); + let s = new underTest.WebRTCStream({channel: dc, stat: underTest.defaultStat('outbound')}); s.closeWrite(); - expect(s.closed).to.equal(true); - expect(s.readClosed).to.equal(true); - expect(s.writeClosed).to.equal(true); + expect(s.streamState.state).to.equal(underTest.StreamStates.WRITE_CLOSED); + s.closeRead(); + expect(s.streamState.state).to.equal(underTest.StreamStates.CLOSED); }); it('abort = close', () => { let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(false); + let dc = pc.createDataChannel('whatever', {negotiated: true, id: 91}); + let s = new underTest.WebRTCStream({channel: dc, stat: underTest.defaultStat('outbound')}); // expect(s.stat.timeline.close).to.not.exist(); - s.abort({ name: 'irrelevant', message: 'this parameter is actually ignored' }); - expect(s.closed).to.equal(true); - expect(s.readClosed).to.equal(true); - expect(s.writeClosed).to.equal(true); + expect(s.streamState.state).to.equal(underTest.StreamStates.OPEN); + s.abort({name: 'irrelevant', message: 'this parameter is actually ignored'}); + expect(s.streamState.state).to.equal(underTest.StreamStates.CLOSED); // expect(s.stat.timeline.close).to.exist(); expect(s.stat.timeline.close).to.be.greaterThan(s.stat.timeline.open); }); - it('reset = close + newStat', () => { + it('reset = close', () => { let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(false); + let dc = pc.createDataChannel('whatever', {negotiated: true, id: 91}); + let s = new underTest.WebRTCStream({channel: dc, stat: underTest.defaultStat('outbound')}); // expect(s.stat.timeline.close).to.not.exist(); + expect(s.streamState.state).to.equal(underTest.StreamStates.OPEN); s.reset(); //only resets the write side - expect(s.closed).to.equal(false); - expect(s.readClosed).to.equal(false); - expect(s.writeClosed).to.equal(true); + expect(s.streamState.state).to.equal(underTest.StreamStates.CLOSED); // expect(s.stat.timeline.close).to.not.exist(); + expect(dc.readyState).to.be.oneOf(['closing', 'closed']); }); }); From 51aa49db85e7af0f9d5058ce1bcfa003df792f4e Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Tue, 1 Nov 2022 09:20:01 +0530 Subject: [PATCH 4/5] add comments --- src/muxer.ts | 3 +-- src/stream.ts | 23 +++++++++++++++-------- src/transport.ts | 22 ++++++++++++++-------- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/muxer.ts b/src/muxer.ts index 0db0422ac3..9b426132ff 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -2,7 +2,6 @@ import {Stream} from "@libp2p/interface-connection" import {StreamMuxer, StreamMuxerFactory, StreamMuxerInit} from "@libp2p/interface-stream-muxer" import {Source, Sink} from "it-stream-types" -import {v4} from "uuid" import {WebRTCStream} from "./stream.js" import {nopSink, nopSource} from "./util.js" @@ -53,7 +52,7 @@ export class DataChannelMuxer implements StreamMuxer { } newStream(name?: string | undefined): Stream { - const streamName = name || v4(); + const streamName = name || ''; const channel = this.peerConnection.createDataChannel(streamName) const stream = new WebRTCStream({ channel, diff --git a/src/stream.ts b/src/stream.ts index 4ced9c8957..98a3014c5d 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -44,21 +44,30 @@ export class WebRTCStream implements Stream { * User defined stream metadata */ metadata: Record; + private readonly channel: RTCDataChannel; streamState = new StreamState(); + // _src is exposed to the user via the `source` getter to read unwrapped protobuf + // data from the underlying datachannel. private readonly _src: Source; + + // _innersrc is used to push data from the underlying datachannel to the + // length prefix decoder and then the protobuf decoder. private readonly _innersrc = pushable(); + + // sink is used to write data to the remote. It takes care of wrapping + // data in a protobuf and adding the length prefix. sink: Sink>; // promises + // opened is resolved when the underlying datachannel is in the open state. opened: DeferredPromise = defer(); + // closeWritePromise is used to trigger a generator which can be used to close + // the sink. closeWritePromise: DeferredPromise = defer(); - closed: boolean = false; closeCb?: (stream: WebRTCStream) => void | undefined - // testing - constructor(opts: StreamInitOpts) { this.channel = opts.channel; this.id = this.channel.label; @@ -70,7 +79,7 @@ export class WebRTCStream implements Stream { break; case 'closed': case 'closing': - this.closed = true; + this.streamState.state = StreamStates.CLOSED; if (!this.stat.timeline.close) { this.stat.timeline.close = new Date().getTime(); } @@ -99,6 +108,7 @@ export class WebRTCStream implements Stream { }; const self = this; + // reader pipe this.channel.onmessage = async ({data}) => { if (data.length == 0 || !data) { @@ -184,9 +194,6 @@ export class WebRTCStream implements Stream { * Close a stream for reading and writing */ close(): void { - if (this.closed) { - return; - } this.stat.timeline.close = new Date().getTime(); this.streamState.state = StreamStates.CLOSED; this._innersrc.end(); @@ -238,9 +245,9 @@ export class WebRTCStream implements Stream { */ reset(): void { this.stat = defaultStat(this.stat.direction); - this._sendFlag(pb.Message_Flag.RESET); const [currentState, nextState] = this.streamState.transition({direction: 'outbound', flag: pb.Message_Flag.RESET}); if (currentState != nextState) { + this._sendFlag(pb.Message_Flag.RESET); this.close(); } } diff --git a/src/transport.ts b/src/transport.ts index 3ef8478b47..49fa09ecaf 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -27,7 +27,7 @@ export interface WebRTCTransportComponents { export class WebRTCTransport implements Transport { private components: WebRTCTransportComponents - constructor (components: WebRTCTransportComponents) { + constructor(components: WebRTCTransportComponents) { this.components = components } @@ -70,7 +70,9 @@ export class WebRTCTransport implements Transport { } as any); const peerConnection = new RTCPeerConnection({certificates: [certificate]}); - // create data channel + // create data channel for running the noise handshake. Once the data channel is opened, + // the remote will initiate the noise handshake. This is used to confirm the identity of + // the peer. const dataChannelOpenPromise = defer(); const handshakeDataChannel = peerConnection.createDataChannel('handshake', {negotiated: true, id: 0}); const handhsakeTimeout = setTimeout(() => { @@ -82,22 +84,24 @@ export class WebRTCTransport implements Transport { clearTimeout(handhsakeTimeout) dataChannelOpenPromise.resolve(); } + handshakeDataChannel.onerror = (ev: Event) => { clearTimeout(handhsakeTimeout) log.error('Error opening a data channel for handshaking: %s', ev.toString()); dataChannelOpenPromise.reject(dataChannelError('data', `error opening datachannel: ${ev.toString()}`)); }; - // create offer sdp + + let offerSdp = await peerConnection.createOffer(); - // generate random string for ufrag const ufrag = "libp2p+webrtc+v1/" + genUuid().replaceAll('-', ''); - // munge sdp with ufrag = pwd + // munge sdp with ufrag = pwd. This allows the remote to respond to + // STUN messages without performing an actual SDP exchange. This is because + // it can infer the passwd field by reading the USERNAME attribute + // of the STUN message. offerSdp = sdp.munge(offerSdp, ufrag); - // set local description await peerConnection.setLocalDescription(offerSdp); // construct answer sdp from multiaddr const answerSdp = sdp.fromMultiAddr(ma, ufrag); - // set remote description await peerConnection.setRemoteDescription(answerSdp); // wait for peerconnection.onopen to fire, or for the datachannel to open await dataChannelOpenPromise.promise; @@ -135,6 +139,8 @@ export class WebRTCTransport implements Transport { }) const muxerFactory = new DataChannelMuxerFactory(peerConnection) + // For outbound connections, the remote is expected to start the noise handshake. + // Therefore, we need to secure an inbound noise connection from the remote. await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId); const upgraded = await options.upgrader.upgradeOutbound(maConn, {skipProtection: true, skipEncryption: true, muxerFactory}) return upgraded @@ -153,7 +159,7 @@ export class WebRTCTransport implements Transport { const localFpString = localFingerprint.value!.replaceAll(':', ''); const localFpArray = uint8arrayFromString(localFpString, 'hex'); if (localFingerprint.algorithm! != 'sha-256') { - throw unsupportedHashAlgorithm(localFingerprint.algorithm || 'none'); + throw unsupportedHashAlgorithm(localFingerprint.algorithm || 'none'); } const local = multihashes.encode(localFpArray, multihashes.names['sha2-256']); From f80baba0658b1e13956b9c622f2bcdaf48740770 Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Wed, 2 Nov 2022 10:02:59 +0530 Subject: [PATCH 5/5] fix --- src/stream.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream.ts b/src/stream.ts index 98a3014c5d..6e10db9950 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -126,7 +126,7 @@ export class WebRTCStream implements Stream { lengthPrefixed.decode(), (source) => (async function* () { for await (const buf of source) { - const {message} = self.processIncomingProtobuf(buf.subarray()); + const message = self.processIncomingProtobuf(buf.subarray()); if (message) { yield new Uint8ArrayList(message); } @@ -170,7 +170,7 @@ export class WebRTCStream implements Stream { } } - processIncomingProtobuf(buffer: Uint8Array): pb.Message { + processIncomingProtobuf(buffer: Uint8Array): Uint8Array | undefined { const m = pb.Message.fromBinary(buffer); if (m.flag) { const [currentState, nextState] = this.streamState.transition({direction: 'inbound', flag: m.flag!}); @@ -187,7 +187,7 @@ export class WebRTCStream implements Stream { } } } - return m; + return m.message; } /**