Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add first io provider and stream wrapper #3

Merged
merged 3 commits into from
Feb 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"cSpell.words": ["TCPIO"]
}
22 changes: 21 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,27 @@

import SafeEventEmitter from "./lib/SafeEventEmitter.js";
import { bytes, kb, mb } from "./lib/sizeHelpers.js";
import Stream from "./stream/";
import { ControlCharacters, SizedControlCharacters } from "./stream/controlCharacters.js";
import DirectStream, { DirectStreamEventArguments, DirectStreamEvents } from "./stream/DirectStream.js";
import IOProvider, { IOProviderEventArguments, IOProviderEvents } from "./stream/providers/IOProvider.js";
import ManualIOProvider from "./stream/providers/ManualIOProvider.js";
import TCPIOProvider from "./stream/providers/TCPIOProvider.js";

export { bytes, ControlCharacters, DirectStream, DirectStreamEventArguments, DirectStreamEvents, kb, mb, SafeEventEmitter, SizedControlCharacters };
export {
bytes,
ControlCharacters,
DirectStream,
DirectStreamEventArguments,
DirectStreamEvents,
IOProvider,
IOProviderEventArguments,
IOProviderEvents,
kb,
ManualIOProvider,
mb,
SafeEventEmitter,
SizedControlCharacters,
Stream,
TCPIOProvider
};
13 changes: 7 additions & 6 deletions src/stream/DirectStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export type DirectStreamEventArguments = {
* Fires with three arguments, the size read in ControlCharacters, the number of that size read, and the data read.
* ReadByte will return Number, the rest will return Buffer.
*/
[DirectStreamEvents.Read]: [ControlCharacters.ReadByte, 1, number] | [Exclude<ControlCharacters, ControlCharacters.ReadByte>, number, Buffer];
[DirectStreamEvents.Read]: [ControlCharacters.ReadByte, 1, number] | [SizedControlCharacters, number, Buffer];
/** Emitted when an End of Packet is received along with (size, data) parameters. */
[DirectStreamEvents.Packet]: [number, Buffer];
};
Expand Down Expand Up @@ -177,12 +177,13 @@ export default class DirectStream extends SafeEventEmitter<DirectStreamEventArgu
* @param data The actual data that was read
*/
private _emitRead(info: { controlCharacter: ControlCharacters.ReadByte; bytes: 1; numberOfType: 1 }, data: number): void;
private _emitRead(info: { controlCharacter: SizedControlCharacters; bytes: number; numberOfType: number }, data: Buffer): void;
private _emitRead(
info: { controlCharacter: Exclude<ControlCharacters, ControlCharacters.ReadByte>; bytes: number; numberOfType: number },
data: Buffer
): void;
private _emitRead(
{ controlCharacter, bytes, numberOfType }: { controlCharacter: ControlCharacters; bytes: number; numberOfType: number },
{
controlCharacter,
bytes,
numberOfType
}: { controlCharacter: SizedControlCharacters | ControlCharacters.ReadByte; bytes: number; numberOfType: number },
data: Buffer | number
): void {
this._packet = Buffer.concat([this._packet, data instanceof Buffer ? data : Buffer.from([data])]);
Expand Down
99 changes: 99 additions & 0 deletions src/stream/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import SafeEventEmitter from "../lib/SafeEventEmitter";
import { ControlCharacters } from "./controlCharacters";
import DirectStream, { DirectStreamEvents } from "./DirectStream";
import type IOProvider from "./providers/IOProvider";
import { IOProviderEvents } from "./providers/IOProvider";

export enum StreamEvents {
Read = "read",
Packet = "packet",
Reset = "reset",
Close = "close"
}

export type StreamEventArguments = {
[StreamEvents.Read]: [Buffer];
[StreamEvents.Packet]: [number, Buffer];
[StreamEvents.Reset]: [];
[StreamEvents.Close]: [];
};

export default class Stream extends SafeEventEmitter<StreamEventArguments> {
private _provider: IOProvider;
private _stream: DirectStream;

/** @param provider The underlying {@link IOProvider} to use */
public constructor(provider: IOProvider) {
super();
this._provider = provider;
this._stream = new DirectStream();
this._provider.on(IOProviderEvents.Data, (data) => {
this._stream.feed(data);
});
this._stream.on(DirectStreamEvents.ReadReset, () => {
this.emit(StreamEvents.Reset);
});
this._stream.on(DirectStreamEvents.Read, (_size, _count, data) => {
this.emit(StreamEvents.Read, data instanceof Buffer ? data : Buffer.from([data]));
});
this._stream.on(DirectStreamEvents.Packet, (size, data) => {
this.emit(StreamEvents.Packet, size, data);
});
this._provider.on(IOProviderEvents.Close, () => {
this.emit(StreamEvents.Close);
});
}

/**
* Write and encode data to send to the provider
*/
public write(data: Buffer): void {
this._provider.write(this._stream.encode(data));
}

/**
* Terminates the current packet.
*/
public endPacket(): void {
this._provider.write(Buffer.from([ControlCharacters.EndOfPacket]));
}

/**
* Write data and end the current packet. This is equivalent to calling {@link Stream.write} and {@link Stream.endPacket} in sequence.
*/
public writePacket(data: Buffer): void {
this._provider.write(Buffer.concat([this._stream.encode(data), Buffer.from([ControlCharacters.EndOfPacket])]));
}

/**
* Trigger a read reset.
*/
public readReset(): void {
this._provider.write(Buffer.from([ControlCharacters.ReadReset]));
}

/**
* Close the connection
*/
public close(): void {
this._provider.close();
}
}
41 changes: 41 additions & 0 deletions src/stream/providers/IOProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import type SafeEventEmitter from "../../lib/SafeEventEmitter";

/** The list of possible events that an {@link IOProvider} can emit */
export enum IOProviderEvents {
Data = "data",
Error = "error",
Close = "close"
}

/** The list of and arguments for all events that an {@link IOProvider} can emit */
export type IOProviderEventArguments = {
[IOProviderEvents.Data]: [Buffer];
[IOProviderEvents.Error]: [Error];
[IOProviderEvents.Close]: [];
};

/** Abstract interface that represents something that performs I/O, likely for a {@link Stream} */
export default interface IOProvider extends SafeEventEmitter<IOProviderEventArguments> {
/** Write new data to the underlying connection */
write(data: Buffer): void;
/** Close the underlying connection */
close(): void;
}
62 changes: 62 additions & 0 deletions src/stream/providers/ManualIOProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import SafeEventEmitter from "../../lib/SafeEventEmitter";
import type IOProvider from "./IOProvider";
import type { IOProviderEventArguments } from "./IOProvider";
import { IOProviderEvents } from "./IOProvider";

/**
* An {@link IOProvider} that has manual writing functions, used in testing.
*
* **You probably do not want to use this.** This should only be used in unit tests for SD2. We will not
* support use-cases that involve this provider.
*/
export default class ManualIOProvider extends SafeEventEmitter<IOProviderEventArguments> implements IOProvider {
/** Functions for reading/writing */
public manual: SafeEventEmitter<{ data: [Buffer]; close: [] }> & {
write: (data: Buffer) => void;
close: () => void;
error: (error: Error) => void;
};

public constructor() {
super();
this.manual = Object.assign(new SafeEventEmitter(), {
write: (data: Buffer): void => {
this.emit(IOProviderEvents.Data, data);
},
close: (): void => {
this.emit(IOProviderEvents.Close);
},
error: (error: Error): void => {
this.emit(IOProviderEvents.Error, error);
}
});
}

/** Write new data to this IOProvider */
public write(data: Buffer): void {
this.manual.emit("data", data);
}

/** Close this IOProvider */
public close(): void {
this.manual.emit("close");
}
}
59 changes: 59 additions & 0 deletions src/stream/providers/TCPIOProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import type { NetConnectOpts } from "node:net";
import { type Socket, createConnection } from "node:net";

import SafeEventEmitter from "../../lib/SafeEventEmitter";
import type IOProvider from "./IOProvider";
import type { IOProviderEventArguments } from "./IOProvider";
import { IOProviderEvents } from "./IOProvider";

/**
* A simple {@link IOProvider} for TCP connections
*/
export default class TCPIOProvider extends SafeEventEmitter<IOProviderEventArguments> implements IOProvider {
/** The underlying TCP connection */
private _socket: Socket;

public constructor(options: NetConnectOpts) {
super();
this._socket = createConnection(options);
this._socket.on("data", this._handleData).on("close", this._handleClose);
}

/** Write new data to this IOProvider */
public write(data: Buffer): void {
this._socket.write(data);
}

/** Handle new data from the underlying connection */
private _handleData(data: Buffer): void {
this.emit(IOProviderEvents.Data, data);
}

/** Handle the underlying connection being closed */
private _handleClose(): void {
this.emit(IOProviderEvents.Close);
}

/** Close the underlying connection */
public close(): void {
this._socket.end();
}
}
Loading