diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..b942939 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "cSpell.words": ["TCPIO"] +} diff --git a/src/index.ts b/src/index.ts index 0bf1a01..58010ba 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,7 +18,27 @@ import SafeEventEmitter from "./lib/SafeEventEmitter.js"; import { bytes, kb, mb } from "./lib/sizeHelpers.js"; +import Stream from "./stream/"; import { ControlCharacters, SizedControlCharacters } from "./stream/controlCharacters.js"; import DirectStream, { DirectStreamEventArguments, DirectStreamEvents } from "./stream/DirectStream.js"; +import IOProvider, { IOProviderEventArguments, IOProviderEvents } from "./stream/providers/IOProvider.js"; +import ManualIOProvider from "./stream/providers/ManualIOProvider.js"; +import TCPIOProvider from "./stream/providers/TCPIOProvider.js"; -export { bytes, ControlCharacters, DirectStream, DirectStreamEventArguments, DirectStreamEvents, kb, mb, SafeEventEmitter, SizedControlCharacters }; +export { + bytes, + ControlCharacters, + DirectStream, + DirectStreamEventArguments, + DirectStreamEvents, + IOProvider, + IOProviderEventArguments, + IOProviderEvents, + kb, + ManualIOProvider, + mb, + SafeEventEmitter, + SizedControlCharacters, + Stream, + TCPIOProvider +}; diff --git a/src/stream/DirectStream.ts b/src/stream/DirectStream.ts index 360e80a..42b2cd0 100644 --- a/src/stream/DirectStream.ts +++ b/src/stream/DirectStream.ts @@ -35,7 +35,7 @@ export type DirectStreamEventArguments = { * Fires with three arguments, the size read in ControlCharacters, the number of that size read, and the data read. * ReadByte will return Number, the rest will return Buffer. */ - [DirectStreamEvents.Read]: [ControlCharacters.ReadByte, 1, number] | [Exclude, number, Buffer]; + [DirectStreamEvents.Read]: [ControlCharacters.ReadByte, 1, number] | [SizedControlCharacters, number, Buffer]; /** Emitted when an End of Packet is received along with (size, data) parameters. */ [DirectStreamEvents.Packet]: [number, Buffer]; }; @@ -177,12 +177,13 @@ export default class DirectStream extends SafeEventEmitter; bytes: number; numberOfType: number }, - data: Buffer - ): void; - private _emitRead( - { controlCharacter, bytes, numberOfType }: { controlCharacter: ControlCharacters; bytes: number; numberOfType: number }, + { + controlCharacter, + bytes, + numberOfType + }: { controlCharacter: SizedControlCharacters | ControlCharacters.ReadByte; bytes: number; numberOfType: number }, data: Buffer | number ): void { this._packet = Buffer.concat([this._packet, data instanceof Buffer ? data : Buffer.from([data])]); diff --git a/src/stream/index.ts b/src/stream/index.ts new file mode 100644 index 0000000..bf6d570 --- /dev/null +++ b/src/stream/index.ts @@ -0,0 +1,99 @@ +/* + * node-subdata-2 - SubData 2 wrapper for Node.js + * Copyright (C) 2022 LogN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import SafeEventEmitter from "../lib/SafeEventEmitter"; +import { ControlCharacters } from "./controlCharacters"; +import DirectStream, { DirectStreamEvents } from "./DirectStream"; +import type IOProvider from "./providers/IOProvider"; +import { IOProviderEvents } from "./providers/IOProvider"; + +export enum StreamEvents { + Read = "read", + Packet = "packet", + Reset = "reset", + Close = "close" +} + +export type StreamEventArguments = { + [StreamEvents.Read]: [Buffer]; + [StreamEvents.Packet]: [number, Buffer]; + [StreamEvents.Reset]: []; + [StreamEvents.Close]: []; +}; + +export default class Stream extends SafeEventEmitter { + private _provider: IOProvider; + private _stream: DirectStream; + + /** @param provider The underlying {@link IOProvider} to use */ + public constructor(provider: IOProvider) { + super(); + this._provider = provider; + this._stream = new DirectStream(); + this._provider.on(IOProviderEvents.Data, (data) => { + this._stream.feed(data); + }); + this._stream.on(DirectStreamEvents.ReadReset, () => { + this.emit(StreamEvents.Reset); + }); + this._stream.on(DirectStreamEvents.Read, (_size, _count, data) => { + this.emit(StreamEvents.Read, data instanceof Buffer ? data : Buffer.from([data])); + }); + this._stream.on(DirectStreamEvents.Packet, (size, data) => { + this.emit(StreamEvents.Packet, size, data); + }); + this._provider.on(IOProviderEvents.Close, () => { + this.emit(StreamEvents.Close); + }); + } + + /** + * Write and encode data to send to the provider + */ + public write(data: Buffer): void { + this._provider.write(this._stream.encode(data)); + } + + /** + * Terminates the current packet. + */ + public endPacket(): void { + this._provider.write(Buffer.from([ControlCharacters.EndOfPacket])); + } + + /** + * Write data and end the current packet. This is equivalent to calling {@link Stream.write} and {@link Stream.endPacket} in sequence. + */ + public writePacket(data: Buffer): void { + this._provider.write(Buffer.concat([this._stream.encode(data), Buffer.from([ControlCharacters.EndOfPacket])])); + } + + /** + * Trigger a read reset. + */ + public readReset(): void { + this._provider.write(Buffer.from([ControlCharacters.ReadReset])); + } + + /** + * Close the connection + */ + public close(): void { + this._provider.close(); + } +} diff --git a/src/stream/providers/IOProvider.ts b/src/stream/providers/IOProvider.ts new file mode 100644 index 0000000..9e37e81 --- /dev/null +++ b/src/stream/providers/IOProvider.ts @@ -0,0 +1,41 @@ +/* + * node-subdata-2 - SubData 2 wrapper for Node.js + * Copyright (C) 2022 LogN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import type SafeEventEmitter from "../../lib/SafeEventEmitter"; + +/** The list of possible events that an {@link IOProvider} can emit */ +export enum IOProviderEvents { + Data = "data", + Error = "error", + Close = "close" +} + +/** The list of and arguments for all events that an {@link IOProvider} can emit */ +export type IOProviderEventArguments = { + [IOProviderEvents.Data]: [Buffer]; + [IOProviderEvents.Error]: [Error]; + [IOProviderEvents.Close]: []; +}; + +/** Abstract interface that represents something that performs I/O, likely for a {@link Stream} */ +export default interface IOProvider extends SafeEventEmitter { + /** Write new data to the underlying connection */ + write(data: Buffer): void; + /** Close the underlying connection */ + close(): void; +} diff --git a/src/stream/providers/ManualIOProvider.ts b/src/stream/providers/ManualIOProvider.ts new file mode 100644 index 0000000..2dde00a --- /dev/null +++ b/src/stream/providers/ManualIOProvider.ts @@ -0,0 +1,62 @@ +/* + * node-subdata-2 - SubData 2 wrapper for Node.js + * Copyright (C) 2022 LogN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import SafeEventEmitter from "../../lib/SafeEventEmitter"; +import type IOProvider from "./IOProvider"; +import type { IOProviderEventArguments } from "./IOProvider"; +import { IOProviderEvents } from "./IOProvider"; + +/** + * An {@link IOProvider} that has manual writing functions, used in testing. + * + * **You probably do not want to use this.** This should only be used in unit tests for SD2. We will not + * support use-cases that involve this provider. + */ +export default class ManualIOProvider extends SafeEventEmitter implements IOProvider { + /** Functions for reading/writing */ + public manual: SafeEventEmitter<{ data: [Buffer]; close: [] }> & { + write: (data: Buffer) => void; + close: () => void; + error: (error: Error) => void; + }; + + public constructor() { + super(); + this.manual = Object.assign(new SafeEventEmitter(), { + write: (data: Buffer): void => { + this.emit(IOProviderEvents.Data, data); + }, + close: (): void => { + this.emit(IOProviderEvents.Close); + }, + error: (error: Error): void => { + this.emit(IOProviderEvents.Error, error); + } + }); + } + + /** Write new data to this IOProvider */ + public write(data: Buffer): void { + this.manual.emit("data", data); + } + + /** Close this IOProvider */ + public close(): void { + this.manual.emit("close"); + } +} diff --git a/src/stream/providers/TCPIOProvider.ts b/src/stream/providers/TCPIOProvider.ts new file mode 100644 index 0000000..d36c6cb --- /dev/null +++ b/src/stream/providers/TCPIOProvider.ts @@ -0,0 +1,59 @@ +/* + * node-subdata-2 - SubData 2 wrapper for Node.js + * Copyright (C) 2022 LogN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import type { NetConnectOpts } from "node:net"; +import { type Socket, createConnection } from "node:net"; + +import SafeEventEmitter from "../../lib/SafeEventEmitter"; +import type IOProvider from "./IOProvider"; +import type { IOProviderEventArguments } from "./IOProvider"; +import { IOProviderEvents } from "./IOProvider"; + +/** + * A simple {@link IOProvider} for TCP connections + */ +export default class TCPIOProvider extends SafeEventEmitter implements IOProvider { + /** The underlying TCP connection */ + private _socket: Socket; + + public constructor(options: NetConnectOpts) { + super(); + this._socket = createConnection(options); + this._socket.on("data", this._handleData).on("close", this._handleClose); + } + + /** Write new data to this IOProvider */ + public write(data: Buffer): void { + this._socket.write(data); + } + + /** Handle new data from the underlying connection */ + private _handleData(data: Buffer): void { + this.emit(IOProviderEvents.Data, data); + } + + /** Handle the underlying connection being closed */ + private _handleClose(): void { + this.emit(IOProviderEvents.Close); + } + + /** Close the underlying connection */ + public close(): void { + this._socket.end(); + } +} diff --git a/src/tests/stream/index.test.ts b/src/tests/stream/index.test.ts new file mode 100644 index 0000000..4fff3e9 --- /dev/null +++ b/src/tests/stream/index.test.ts @@ -0,0 +1,219 @@ +/* + * node-subdata-2 - SubData 2 wrapper for Node.js + * Copyright (C) 2022 LogN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import Stream, { StreamEvents } from "../../stream"; +import { ControlCharacters } from "../../stream/controlCharacters"; +import ManualIOProvider from "../../stream/providers/ManualIOProvider"; + +/** Makes a new {@link Stream} along with mock functions and a {@link ManualIOProvider} */ +function makeNewStream(): { + provider: ManualIOProvider; + manual: ManualIOProvider["manual"]; + stream: Stream; + onRemoteReceiveData: jest.Mock; + onRemoteReceiveClose: jest.Mock; + onRead: jest.Mock; + onPacket: jest.Mock; + onReset: jest.Mock; + onClose: jest.Mock; +} { + const provider = new ManualIOProvider(); + const { manual } = provider; + const stream = new Stream(provider); + + const onRemoteReceiveData = jest.fn(); + const onRemoteReceiveClose = jest.fn(); + const onRead = jest.fn(); + const onPacket = jest.fn(); + const onReset = jest.fn(); + const onClose = jest.fn(); + + manual.on("data", onRemoteReceiveData); + manual.on("close", onRemoteReceiveClose); + stream.on(StreamEvents.Read, onRead); + stream.on(StreamEvents.Packet, onPacket); + stream.on(StreamEvents.Reset, onReset); + stream.on(StreamEvents.Close, onClose); + + return { + provider, + manual, + stream, + onRemoteReceiveData, + onRemoteReceiveClose, + onRead, + onPacket, + onReset, + onClose + }; +} + +function charCode(char: string): number { + return char.charCodeAt(0); +} + +describe("Stream", () => { + it("properly encodes when writing", () => { + const { stream, onRemoteReceiveData } = makeNewStream(); + + stream.write(Buffer.from("Hello, World!")); + + expect(onRemoteReceiveData).toBeCalledTimes(1); + expect(onRemoteReceiveData).toBeCalledWith( + Buffer.from([ + ControlCharacters.ReadBytes, + 2, // 4(2+1) = 12 + charCode("H"), + charCode("e"), + charCode("l"), + charCode("l"), + charCode("o"), + charCode(","), + charCode(" "), + charCode("W"), // 8th byte + charCode("o"), + charCode("r"), + charCode("l"), + charCode("d"), // 12th byte + ControlCharacters.ReadByte, + charCode("!") + ]) + ); + }); + + it("properly decodes when receiving", () => { + const { manual, onPacket } = makeNewStream(); + + manual.write( + Buffer.from([ + ControlCharacters.ReadBytes, + 2, // 4(2+1) = 12 + charCode("H"), + charCode("e"), + charCode("l"), + charCode("l"), + charCode("o"), + charCode(","), + charCode(" "), + charCode("W"), // 8th byte + charCode("o"), + charCode("r"), + charCode("l"), + charCode("d"), // 12th byte + ControlCharacters.ReadByte, + charCode("!"), + ControlCharacters.EndOfPacket + ]) + ); + + expect(onPacket).toBeCalledTimes(1); + expect(onPacket).toBeCalledWith(13, Buffer.from("Hello, World!")); + }); + + it("emits Reset when a remote read reset is encountered and does discard the packet so far", () => { + const { manual, onPacket, onRead, onReset } = makeNewStream(); + + manual.write( + Buffer.from([ + ControlCharacters.ReadBytes, + 2, // 4(2+1) = 12 + charCode("H"), + charCode("e"), + charCode("l"), + charCode("l"), + charCode("o"), + charCode(","), + charCode(" "), + charCode("W"), // 8th byte + charCode("o"), + charCode("r"), + charCode("l"), + charCode("d"), // 12th byte + ControlCharacters.ReadByte, + charCode("!") + ]) + ); + + expect(onPacket).toBeCalledTimes(0); + expect(onRead).toBeCalledTimes(2); + expect(onRead).toHaveBeenNthCalledWith(1, Buffer.from("Hello, World")); + expect(onRead).toHaveBeenNthCalledWith(2, Buffer.from("!")); + + manual.write(Buffer.from([ControlCharacters.ReadReset])); + + expect(onReset).toBeCalledTimes(1); + expect(onPacket).toBeCalledTimes(0); + expect(onRead).toBeCalledTimes(2); + + manual.write(Buffer.from([ControlCharacters.EndOfPacket])); + + expect(onReset).toBeCalledTimes(1); + expect(onPacket).toBeCalledTimes(1); + expect(onPacket).toBeCalledWith(0, Buffer.alloc(0)); + expect(onRead).toBeCalledTimes(2); + }); + + it("can terminate packets", () => { + const { stream, onRemoteReceiveData } = makeNewStream(); + + stream.write(Buffer.from("A")); + stream.endPacket(); + + expect(onRemoteReceiveData).toBeCalledTimes(2); + expect(onRemoteReceiveData).toHaveBeenNthCalledWith(1, Buffer.from([ControlCharacters.ReadByte, charCode("A")])); + expect(onRemoteReceiveData).toHaveBeenNthCalledWith(2, Buffer.from([ControlCharacters.EndOfPacket])); + }); + + it("writePacket will also do the same thing", () => { + const { stream, onRemoteReceiveData } = makeNewStream(); + + stream.writePacket(Buffer.from("A")); + + expect(onRemoteReceiveData).toBeCalledTimes(1); + expect(onRemoteReceiveData).toHaveBeenCalledWith(Buffer.from([ControlCharacters.ReadByte, charCode("A"), ControlCharacters.EndOfPacket])); + }); + + it("can transmit read reset", () => { + const { stream, onRemoteReceiveData } = makeNewStream(); + + stream.write(Buffer.from("A")); + stream.readReset(); + + expect(onRemoteReceiveData).toBeCalledTimes(2); + expect(onRemoteReceiveData).toHaveBeenNthCalledWith(1, Buffer.from([ControlCharacters.ReadByte, charCode("A")])); + expect(onRemoteReceiveData).toHaveBeenNthCalledWith(2, Buffer.from([ControlCharacters.ReadReset])); + }); + + describe("close", () => { + it("received from remote", () => { + const { manual, onClose } = makeNewStream(); + + manual.close(); + + expect(onClose).toBeCalledTimes(1); + }); + + it("sent from local", () => { + const { stream, onRemoteReceiveClose } = makeNewStream(); + + stream.close(); + + expect(onRemoteReceiveClose).toBeCalledTimes(1); + }); + }); +}); diff --git a/src/tests/stream/providers/ManualIOProvider.test.ts b/src/tests/stream/providers/ManualIOProvider.test.ts new file mode 100644 index 0000000..2bc8f90 --- /dev/null +++ b/src/tests/stream/providers/ManualIOProvider.test.ts @@ -0,0 +1,90 @@ +/* + * node-subdata-2 - SubData 2 wrapper for Node.js + * Copyright (C) 2022 LogN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import { IOProviderEvents } from "../../../stream/providers/IOProvider"; +import ManualIOProvider from "../../../stream/providers/ManualIOProvider"; + +/** Makes a new {@link ManualIOProvider} and attaches listeners */ +function makeNewMIP(): { + provider: ManualIOProvider; + manual: ManualIOProvider["manual"]; + onDataReceived: jest.Mock; + onCloseFromProvider: jest.Mock; + onData: jest.Mock; + onError: jest.Mock; + onClose: jest.Mock; +} { + const provider = new ManualIOProvider(); + const { manual } = provider; + + const onDataReceived = jest.fn(); + const onCloseFromProvider = jest.fn(); + const onData = jest.fn(); + const onError = jest.fn(); + const onClose = jest.fn(); + + manual.on("data", onDataReceived); + manual.on("close", onCloseFromProvider); + provider.on(IOProviderEvents.Data, onData); + provider.on(IOProviderEvents.Error, onError); + provider.on(IOProviderEvents.Close, onClose); + + return { provider, manual, onDataReceived, onData, onError, onClose, onCloseFromProvider }; +} + +describe("ManualIOProvider", () => { + it("works as expected", () => { + const { provider, manual, onDataReceived, onData } = makeNewMIP(); + + manual.write(Buffer.from("Hello, world!")); + + expect(onData).toHaveBeenCalledTimes(1); + expect(onData).toHaveBeenCalledWith(Buffer.from("Hello, world!")); + + provider.write(Buffer.from("Hello, World! #2")); + + expect(onDataReceived).toHaveBeenCalledTimes(1); + expect(onDataReceived).toHaveBeenCalledWith(Buffer.from("Hello, World! #2")); + expect(onData).toHaveBeenCalledTimes(1); + }); + + it("close from provider side", () => { + const { provider, onCloseFromProvider } = makeNewMIP(); + + provider.close(); + + expect(onCloseFromProvider).toHaveBeenCalledTimes(1); + }); + + it("close from remote side", () => { + const { manual, onClose } = makeNewMIP(); + + manual.close(); + expect(onClose).toHaveBeenCalledTimes(1); + }); + + it("error handled", () => { + const { manual, onError } = makeNewMIP(); + + const error = new Error("Test error"); + manual.error(error); + + expect(onError).toHaveBeenCalledTimes(1); + expect(onError).toHaveBeenCalledWith(error); + }); +});