From f4161e8e77bdffc686e192433d9ddcb3c749ea71 Mon Sep 17 00:00:00 2001 From: 0xLogN Date: Sun, 5 Mar 2023 17:43:37 -0800 Subject: [PATCH] refactor!: deprecate DirectStream, prefer Stream --- .vscode/settings.json | 2 +- src/index.ts | 1 + src/stream/DirectStream.ts | 286 ++-------------- src/stream/index.ts | 288 +++++++++++++++-- src/tests/stream/DirectStream.test.ts | 355 -------------------- src/tests/stream/index.test.ts | 449 ++++++++++++++++++++++---- 6 files changed, 677 insertions(+), 704 deletions(-) delete mode 100644 src/tests/stream/DirectStream.test.ts diff --git a/.vscode/settings.json b/.vscode/settings.json index ab896b5..b6e6fc2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,3 @@ { - "cSpell.words": ["bufncmp", "TCPIO"] + "cSpell.words": ["bufncmp", "subdata", "TCPIO"] } diff --git a/src/index.ts b/src/index.ts index 001c23b..e6d79f2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -30,6 +30,7 @@ import DirectStream, { DirectStreamEventArguments, DirectStreamEvents } from "./ export { bytes, ControlCharacters, + // eslint-disable-next-line deprecation/deprecation DirectStream, DirectStreamEventArguments, DirectStreamEvents, diff --git a/src/stream/DirectStream.ts b/src/stream/DirectStream.ts index 3502514..008d36f 100644 --- a/src/stream/DirectStream.ts +++ b/src/stream/DirectStream.ts @@ -16,11 +16,14 @@ * along with this program. If not, see . */ +import { PassThrough } from "node:stream"; + import debug from "debug"; import { Emitter } from "strict-event-emitter"; -import { bytes, kb, mb } from "../lib/sizeHelpers"; -import { type SizedControlCharacters, ControlCharacters } from "./controlCharacters"; +import Stream, { StreamEvents } from "."; +import type { ControlCharacters } from "./controlCharacters"; +import { type SizedControlCharacters } from "./controlCharacters"; const log = debug("node-subdata-2:stream:DirectStream"); @@ -55,24 +58,29 @@ export type DirectStreamEventArguments = { * * This stream does not connect with an actual socket, it is just a wrapper around a buffer. You probably * want to use {@link Stream} for this purpose. + * + * @deprecated This class is deprecated and will be removed in a future version. The implementation from this class + * has been moved to {@link Stream}. */ +// TODO: Remove this class in a future release export default class DirectStream extends Emitter { - /** The buffer of the stream */ - private _buffer: Buffer; - /** The size of the buffer */ - private _bufferSize: number; - /** The buffer of the current read packet */ - private _packet: Buffer; - /** The size of the current read packet */ - private _packetSize: number; + private _stream: Stream; + private _passThrough: PassThrough; public constructor() { super(); - log("initializing"); - this._buffer = Buffer.alloc(0); - this._bufferSize = 0; - this._packet = Buffer.alloc(0); - this._packetSize = 0; + log("initializing deprecated pass-through class"); + this._passThrough = new PassThrough(); + this._stream = new Stream(this._passThrough); + this._stream.on(StreamEvents.Read, (data) => { + this.emit(DirectStreamEvents.Read, data.length, Math.floor(data.length / 4), data); + }); + this._stream.on(StreamEvents.Packet, (data) => { + this.emit(DirectStreamEvents.Packet, data); + }); + this._stream.on(StreamEvents.Reset, () => { + this.emit(DirectStreamEvents.ReadReset); + }); } /** @@ -80,201 +88,7 @@ export default class DirectStream extends Emitter { * @param data The data to feed */ public feed(data: Buffer): void { - log("feeding data", data, "size", data.length); - this._buffer = Buffer.concat([this._buffer, data]); - this._bufferSize += data.length; - log("new data", this._buffer, "size", this._bufferSize); - this._handleNewData(); - } - - /** - * Invalidates all previous data. Called internally when a {@link ControlCharacters.ReadReset} is encountered. - */ - private _readReset(): void { - log("triggering read reset"); - this._buffer = Buffer.alloc(0); - this._bufferSize = 0; - this._packet = Buffer.alloc(0); - this._packetSize = 0; - this.emit(DirectStreamEvents.ReadReset); - } - - /** - * Command the stream to attempt to process new data. Called internally after {@link DirectStream.feed} is called. - */ - private _handleNewData(): void { - if (this._bufferSize === 0) return; - const char = this._buffer[0]; - - if (!(char.toString() in ControlCharacters)) - throw new Error(`I don't know what I am looking at! Encountered unknown control character 0x${this._buffer[0].toString(16)} in stream.`); - - log("handling control character", ControlCharacters[char]); - - switch (this._buffer[0]) { - case ControlCharacters.ReadReset: - const postReset = this._buffer.subarray(1); - log("triggering read reset, following data", postReset); - this._readReset(); - // Re-insert all data that follows the reset - log("re-feeding data"); - this.feed(postReset); - break; - case ControlCharacters.ReadByte: - if (this._bufferSize < 2) return; - this._emitRead({ controlCharacter: ControlCharacters.ReadByte, bytes: 1, numberOfType: 1 }, this._buffer[1]); - this._shiftBuffer(2); - break; - case ControlCharacters.ReadBytes: - if (this._bufferSize < 2) return; - const byteSize = (this._buffer[1] + 1) * 4; - if (this._bufferSize < byteSize + 2) return; - this._emitRead( - { controlCharacter: ControlCharacters.ReadBytes, bytes: byteSize, numberOfType: this._buffer[1] }, - this._buffer.subarray(2, byteSize + 2) - ); - this._shiftBuffer(byteSize + 2); - break; - case ControlCharacters.ReadKB: - if (this._bufferSize < 2) return; - const kbSize = kb(this._buffer[1] + 1) * 4; - if (this._bufferSize < kbSize + 2) return; - this._emitRead( - { controlCharacter: ControlCharacters.ReadKB, bytes: kbSize, numberOfType: this._buffer[1] }, - this._buffer.subarray(2, kbSize + 2) - ); - this._shiftBuffer(kbSize + 2); - break; - case ControlCharacters.ReadMB: - if (this._bufferSize < 2) return; - const mbSize = mb(this._buffer[1] + 1) * 4; - if (this._bufferSize < mbSize + 2) return; - this._emitRead( - { controlCharacter: ControlCharacters.ReadMB, bytes: mbSize, numberOfType: this._buffer[1] }, - this._buffer.subarray(2, mbSize + 2) - ); - this._shiftBuffer(mbSize + 2); - break; - case ControlCharacters.ReadGB: - case ControlCharacters.ReadTB: - case ControlCharacters.ReadPB: - // TODO: The SubData Java implementation does not yet support these either. This is a placeholder until we make sizes BigInts. - throw new Error("Internal error: ReadGB, ReadTB, and ReadPB are not yet implemented."); - case ControlCharacters.KeepAlive: - this._shiftBuffer(1); - break; - case ControlCharacters.EndOfPacket: - this._emitPacket(); - this._shiftBuffer(1); - break; - /* istanbul ignore next: this should never happen */ - default: - throw new Error( - `Internal bug: Unhandled control character 0x${this._buffer[0].toString(16)}. Please report this as an issue on our GitHub.` - ); - } - - if (this._bufferSize > 0) this._handleNewData(); - } - - /** - * Called internally to add to the packet buffer and emit the {@link DirectStreamEvents.Read} event. - * - * The object in the first parameter accepts: - * - controlCharacter: The control character that was read - * - bytes: The number of bytes that were read - * - numberOfType: The number of the type (e.g. for readKB(1), this should be 1) - * - * @param info See above - * @param data The actual data that was read - */ - private _emitRead(info: { controlCharacter: ControlCharacters.ReadByte; bytes: 1; numberOfType: 1 }, data: number): void; - private _emitRead(info: { controlCharacter: SizedControlCharacters; bytes: number; numberOfType: number }, data: Buffer): void; - private _emitRead( - { - controlCharacter, - bytes, - numberOfType - }: { controlCharacter: SizedControlCharacters | ControlCharacters.ReadByte; bytes: number; numberOfType: number }, - data: Buffer | number - ): void { - log("triggering read event of type", ControlCharacters[controlCharacter], "with data", data, "and size", bytes, "bytes"); - this._packet = Buffer.concat([this._packet, data instanceof Buffer ? data : Buffer.from([data])]); - this._packetSize += bytes; - /* istanbul ignore if: this should never reasonably happen & is untestable without a LOT of work */ - if (this._packetSize >= Number.MAX_SAFE_INTEGER - 1000) { - this._packet = Buffer.alloc(0); - this._packetSize = 0; - throw new Error("Packet size exceeded or got very close to maximum safe integer. This is probably a bug."); - } - if (controlCharacter === ControlCharacters.ReadByte) { - // TODO: Remove this cast. Probably need to use some sorta weird discrimination over the types. - this.emit(DirectStreamEvents.Read, ControlCharacters.ReadByte, 1, data as number); - return; - } else this.emit(DirectStreamEvents.Read, controlCharacter, numberOfType, data as Buffer); - } - - /** Called internally to clear and emit a packet. */ - private _emitPacket(): void { - debug("triggering packet"); - this.emit(DirectStreamEvents.Packet, this._packet); - this._packet = Buffer.alloc(0); - this._packetSize = 0; - } - - /** - * Shift the buffer by a certain amount. - * @param count The amount to shift the buffer by - */ - private _shiftBuffer(count: number): void { - log("shifting internal buffer by", count); - this._buffer = this._buffer.subarray(count); - this._bufferSize -= count; - } - - /** - * Do some funny math to calculate the return value for {@link DirectStream._bestControlCharacter}. - * - * @param controlCharacter The control character being suggested - * @param size The size in bytes - * @param sizeFn The size function to use (probably one of {@link kb}, {@link mb}, or {@link bytes}) - */ - private _calculateControlCharacterRemainder( - controlCharacter: SizedControlCharacters, - size: number, - sizeFn: (s: number) => number - ): { controlCharacter: SizedControlCharacters; number: number; remainder: number } { - const result = { - controlCharacter, - number: Math.min(255, Math.floor(size / sizeFn(4)) - 1), - remainder: size - sizeFn(4) * (Math.min(255, Math.floor(size / sizeFn(4)) - 1) + 1) - }; - log("calculated control character data for size", size, "bytes and control character", ControlCharacters[controlCharacter], "as", result); - return result; - } - - /** - * Determine the best suitable control character for a given size, along with the remainder after - * using that control character. - * @param size The size in bytes - */ - private _bestControlCharacter( - size: number - ): - | { controlCharacter: ControlCharacters.ReadByte; number: -1; remainder: number } - | { controlCharacter: SizedControlCharacters; number: number; remainder: number } - | false { - let result: - | { controlCharacter: ControlCharacters.ReadByte; number: -1; remainder: number } - | { controlCharacter: SizedControlCharacters; number: number; remainder: number } - | false; - if (size < 1) result = false; - else if (size < 4) result = { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: size - 1 }; - else if (size < kb(4)) result = this._calculateControlCharacterRemainder(ControlCharacters.ReadBytes, size, bytes); - else if (size < mb(4)) result = this._calculateControlCharacterRemainder(ControlCharacters.ReadKB, size, kb); - else result = this._calculateControlCharacterRemainder(ControlCharacters.ReadMB, size, mb); - log("best control character for size", size, "is", result); - return result; + this._passThrough.write(data); } /** @@ -282,56 +96,6 @@ export default class DirectStream extends Emitter { * @param data The data to encode */ public encode(input: Buffer): Buffer { - log("encoding data", input); - const size = input.length; - let remaining = size; - // TODO: Change this when GB/TB/PB support is added. - - // The algorithm here is as follows: - // We repeatedly call to _bestControlCharacter and start populating an array with - // the results of each along with the data it contains, then we flatten the array - // and convert it to a Buffer. - const results = []; - while (remaining > 0) { - log("remaining size", remaining); - const data = this._bestControlCharacter(remaining); - /* istanbul ignore next: this should never happen */ - if (data === false) throw new Error("This should never happen"); - const { controlCharacter, number, remainder } = data; - if (controlCharacter === ControlCharacters.ReadByte) { - results.push([controlCharacter, input.subarray(size - remaining, size - remaining + 1)]); - remaining -= 1; - continue; - } - - /* istanbul ignore else */ - if ([ControlCharacters.ReadBytes, ControlCharacters.ReadKB, ControlCharacters.ReadMB].includes(controlCharacter)) { - let byteSize; - switch (controlCharacter) { - case ControlCharacters.ReadBytes: - byteSize = (number + 1) * 4; - break; - case ControlCharacters.ReadKB: - byteSize = kb(number + 1) * 4; - break; - case ControlCharacters.ReadMB: - byteSize = mb(number + 1) * 4; - break; - /* istanbul ignore next: this should never happen */ - default: - throw new Error("This should never happen"); - } - - results.push([controlCharacter, number, input.subarray(size - remaining, size - remaining + byteSize)]); - remaining -= byteSize; - continue; - } - - /* istanbul ignore next */ - throw new Error(`Unrecognized control character returned by _bestControlCharacter for size ${remainder}`); - } - const result = Buffer.from(results.flat().flatMap((x) => (x instanceof Buffer ? Array.from(x.values()) : x))); - log("final result", result); - return result; + return this._stream.encode(input); } } diff --git a/src/stream/index.ts b/src/stream/index.ts index 79f7531..bc19473 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -22,8 +22,9 @@ import debug from "debug"; import { Emitter } from "strict-event-emitter"; import { endStream, writeTo } from "../lib/promisifiedStreamHelpers"; +import { bytes, kb, mb } from "../lib/sizeHelpers"; +import type { SizedControlCharacters } from "./controlCharacters"; import { ControlCharacters } from "./controlCharacters"; -import DirectStream, { DirectStreamEvents } from "./DirectStream"; const log = debug("node-subdata-2:stream"); @@ -74,8 +75,15 @@ export type StreamEventArguments = { export default class Stream extends Emitter { /** The underlying socket that is serving this Stream */ private _socket: Duplex; - /** The underlying DirectStream, responsible for encoding */ - private _stream: DirectStream; + + /** The buffer of the stream */ + private _buffer: Buffer; + /** The size of the buffer */ + private _bufferSize: number; + /** The buffer of the currently in-progress */ + private _packet: Buffer; + /** The size of the currently in-progress packet */ + private _packetSize: number; /** * Create a new Stream. @@ -84,23 +92,16 @@ export default class Stream extends Emitter { public constructor(socket: Duplex) { super(); log("initializing"); + this._buffer = Buffer.alloc(0); + this._bufferSize = 0; + this._packet = Buffer.alloc(0); + this._packetSize = 0; + this._socket = socket; - this._stream = new DirectStream(); + this._socket.on("data", (data) => { log("feeding data", data); - this._stream.feed(data); - }); - this._stream.on(DirectStreamEvents.ReadReset, () => { - log("forwarding reset"); - this.emit(StreamEvents.Reset); - }); - this._stream.on(DirectStreamEvents.Read, (_size, _count, data) => { - log("forwarding read", data); - this.emit(StreamEvents.Read, data instanceof Buffer ? data : Buffer.from([data])); - }); - this._stream.on(DirectStreamEvents.Packet, (data) => { - log("forwarding packet", data); - this.emit(StreamEvents.Packet, data); + this._feed(data); }); this._socket.on("end", () => { log("forwarding close"); @@ -111,6 +112,255 @@ export default class Stream extends Emitter { }); } + /** + * Feed new data internally to this Stream. Called when new data is received + * over the socket. + * @param data The data to feed + */ + private _feed(data: Buffer): void { + log("feeding data", data, "size", data.length); + this._buffer = Buffer.concat([this._buffer, data]); + this._bufferSize += data.length; + log("new data", this._buffer, "size", this._bufferSize); + this._handleNewData(); + } + + /** + * Invalidates all previous data. Called internally when a {@link ControlCharacters.ReadReset} is encountered. + */ + private _readReset(): void { + log("triggering read reset"); + this._buffer = Buffer.alloc(0); + this._bufferSize = 0; + this._packet = Buffer.alloc(0); + this._packetSize = 0; + this.emit(StreamEvents.Reset); + } + + /** + * Command the stream to attempt to process new data. Called internally after {@link DirectStream.feed} is called. + */ + private _handleNewData(): void { + if (this._bufferSize === 0) return; + const char = this._buffer[0]; + + if (!(char.toString() in ControlCharacters)) + throw new Error(`I don't know what I am looking at! Encountered unknown control character 0x${this._buffer[0].toString(16)} in stream.`); + + log("handling control character", ControlCharacters[char]); + + switch (this._buffer[0]) { + case ControlCharacters.ReadReset: + const postReset = this._buffer.subarray(1); + log("triggering read reset, following data", postReset); + this._readReset(); + // Re-insert all data that follows the reset + log("re-feeding data"); + this._feed(postReset); + break; + case ControlCharacters.ReadByte: + if (this._bufferSize < 2) return; + this._emitRead({ controlCharacter: ControlCharacters.ReadByte, bytes: 1 }, this._buffer[1]); + this._shiftBuffer(2); + break; + case ControlCharacters.ReadBytes: + if (this._bufferSize < 2) return; + const byteSize = (this._buffer[1] + 1) * 4; + if (this._bufferSize < byteSize + 2) return; + this._emitRead({ controlCharacter: ControlCharacters.ReadBytes, bytes: byteSize }, this._buffer.subarray(2, byteSize + 2)); + this._shiftBuffer(byteSize + 2); + break; + case ControlCharacters.ReadKB: + if (this._bufferSize < 2) return; + const kbSize = kb(this._buffer[1] + 1) * 4; + if (this._bufferSize < kbSize + 2) return; + this._emitRead({ controlCharacter: ControlCharacters.ReadKB, bytes: kbSize }, this._buffer.subarray(2, kbSize + 2)); + this._shiftBuffer(kbSize + 2); + break; + case ControlCharacters.ReadMB: + if (this._bufferSize < 2) return; + const mbSize = mb(this._buffer[1] + 1) * 4; + if (this._bufferSize < mbSize + 2) return; + this._emitRead({ controlCharacter: ControlCharacters.ReadMB, bytes: mbSize }, this._buffer.subarray(2, mbSize + 2)); + this._shiftBuffer(mbSize + 2); + break; + case ControlCharacters.ReadGB: + case ControlCharacters.ReadTB: + case ControlCharacters.ReadPB: + // TODO: The SubData Java implementation does not yet support these either. This is a placeholder until we make sizes BigInts. + throw new Error("Internal error: ReadGB, ReadTB, and ReadPB are not yet implemented."); + case ControlCharacters.KeepAlive: + this._shiftBuffer(1); + break; + case ControlCharacters.EndOfPacket: + this._emitPacket(); + this._shiftBuffer(1); + break; + /* istanbul ignore next: this should never happen */ + default: + throw new Error( + `Internal bug: Unhandled control character 0x${this._buffer[0].toString(16)}. Please report this as an issue on our GitHub.` + ); + } + + if (this._bufferSize > 0) this._handleNewData(); + } + + /** + * Called internally to add to the packet buffer and emit the {@link DirectStreamEvents.Read} event. + * + * The object in the first parameter accepts: + * - controlCharacter: The control character that was read + * - bytes: The number of bytes that were read + * - numberOfType: The number of the type (e.g. for readKB(1), this should be 1) + * + * @param info See above + * @param data The actual data that was read + */ + private _emitRead(info: { controlCharacter: ControlCharacters.ReadByte; bytes: 1 }, data: number): void; + private _emitRead(info: { controlCharacter: SizedControlCharacters; bytes: number }, data: Buffer): void; + private _emitRead( + { controlCharacter, bytes }: { controlCharacter: SizedControlCharacters | ControlCharacters.ReadByte; bytes: number }, + data: Buffer | number + ): void { + log("triggering read event of type", ControlCharacters[controlCharacter], "with data", data, "and size", bytes, "bytes"); + this._packet = Buffer.concat([this._packet, data instanceof Buffer ? data : Buffer.from([data])]); + this._packetSize += bytes; + /* istanbul ignore if: this should never reasonably happen & is untestable without a LOT of work */ + if (this._packetSize >= Number.MAX_SAFE_INTEGER - 1000) { + this._packet = Buffer.alloc(0); + this._packetSize = 0; + throw new Error("Packet size exceeded or got very close to maximum safe integer. This is probably a bug."); + } + if (controlCharacter === ControlCharacters.ReadByte) { + // TODO: Remove this cast. Probably need to use some sorta weird discrimination over the types. + this.emit(StreamEvents.Read, Buffer.from([data as number])); + return; + } else this.emit(StreamEvents.Read, data as Buffer); + } + + /** Called internally to clear and emit a packet. */ + private _emitPacket(): void { + debug("triggering packet"); + this.emit(StreamEvents.Packet, this._packet); + this._packet = Buffer.alloc(0); + this._packetSize = 0; + } + + /** + * Shift the buffer by a certain amount. + * @param count The amount to shift the buffer by + */ + private _shiftBuffer(count: number): void { + log("shifting internal buffer by", count); + this._buffer = this._buffer.subarray(count); + this._bufferSize -= count; + } + + /** + * Do some funny math to calculate the return value for {@link DirectStream._bestControlCharacter}. + * + * @param controlCharacter The control character being suggested + * @param size The size in bytes + * @param sizeFn The size function to use (probably one of {@link kb}, {@link mb}, or {@link bytes}) + */ + private _calculateControlCharacterRemainder( + controlCharacter: SizedControlCharacters, + size: number, + sizeFn: (s: number) => number + ): { controlCharacter: SizedControlCharacters; number: number; remainder: number } { + const result = { + controlCharacter, + number: Math.min(255, Math.floor(size / sizeFn(4)) - 1), + remainder: size - sizeFn(4) * (Math.min(255, Math.floor(size / sizeFn(4)) - 1) + 1) + }; + log("calculated control character data for size", size, "bytes and control character", ControlCharacters[controlCharacter], "as", result); + return result; + } + + /** + * Determine the best suitable control character for a given size, along with the remainder after + * using that control character. + * @param size The size in bytes + */ + private _bestControlCharacter( + size: number + ): + | { controlCharacter: ControlCharacters.ReadByte; number: -1; remainder: number } + | { controlCharacter: SizedControlCharacters; number: number; remainder: number } + | false { + let result: + | { controlCharacter: ControlCharacters.ReadByte; number: -1; remainder: number } + | { controlCharacter: SizedControlCharacters; number: number; remainder: number } + | false; + if (size < 1) result = false; + else if (size < 4) result = { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: size - 1 }; + else if (size < kb(4)) result = this._calculateControlCharacterRemainder(ControlCharacters.ReadBytes, size, bytes); + else if (size < mb(4)) result = this._calculateControlCharacterRemainder(ControlCharacters.ReadKB, size, kb); + else result = this._calculateControlCharacterRemainder(ControlCharacters.ReadMB, size, mb); + log("best control character for size", size, "is", result); + return result; + } + + /** + * Encode a Buffer into Layer 1 stream data. + * @param data The data to encode + */ + // TODO: Make private? + public encode(input: Buffer): Buffer { + log("encoding data", input); + const size = input.length; + let remaining = size; + // TODO: Change this when GB/TB/PB support is added. + + // The algorithm here is as follows: + // We repeatedly call to _bestControlCharacter and start populating an array with + // the results of each along with the data it contains, then we flatten the array + // and convert it to a Buffer. + const results = []; + while (remaining > 0) { + log("remaining size", remaining); + const data = this._bestControlCharacter(remaining); + /* istanbul ignore next: this should never happen */ + if (data === false) throw new Error("This should never happen"); + const { controlCharacter, number, remainder } = data; + if (controlCharacter === ControlCharacters.ReadByte) { + results.push([controlCharacter, input.subarray(size - remaining, size - remaining + 1)]); + remaining -= 1; + continue; + } + + /* istanbul ignore else */ + if ([ControlCharacters.ReadBytes, ControlCharacters.ReadKB, ControlCharacters.ReadMB].includes(controlCharacter)) { + let byteSize; + switch (controlCharacter) { + case ControlCharacters.ReadBytes: + byteSize = (number + 1) * 4; + break; + case ControlCharacters.ReadKB: + byteSize = kb(number + 1) * 4; + break; + case ControlCharacters.ReadMB: + byteSize = mb(number + 1) * 4; + break; + /* istanbul ignore next: this should never happen */ + default: + throw new Error("This should never happen"); + } + + results.push([controlCharacter, number, input.subarray(size - remaining, size - remaining + byteSize)]); + remaining -= byteSize; + continue; + } + + /* istanbul ignore next */ + throw new Error(`Unrecognized control character returned by _bestControlCharacter for size ${remainder}`); + } + const result = Buffer.from(results.flat().flatMap((x) => (x instanceof Buffer ? Array.from(x.values()) : x))); + log("final result", result); + return result; + } + /** * Write and encode data to send to the provider * Note: You probably want {@link Stream.writePacket} instead. @@ -118,7 +368,7 @@ export default class Stream extends Emitter { */ public async write(data: Buffer): Promise { log("writing", data); - return writeTo(this._socket, this._stream.encode(data)); + return writeTo(this._socket, this.encode(data)); } /** @@ -137,7 +387,7 @@ export default class Stream extends Emitter { */ public async writePacket(data: Buffer): Promise { log("writing packet of", data); - return writeTo(this._socket, Buffer.concat([this._stream.encode(data), Buffer.from([ControlCharacters.EndOfPacket])])); + return writeTo(this._socket, Buffer.concat([this.encode(data), Buffer.from([ControlCharacters.EndOfPacket])])); } /** diff --git a/src/tests/stream/DirectStream.test.ts b/src/tests/stream/DirectStream.test.ts deleted file mode 100644 index 034bfbc..0000000 --- a/src/tests/stream/DirectStream.test.ts +++ /dev/null @@ -1,355 +0,0 @@ -/* - * node-subdata-2 - SubData 2 client for Node.js - * Copyright (C) 2022, 2023 LogN - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -import { kb, mb } from "../../lib/sizeHelpers"; -import { ControlCharacters } from "../../stream/controlCharacters"; -import DirectStream, { DirectStreamEvents } from "../../stream/DirectStream"; - -/** Like C's memncmp() for buffers. Used in the large-buffer tests to avoid long test times */ -function bufncmp(n: number, left: Buffer, right: Buffer): number { - const len = Math.min(left.length, right.length, n); - for (let i = 0; i < len; i++) { - if (left[i] < right[i]) return -1; - if (left[i] > right[i]) return 1; - } - return 0; -} - -/** Makes a new {@link DirectStream} and attaches listeners. */ -function makeNewStream(): { stream: DirectStream; onRead: jest.Mock; onPacket: jest.Mock; onReset: jest.Mock } { - const stream = new DirectStream(); - - const onRead = jest.fn(() => undefined); - const onPacket = jest.fn(() => undefined); - const onReset = jest.fn(() => undefined); - - stream.on(DirectStreamEvents.Read, onRead); - stream.on(DirectStreamEvents.Packet, onPacket); - stream.on(DirectStreamEvents.ReadReset, onReset); - - return { stream, onRead, onPacket, onReset }; -} - -describe("Stream", () => { - it("errors when provided jumbled data", () => { - const stream = new DirectStream(); - - expect(() => stream.feed(Buffer.from([0xff]))).toThrow( - "I don't know what I am looking at! Encountered unknown control character 0xff in stream." - ); - }); - - it("does nothing for KeepAlive", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.KeepAlive])); - - expect(onRead).not.toHaveBeenCalled(); - expect(onPacket).not.toHaveBeenCalled(); - expect(onReset).not.toHaveBeenCalled(); - }); - - it("single reset", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.ReadReset])); - - expect(onRead).not.toHaveBeenCalled(); - expect(onPacket).not.toHaveBeenCalled(); - expect(onReset).toHaveBeenCalledTimes(1); - expect(onReset).toHaveBeenCalledWith(); - }); - - it("multiple resets", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.ReadReset, ControlCharacters.ReadReset])); - - expect(onRead).not.toHaveBeenCalled(); - expect(onPacket).not.toHaveBeenCalled(); - expect(onReset).toHaveBeenCalledTimes(2); - expect(onReset).toHaveBeenLastCalledWith(); - expect(onReset).toHaveBeenNthCalledWith(1); - }); - - it("empty packet", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.EndOfPacket])); - - expect(onRead).not.toHaveBeenCalled(); - expect(onPacket).toHaveBeenCalledTimes(1); - expect(onPacket).toHaveBeenCalledWith(Buffer.alloc(0)); - expect(onReset).not.toHaveBeenCalled(); - }); - - it("single byte packet", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.ReadByte, 0x01, ControlCharacters.EndOfPacket])); - - expect(onRead).toHaveBeenCalledTimes(1); - expect(onRead).toHaveBeenCalledWith(ControlCharacters.ReadByte, 1, 1); - expect(onPacket).toHaveBeenCalledTimes(1); - expect(onPacket).toHaveBeenCalledWith(Buffer.from([1])); - expect(onReset).not.toHaveBeenCalled(); - }); - - it("waits when read is incomplete", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.ReadByte])); - - expect(onRead).not.toHaveBeenCalled(); - expect(onPacket).not.toHaveBeenCalled(); - expect(onReset).not.toHaveBeenCalled(); - - stream.feed(Buffer.from([0x01])); - - expect(onRead).toHaveBeenCalledTimes(1); - expect(onRead).toHaveBeenCalledWith(ControlCharacters.ReadByte, 1, 1); - expect(onPacket).not.toHaveBeenCalled(); - expect(onReset).not.toHaveBeenCalled(); - }); - - it("longer read", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed( - Buffer.from([ - ControlCharacters.ReadBytes, - 0x2, // 4 * (2+1) = 12 bytes - 0x0, - 0x1, - 0x2, - 0x3, - 0x4, - 0x5, - 0x6, - 0x7, - 0x8, - 0x9, - 0xa, - 0xb, // 12 bytes done - ControlCharacters.EndOfPacket - ]) - ); - - expect(onRead).toHaveBeenCalledTimes(1); - expect(onRead).toHaveBeenCalledWith( - ControlCharacters.ReadBytes, - 2, - Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb]) - ); - expect(onPacket).toHaveBeenCalledTimes(1); - expect(onPacket).toHaveBeenCalledWith(Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb])); - expect(onReset).not.toHaveBeenCalled(); - }); - - it("long read with missing size", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.ReadBytes])); - - expect(onRead).not.toHaveBeenCalled(); - expect(onPacket).not.toHaveBeenCalled(); - expect(onReset).not.toHaveBeenCalled(); - - stream.feed(Buffer.from([0x2, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, ControlCharacters.EndOfPacket])); - - expect(onRead).toHaveBeenCalledTimes(1); - expect(onRead).toHaveBeenCalledWith( - ControlCharacters.ReadBytes, - 2, - Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb]) - ); - expect(onPacket).toHaveBeenCalledTimes(1); - expect(onPacket).toHaveBeenCalledWith(Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb])); - expect(onReset).not.toHaveBeenCalled(); - }); - - it("long read interrupted in the middle", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.ReadBytes, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04])); - - expect(onRead).not.toHaveBeenCalled(); - expect(onPacket).not.toHaveBeenCalled(); - expect(onReset).not.toHaveBeenCalled(); - - stream.feed(Buffer.from([0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, ControlCharacters.EndOfPacket])); - - expect(onRead).toHaveBeenCalledTimes(1); - expect(onRead).toHaveBeenCalledWith( - ControlCharacters.ReadBytes, - 2, - Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b]) - ); - expect(onPacket).toHaveBeenCalledTimes(1); - expect(onPacket).toHaveBeenCalledWith(Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b])); - expect(onReset).not.toHaveBeenCalled(); - }); - - it("Read Reset after a read will actually reset", () => { - const { stream, onRead, onPacket, onReset } = makeNewStream(); - - stream.feed(Buffer.from([ControlCharacters.ReadByte, 0x00, ControlCharacters.ReadReset, ControlCharacters.EndOfPacket])); - - expect(onRead).toHaveBeenCalledTimes(1); - expect(onRead).toHaveBeenCalledWith(ControlCharacters.ReadByte, 1, 0); - expect(onPacket).toHaveBeenCalledTimes(1); - expect(onPacket).toHaveBeenCalledWith(Buffer.alloc(0)); - expect(onReset).toHaveBeenCalledTimes(1); - expect(onReset).toHaveBeenCalledWith(); - }); - - describe("_bestControlCharacter", () => { - it("nothing => returns false", () => { - const stream = new DirectStream(); - - // @ts-expect-error: testing a private method - expect(stream._bestControlCharacter(0)).toBe(false); - }); - - function factory(name: string, size: number, result: ReturnType): void { - it(name, () => { - const stream = new DirectStream(); - - // @ts-expect-error: testing a private method - expect(stream._bestControlCharacter(size)).toEqual(result); - }); - } - - factory("one byte => ReadByte", 1, { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: 0 }); - factory("two bytes => ReadByte", 2, { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: 1 }); - factory("three bytes => ReadByte", 3, { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: 2 }); - factory("four bytes => ReadBytes", 4, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 0 }); - factory("five bytes => ReadBytes", 5, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 1 }); - factory("six bytes => ReadBytes", 6, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 2 }); - factory("seven bytes => ReadBytes", 7, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 3 }); - factory("eight bytes => ReadBytes", 8, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 0 }); - factory("nine bytes => ReadBytes", 9, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 1 }); - factory("ten bytes => ReadBytes", 10, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 2 }); - factory("eleven bytes => ReadBytes", 11, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 3 }); - factory("twelve bytes => ReadBytes", 12, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 0 }); - factory("thirteen bytes => ReadBytes", 13, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 1 }); - factory("fourteen bytes => ReadBytes", 14, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 2 }); - factory("fifteen bytes => ReadBytes", 15, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 3 }); - // That's enough. - - factory("2000 bytes yields number=255", 2000, { controlCharacter: ControlCharacters.ReadBytes, number: 255, remainder: 2000 - 4 * 256 }); - - factory("4kb => ReadKB", kb(4), { controlCharacter: ControlCharacters.ReadKB, number: 0, remainder: 0 }); - factory("5kb => ReadKB", kb(5), { controlCharacter: ControlCharacters.ReadKB, number: 0, remainder: kb(1) }); - factory("8kb => ReadKB", kb(8), { controlCharacter: ControlCharacters.ReadKB, number: 1, remainder: 0 }); - factory("2000kb yields number=255", kb(2000), { controlCharacter: ControlCharacters.ReadKB, number: 255, remainder: kb(2000) - 4 * kb(256) }); - - factory("4mb => ReadMB", mb(4), { controlCharacter: ControlCharacters.ReadMB, number: 0, remainder: 0 }); - factory("5mb => ReadMB", mb(5), { controlCharacter: ControlCharacters.ReadMB, number: 0, remainder: mb(1) }); - factory("8mb => ReadMB", mb(8), { controlCharacter: ControlCharacters.ReadMB, number: 1, remainder: 0 }); - factory("2000mb yields number=255", mb(2000), { controlCharacter: ControlCharacters.ReadMB, number: 255, remainder: mb(2000) - 4 * mb(256) }); - }); - - describe("encode", () => { - function factory(name: string, input: Buffer, output: Buffer): void { - it(name, () => { - const stream = new DirectStream(); - - expect(stream.encode(input)).toEqual(output); - }); - } - - factory("nothing => returns empty buffer", Buffer.alloc(0), Buffer.alloc(0)); - factory("one byte => read-byte once", Buffer.from([0x00]), Buffer.from([ControlCharacters.ReadByte, 0x00])); - factory( - "two bytes => read-byte twice", - Buffer.from([0x00, 0x01]), - Buffer.from([ControlCharacters.ReadByte, 0x00, ControlCharacters.ReadByte, 0x01]) - ); - factory( - "three bytes => read-byte three times", - Buffer.from([0x00, 0x01, 0x02]), - Buffer.from([ControlCharacters.ReadByte, 0x00, ControlCharacters.ReadByte, 0x01, ControlCharacters.ReadByte, 0x02]) - ); - factory( - "four bytes => read-bytes once", - Buffer.from([0x00, 0x01, 0x02, 0x03]), - Buffer.from([ControlCharacters.ReadBytes, 0x00, 0x00, 0x01, 0x02, 0x03]) - ); - factory( - "five bytes => read-bytes once and a read-byte", - Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04]), - Buffer.from([ControlCharacters.ReadBytes, 0x00, 0x00, 0x01, 0x02, 0x03, ControlCharacters.ReadByte, 0x04]) - ); - factory( - "eight bytes => read-bytes 2x", - Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]), - Buffer.from([ControlCharacters.ReadBytes, 0x01, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]) - ); - factory( - "15 bytes does what you'd expect", - Buffer.alloc(15).fill(0xff), - Buffer.from([ - ControlCharacters.ReadBytes, - 0x02, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - 0xff, - ControlCharacters.ReadByte, - 0xff, - ControlCharacters.ReadByte, - 0xff, - ControlCharacters.ReadByte, - 0xff - ]) - ); - it("4kb", () => { - const stream = new DirectStream(); - - const expected = Buffer.concat([Buffer.from([ControlCharacters.ReadKB, 0x00]), Buffer.alloc(kb(4))]); - - // Running .toEqual is extremely slow on huge buffers, so we use a custom bufncmp function - expect(bufncmp(3, stream.encode(expected.subarray(2)), expected)).toBe(0); - }); - it("4mb", () => { - const stream = new DirectStream(); - - const expected = Buffer.concat([Buffer.from([ControlCharacters.ReadMB, 0x00]), Buffer.alloc(mb(4))]); - - // See above - expect(bufncmp(3, stream.encode(expected.subarray(2)), expected)).toBe(0); - }); - }); - - it("fails for unsupported read sizes", () => { - const stream = new DirectStream(); - - expect(() => stream.feed(Buffer.from([ControlCharacters.ReadGB]))).toThrow( - "Internal error: ReadGB, ReadTB, and ReadPB are not yet implemented." - ); - }); -}); diff --git a/src/tests/stream/index.test.ts b/src/tests/stream/index.test.ts index b4f206c..1be448a 100644 --- a/src/tests/stream/index.test.ts +++ b/src/tests/stream/index.test.ts @@ -16,11 +16,23 @@ * along with this program. If not, see . */ +import { Duplex } from "node:stream"; + import ConnectedDuplex from "../../lib/ConnectedDuplex"; import { endStream, writeTo } from "../../lib/promisifiedStreamHelpers"; +import { kb, mb } from "../../lib/sizeHelpers"; import Stream, { StreamEvents } from "../../stream"; import { ControlCharacters } from "../../stream/controlCharacters"; -import DirectStream from "../../stream/DirectStream"; + +/** A duplex stream which does nothing. */ +class NullDuplex extends Duplex { + public _read(): void { + this.push(null); + } + public _write(_chunk: unknown, _encoding: unknown, callback: (error?: Error | null | undefined) => void): void { + callback(); + } +} /** Makes a new {@link Stream} along with mock functions and a {@link ManualIOProvider} */ function makeNewStream(): { @@ -58,109 +70,410 @@ function charCode(char: string): number { return char.charCodeAt(0); } -const encodeMock = jest.spyOn(DirectStream.prototype, "encode"); -const feedMock = jest.spyOn(DirectStream.prototype, "feed"); +/** Like C's memcmp() for buffers. Used in the large-buffer tests to avoid long test times */ +function bufncmp(n: number, left: Buffer, right: Buffer): number { + const len = Math.min(left.length, right.length, n); + for (let i = 0; i < len; i++) { + if (left[i] < right[i]) return -1; + if (left[i] > right[i]) return 1; + } + return 0; +} + +const encodeMock = jest.spyOn(Stream.prototype, "encode"); beforeEach(() => { encodeMock.mockClear(); - feedMock.mockClear(); }); describe("Stream", () => { - it("properly encodes when writing", async () => { - const { stream, onRemoteRx } = makeNewStream(); + describe("overall behavior", () => { + it("properly encodes when writing", async () => { + const { stream, onRemoteRx } = makeNewStream(); - await stream.write(Buffer.from("Hello, World!")); + await stream.write(Buffer.from("Hello, World!")); - expect(encodeMock).toHaveBeenCalledWith(Buffer.from("Hello, World!")); - expect(onRemoteRx).toHaveBeenCalledWith(encodeMock.mock.results[0].value); - }); + expect(encodeMock).toHaveBeenCalledWith(Buffer.from("Hello, World!")); + expect(onRemoteRx).toHaveBeenCalledWith(encodeMock.mock.results[0].value); + }); - it("properly decodes when receiving", async () => { - const { remote, onPacket } = makeNewStream(); - const direct = new DirectStream(); + it("properly decodes when receiving", async () => { + const { stream, remote, onPacket } = makeNewStream(); - const data = Buffer.concat([direct.encode(Buffer.from("Hello, World!")), Buffer.from([ControlCharacters.EndOfPacket])]); + const data = Buffer.concat([stream.encode(Buffer.from("Hello, World!")), Buffer.from([ControlCharacters.EndOfPacket])]); - await writeTo(remote, data); + await writeTo(remote, data); - expect(feedMock).toBeCalledTimes(1); - expect(feedMock).toBeCalledWith(data); - expect(onPacket).toBeCalledTimes(1); - expect(onPacket).toBeCalledWith(Buffer.from("Hello, World!")); - }); + expect(onPacket).toBeCalledTimes(1); + expect(onPacket).toBeCalledWith(Buffer.from("Hello, World!")); + }); - it("emits Reset when a remote read reset is encountered and does discard the packet so far", async () => { - const { remote, onPacket, onRead, onReset } = makeNewStream(); - const direct = new DirectStream(); + it("emits Reset when a remote read reset is encountered and does discard the packet so far", async () => { + const { stream, remote, onPacket, onRead, onReset } = makeNewStream(); - await writeTo(remote, direct.encode(Buffer.from("Hello, World!"))); + await writeTo(remote, stream.encode(Buffer.from("Hello, World!"))); - expect(onPacket).toBeCalledTimes(0); - expect(onRead).toBeCalledTimes(2); - expect(onRead).toHaveBeenNthCalledWith(1, Buffer.from("Hello, World")); - expect(onRead).toHaveBeenNthCalledWith(2, Buffer.from("!")); + expect(onPacket).toBeCalledTimes(0); + expect(onRead).toBeCalledTimes(2); + expect(onRead).toHaveBeenNthCalledWith(1, Buffer.from("Hello, World")); + expect(onRead).toHaveBeenNthCalledWith(2, Buffer.from("!")); - await writeTo(remote, Buffer.from([ControlCharacters.ReadReset])); + await writeTo(remote, Buffer.from([ControlCharacters.ReadReset])); - expect(onReset).toBeCalledTimes(1); - expect(onPacket).toBeCalledTimes(0); - expect(onRead).toBeCalledTimes(2); + expect(onReset).toBeCalledTimes(1); + expect(onPacket).toBeCalledTimes(0); + expect(onRead).toBeCalledTimes(2); - await writeTo(remote, Buffer.from([ControlCharacters.EndOfPacket])); + await writeTo(remote, Buffer.from([ControlCharacters.EndOfPacket])); - expect(onReset).toBeCalledTimes(1); - expect(onPacket).toBeCalledTimes(1); - expect(onPacket).toBeCalledWith(Buffer.alloc(0)); - expect(onRead).toBeCalledTimes(2); - }); + expect(onReset).toBeCalledTimes(1); + expect(onPacket).toBeCalledTimes(1); + expect(onPacket).toBeCalledWith(Buffer.alloc(0)); + expect(onRead).toBeCalledTimes(2); + }); - it("can terminate packets", async () => { - const { stream, onRemoteRx } = makeNewStream(); + it("can terminate packets", async () => { + const { stream, onRemoteRx } = makeNewStream(); - await stream.write(Buffer.from("A")); - await stream.endPacket(); + await stream.write(Buffer.from("A")); + await stream.endPacket(); - expect(onRemoteRx).toBeCalledTimes(2); - expect(onRemoteRx).toHaveBeenNthCalledWith(1, Buffer.from([ControlCharacters.ReadByte, charCode("A")])); - expect(onRemoteRx).toHaveBeenNthCalledWith(2, Buffer.from([ControlCharacters.EndOfPacket])); - }); + expect(onRemoteRx).toBeCalledTimes(2); + expect(onRemoteRx).toHaveBeenNthCalledWith(1, Buffer.from([ControlCharacters.ReadByte, charCode("A")])); + expect(onRemoteRx).toHaveBeenNthCalledWith(2, Buffer.from([ControlCharacters.EndOfPacket])); + }); - it("writePacket will also do the same thing", async () => { - const { stream, onRemoteRx } = makeNewStream(); + it("writePacket will also do the same thing", async () => { + const { stream, onRemoteRx } = makeNewStream(); - await stream.writePacket(Buffer.from("A")); + await stream.writePacket(Buffer.from("A")); - expect(onRemoteRx).toBeCalledTimes(1); - expect(onRemoteRx).toHaveBeenCalledWith(Buffer.from([ControlCharacters.ReadByte, charCode("A"), ControlCharacters.EndOfPacket])); - }); + expect(onRemoteRx).toBeCalledTimes(1); + expect(onRemoteRx).toHaveBeenCalledWith(Buffer.from([ControlCharacters.ReadByte, charCode("A"), ControlCharacters.EndOfPacket])); + }); + + it("can transmit read reset", async () => { + const { stream, onRemoteRx } = makeNewStream(); - it("can transmit read reset", async () => { - const { stream, onRemoteRx } = makeNewStream(); + await stream.write(Buffer.from("A")); + await stream.readReset(); - await stream.write(Buffer.from("A")); - await stream.readReset(); + expect(onRemoteRx).toBeCalledTimes(2); + expect(onRemoteRx).toHaveBeenNthCalledWith(1, Buffer.from([ControlCharacters.ReadByte, charCode("A")])); + expect(onRemoteRx).toHaveBeenNthCalledWith(2, Buffer.from([ControlCharacters.ReadReset])); + }); + + describe("close", () => { + it("received from remote", async () => { + const { remote, onLocalRxEnd } = makeNewStream(); + + await endStream(remote); + + expect(onLocalRxEnd).toBeCalledTimes(1); + }); - expect(onRemoteRx).toBeCalledTimes(2); - expect(onRemoteRx).toHaveBeenNthCalledWith(1, Buffer.from([ControlCharacters.ReadByte, charCode("A")])); - expect(onRemoteRx).toHaveBeenNthCalledWith(2, Buffer.from([ControlCharacters.ReadReset])); + it("sent from local", async () => { + const { stream, onRemoteRxEnd } = makeNewStream(); + + await stream.close(); + + expect(onRemoteRxEnd).toBeCalledTimes(1); + }); + }); }); + describe("layer-1 handling", () => { + // TODO: Hard to test because of the function throwing an error from within the event queue + // instead of synchronously. + it.todo("errors when provided jumbled data"); + it.todo("fails for unsupported read sizes"); + + it("does nothing for KeepAlive", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); - describe("close", () => { - it("received from remote", async () => { - const { remote, onLocalRxEnd } = makeNewStream(); + await writeTo(remote, Buffer.from([ControlCharacters.KeepAlive])); - await endStream(remote); + expect(onRead).not.toHaveBeenCalled(); + expect(onPacket).not.toHaveBeenCalled(); + expect(onReset).not.toHaveBeenCalled(); + }); + + it("single reset", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo(remote, Buffer.from([ControlCharacters.ReadReset])); + + expect(onRead).not.toHaveBeenCalled(); + expect(onPacket).not.toHaveBeenCalled(); + expect(onReset).toHaveBeenCalledTimes(1); + expect(onReset).toHaveBeenCalledWith(); + }); + + it("multiple resets", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); - expect(onLocalRxEnd).toBeCalledTimes(1); + await writeTo(remote, Buffer.from([ControlCharacters.ReadReset, ControlCharacters.ReadReset])); + + expect(onRead).not.toHaveBeenCalled(); + expect(onPacket).not.toHaveBeenCalled(); + expect(onReset).toHaveBeenCalledTimes(2); + expect(onReset).toHaveBeenLastCalledWith(); + expect(onReset).toHaveBeenNthCalledWith(1); }); - it("sent from local", async () => { - const { stream, onRemoteRxEnd } = makeNewStream(); + it("empty packet", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo(remote, Buffer.from([ControlCharacters.EndOfPacket])); - await stream.close(); + expect(onRead).not.toHaveBeenCalled(); + expect(onPacket).toHaveBeenCalledTimes(1); + expect(onPacket).toHaveBeenCalledWith(Buffer.alloc(0)); + expect(onReset).not.toHaveBeenCalled(); + }); + + it("single byte packet", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo(remote, Buffer.from([ControlCharacters.ReadByte, 0x01, ControlCharacters.EndOfPacket])); + + expect(onRead).toHaveBeenCalledTimes(1); + expect(onRead).toHaveBeenCalledWith(Buffer.from([1])); + expect(onPacket).toHaveBeenCalledTimes(1); + expect(onPacket).toHaveBeenCalledWith(Buffer.from([1])); + expect(onReset).not.toHaveBeenCalled(); + }); + + it("waits when read is incomplete", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo(remote, Buffer.from([ControlCharacters.ReadByte])); + + expect(onRead).not.toHaveBeenCalled(); + expect(onPacket).not.toHaveBeenCalled(); + expect(onReset).not.toHaveBeenCalled(); + + await writeTo(remote, Buffer.from([0x01, ControlCharacters.EndOfPacket])); + + expect(onRead).toHaveBeenCalledTimes(1); + expect(onRead).toHaveBeenCalledWith(Buffer.from([1])); + expect(onPacket).toHaveBeenCalledTimes(1); + expect(onPacket).toHaveBeenCalledWith(Buffer.from([1])); + expect(onReset).not.toHaveBeenCalled(); + }); + + it("longer read", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo( + remote, + Buffer.from([ + ControlCharacters.ReadBytes, + 0x2, // 4 * (2+1) = 12 bytes + 0x0, + 0x1, + 0x2, + 0x3, + 0x4, + 0x5, + 0x6, + 0x7, + 0x8, + 0x9, + 0xa, + 0xb, // 12 bytes done + ControlCharacters.EndOfPacket + ]) + ); + + expect(onRead).toHaveBeenCalledTimes(1); + expect(onRead).toHaveBeenCalledWith(Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb])); + expect(onPacket).toHaveBeenCalledTimes(1); + expect(onPacket).toHaveBeenCalledWith(Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb])); + expect(onReset).not.toHaveBeenCalled(); + }); + + it("long read with missing size", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo(remote, Buffer.from([ControlCharacters.ReadBytes])); + + expect(onRead).not.toHaveBeenCalled(); + expect(onPacket).not.toHaveBeenCalled(); + expect(onReset).not.toHaveBeenCalled(); + + await writeTo(remote, Buffer.from([0x2, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, ControlCharacters.EndOfPacket])); + + expect(onRead).toHaveBeenCalledTimes(1); + expect(onRead).toHaveBeenCalledWith(Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb])); + expect(onPacket).toHaveBeenCalledTimes(1); + expect(onPacket).toHaveBeenCalledWith(Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb])); + expect(onReset).not.toHaveBeenCalled(); + }); + + it("long read interrupted in the middle", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo(remote, Buffer.from([ControlCharacters.ReadBytes, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04])); + + expect(onRead).not.toHaveBeenCalled(); + expect(onPacket).not.toHaveBeenCalled(); + expect(onReset).not.toHaveBeenCalled(); + + await writeTo(remote, Buffer.from([0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, ControlCharacters.EndOfPacket])); + + expect(onRead).toHaveBeenCalledTimes(1); + expect(onRead).toHaveBeenCalledWith(Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b])); + expect(onPacket).toHaveBeenCalledTimes(1); + expect(onPacket).toHaveBeenCalledWith(Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b])); + expect(onReset).not.toHaveBeenCalled(); + }); + + it("Read Reset after a read will actually reset", async () => { + const { remote, onRead, onPacket, onReset } = makeNewStream(); + + await writeTo(remote, Buffer.from([ControlCharacters.ReadByte, 0x00, ControlCharacters.ReadReset, ControlCharacters.EndOfPacket])); + + expect(onRead).toHaveBeenCalledTimes(1); + expect(onRead).toHaveBeenCalledWith(Buffer.from([0])); + expect(onPacket).toHaveBeenCalledTimes(1); + expect(onPacket).toHaveBeenCalledWith(Buffer.alloc(0)); + expect(onReset).toHaveBeenCalledTimes(1); + expect(onReset).toHaveBeenCalledWith(); + }); + + describe("_bestControlCharacter", () => { + it("nothing => returns false", () => { + const stream = new Stream(new NullDuplex()); + + // @ts-expect-error: testing a private method + expect(stream._bestControlCharacter(0)).toBe(false); + }); + + function factory(name: string, size: number, result: ReturnType): void { + it(name, () => { + const stream = new Stream(new NullDuplex()); + + // @ts-expect-error: testing a private method + expect(stream._bestControlCharacter(size)).toEqual(result); + }); + } + + factory("one byte => ReadByte", 1, { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: 0 }); + factory("two bytes => ReadByte", 2, { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: 1 }); + factory("three bytes => ReadByte", 3, { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: 2 }); + factory("four bytes => ReadBytes", 4, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 0 }); + factory("five bytes => ReadBytes", 5, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 1 }); + factory("six bytes => ReadBytes", 6, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 2 }); + factory("seven bytes => ReadBytes", 7, { controlCharacter: ControlCharacters.ReadBytes, number: 0, remainder: 3 }); + factory("eight bytes => ReadBytes", 8, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 0 }); + factory("nine bytes => ReadBytes", 9, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 1 }); + factory("ten bytes => ReadBytes", 10, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 2 }); + factory("eleven bytes => ReadBytes", 11, { controlCharacter: ControlCharacters.ReadBytes, number: 1, remainder: 3 }); + factory("twelve bytes => ReadBytes", 12, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 0 }); + factory("thirteen bytes => ReadBytes", 13, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 1 }); + factory("fourteen bytes => ReadBytes", 14, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 2 }); + factory("fifteen bytes => ReadBytes", 15, { controlCharacter: ControlCharacters.ReadBytes, number: 2, remainder: 3 }); + // That's enough. + + factory("2000 bytes yields number=255", 2000, { controlCharacter: ControlCharacters.ReadBytes, number: 255, remainder: 2000 - 4 * 256 }); + + factory("4kb => ReadKB", kb(4), { controlCharacter: ControlCharacters.ReadKB, number: 0, remainder: 0 }); + factory("5kb => ReadKB", kb(5), { controlCharacter: ControlCharacters.ReadKB, number: 0, remainder: kb(1) }); + factory("8kb => ReadKB", kb(8), { controlCharacter: ControlCharacters.ReadKB, number: 1, remainder: 0 }); + factory("2000kb yields number=255", kb(2000), { + controlCharacter: ControlCharacters.ReadKB, + number: 255, + remainder: kb(2000) - 4 * kb(256) + }); + + factory("4mb => ReadMB", mb(4), { controlCharacter: ControlCharacters.ReadMB, number: 0, remainder: 0 }); + factory("5mb => ReadMB", mb(5), { controlCharacter: ControlCharacters.ReadMB, number: 0, remainder: mb(1) }); + factory("8mb => ReadMB", mb(8), { controlCharacter: ControlCharacters.ReadMB, number: 1, remainder: 0 }); + factory("2000mb yields number=255", mb(2000), { + controlCharacter: ControlCharacters.ReadMB, + number: 255, + remainder: mb(2000) - 4 * mb(256) + }); + }); - expect(onRemoteRxEnd).toBeCalledTimes(1); + describe("encode", () => { + function factory(name: string, input: Buffer, output: Buffer): void { + it(name, () => { + const stream = new Stream(new NullDuplex()); + + expect(stream.encode(input)).toEqual(output); + }); + } + + factory("nothing => returns empty buffer", Buffer.alloc(0), Buffer.alloc(0)); + factory("one byte => read-byte once", Buffer.from([0x00]), Buffer.from([ControlCharacters.ReadByte, 0x00])); + factory( + "two bytes => read-byte twice", + Buffer.from([0x00, 0x01]), + Buffer.from([ControlCharacters.ReadByte, 0x00, ControlCharacters.ReadByte, 0x01]) + ); + factory( + "three bytes => read-byte three times", + Buffer.from([0x00, 0x01, 0x02]), + Buffer.from([ControlCharacters.ReadByte, 0x00, ControlCharacters.ReadByte, 0x01, ControlCharacters.ReadByte, 0x02]) + ); + factory( + "four bytes => read-bytes once", + Buffer.from([0x00, 0x01, 0x02, 0x03]), + Buffer.from([ControlCharacters.ReadBytes, 0x00, 0x00, 0x01, 0x02, 0x03]) + ); + factory( + "five bytes => read-bytes once and a read-byte", + Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04]), + Buffer.from([ControlCharacters.ReadBytes, 0x00, 0x00, 0x01, 0x02, 0x03, ControlCharacters.ReadByte, 0x04]) + ); + factory( + "eight bytes => read-bytes 2x", + Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]), + Buffer.from([ControlCharacters.ReadBytes, 0x01, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]) + ); + factory( + "15 bytes does what you'd expect", + Buffer.alloc(15).fill(0xff), + Buffer.from([ + ControlCharacters.ReadBytes, + 0x02, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + ControlCharacters.ReadByte, + 0xff, + ControlCharacters.ReadByte, + 0xff, + ControlCharacters.ReadByte, + 0xff + ]) + ); + it("4kb", () => { + const stream = new Stream(new NullDuplex()); + + const expected = Buffer.concat([Buffer.from([ControlCharacters.ReadKB, 0x00]), Buffer.alloc(kb(4))]); + + // Running .toEqual is extremely slow on huge buffers, so we use a custom bufncmp function + expect(bufncmp(3, stream.encode(expected.subarray(2)), expected)).toBe(0); + }); + it("4mb", () => { + const stream = new Stream(new NullDuplex()); + + const expected = Buffer.concat([Buffer.from([ControlCharacters.ReadMB, 0x00]), Buffer.alloc(mb(4))]); + + // See above + expect(bufncmp(3, stream.encode(expected.subarray(2)), expected)).toBe(0); + }); }); }); });