Skip to content

Commit

Permalink
Merge pull request #3 from thetayloredman/feat/io-providers
Browse files Browse the repository at this point in the history
feat: add first io provider and stream wrapper
  • Loading branch information
thetayloredman committed Feb 19, 2023
2 parents 79a405c + c0cbe10 commit b9a85e1
Show file tree
Hide file tree
Showing 9 changed files with 601 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"cSpell.words": ["TCPIO"]
}
22 changes: 21 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,27 @@

import SafeEventEmitter from "./lib/SafeEventEmitter.js";
import { bytes, kb, mb } from "./lib/sizeHelpers.js";
import Stream from "./stream/";
import { ControlCharacters, SizedControlCharacters } from "./stream/controlCharacters.js";
import DirectStream, { DirectStreamEventArguments, DirectStreamEvents } from "./stream/DirectStream.js";
import IOProvider, { IOProviderEventArguments, IOProviderEvents } from "./stream/providers/IOProvider.js";
import ManualIOProvider from "./stream/providers/ManualIOProvider.js";
import TCPIOProvider from "./stream/providers/TCPIOProvider.js";

export { bytes, ControlCharacters, DirectStream, DirectStreamEventArguments, DirectStreamEvents, kb, mb, SafeEventEmitter, SizedControlCharacters };
export {
bytes,
ControlCharacters,
DirectStream,
DirectStreamEventArguments,
DirectStreamEvents,
IOProvider,
IOProviderEventArguments,
IOProviderEvents,
kb,
ManualIOProvider,
mb,
SafeEventEmitter,
SizedControlCharacters,
Stream,
TCPIOProvider
};
13 changes: 7 additions & 6 deletions src/stream/DirectStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export type DirectStreamEventArguments = {
* Fires with three arguments, the size read in ControlCharacters, the number of that size read, and the data read.
* ReadByte will return Number, the rest will return Buffer.
*/
[DirectStreamEvents.Read]: [ControlCharacters.ReadByte, 1, number] | [Exclude<ControlCharacters, ControlCharacters.ReadByte>, number, Buffer];
[DirectStreamEvents.Read]: [ControlCharacters.ReadByte, 1, number] | [SizedControlCharacters, number, Buffer];
/** Emitted when an End of Packet is received along with (size, data) parameters. */
[DirectStreamEvents.Packet]: [number, Buffer];
};
Expand Down Expand Up @@ -177,12 +177,13 @@ export default class DirectStream extends SafeEventEmitter<DirectStreamEventArgu
* @param data The actual data that was read
*/
private _emitRead(info: { controlCharacter: ControlCharacters.ReadByte; bytes: 1; numberOfType: 1 }, data: number): void;
private _emitRead(info: { controlCharacter: SizedControlCharacters; bytes: number; numberOfType: number }, data: Buffer): void;
private _emitRead(
info: { controlCharacter: Exclude<ControlCharacters, ControlCharacters.ReadByte>; bytes: number; numberOfType: number },
data: Buffer
): void;
private _emitRead(
{ controlCharacter, bytes, numberOfType }: { controlCharacter: ControlCharacters; bytes: number; numberOfType: number },
{
controlCharacter,
bytes,
numberOfType
}: { controlCharacter: SizedControlCharacters | ControlCharacters.ReadByte; bytes: number; numberOfType: number },
data: Buffer | number
): void {
this._packet = Buffer.concat([this._packet, data instanceof Buffer ? data : Buffer.from([data])]);
Expand Down
99 changes: 99 additions & 0 deletions src/stream/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import SafeEventEmitter from "../lib/SafeEventEmitter";
import { ControlCharacters } from "./controlCharacters";
import DirectStream, { DirectStreamEvents } from "./DirectStream";
import type IOProvider from "./providers/IOProvider";
import { IOProviderEvents } from "./providers/IOProvider";

export enum StreamEvents {
Read = "read",
Packet = "packet",
Reset = "reset",
Close = "close"
}

export type StreamEventArguments = {
[StreamEvents.Read]: [Buffer];
[StreamEvents.Packet]: [number, Buffer];
[StreamEvents.Reset]: [];
[StreamEvents.Close]: [];
};

export default class Stream extends SafeEventEmitter<StreamEventArguments> {
private _provider: IOProvider;
private _stream: DirectStream;

/** @param provider The underlying {@link IOProvider} to use */
public constructor(provider: IOProvider) {
super();
this._provider = provider;
this._stream = new DirectStream();
this._provider.on(IOProviderEvents.Data, (data) => {
this._stream.feed(data);
});
this._stream.on(DirectStreamEvents.ReadReset, () => {
this.emit(StreamEvents.Reset);
});
this._stream.on(DirectStreamEvents.Read, (_size, _count, data) => {
this.emit(StreamEvents.Read, data instanceof Buffer ? data : Buffer.from([data]));
});
this._stream.on(DirectStreamEvents.Packet, (size, data) => {
this.emit(StreamEvents.Packet, size, data);
});
this._provider.on(IOProviderEvents.Close, () => {
this.emit(StreamEvents.Close);
});
}

/**
* Write and encode data to send to the provider
*/
public write(data: Buffer): void {
this._provider.write(this._stream.encode(data));
}

/**
* Terminates the current packet.
*/
public endPacket(): void {
this._provider.write(Buffer.from([ControlCharacters.EndOfPacket]));
}

/**
* Write data and end the current packet. This is equivalent to calling {@link Stream.write} and {@link Stream.endPacket} in sequence.
*/
public writePacket(data: Buffer): void {
this._provider.write(Buffer.concat([this._stream.encode(data), Buffer.from([ControlCharacters.EndOfPacket])]));
}

/**
* Trigger a read reset.
*/
public readReset(): void {
this._provider.write(Buffer.from([ControlCharacters.ReadReset]));
}

/**
* Close the connection
*/
public close(): void {
this._provider.close();
}
}
41 changes: 41 additions & 0 deletions src/stream/providers/IOProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import type SafeEventEmitter from "../../lib/SafeEventEmitter";

/** The list of possible events that an {@link IOProvider} can emit */
export enum IOProviderEvents {
Data = "data",
Error = "error",
Close = "close"
}

/** The list of and arguments for all events that an {@link IOProvider} can emit */
export type IOProviderEventArguments = {
[IOProviderEvents.Data]: [Buffer];
[IOProviderEvents.Error]: [Error];
[IOProviderEvents.Close]: [];
};

/** Abstract interface that represents something that performs I/O, likely for a {@link Stream} */
export default interface IOProvider extends SafeEventEmitter<IOProviderEventArguments> {
/** Write new data to the underlying connection */
write(data: Buffer): void;
/** Close the underlying connection */
close(): void;
}
62 changes: 62 additions & 0 deletions src/stream/providers/ManualIOProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import SafeEventEmitter from "../../lib/SafeEventEmitter";
import type IOProvider from "./IOProvider";
import type { IOProviderEventArguments } from "./IOProvider";
import { IOProviderEvents } from "./IOProvider";

/**
* An {@link IOProvider} that has manual writing functions, used in testing.
*
* **You probably do not want to use this.** This should only be used in unit tests for SD2. We will not
* support use-cases that involve this provider.
*/
export default class ManualIOProvider extends SafeEventEmitter<IOProviderEventArguments> implements IOProvider {
/** Functions for reading/writing */
public manual: SafeEventEmitter<{ data: [Buffer]; close: [] }> & {
write: (data: Buffer) => void;
close: () => void;
error: (error: Error) => void;
};

public constructor() {
super();
this.manual = Object.assign(new SafeEventEmitter(), {
write: (data: Buffer): void => {
this.emit(IOProviderEvents.Data, data);
},
close: (): void => {
this.emit(IOProviderEvents.Close);
},
error: (error: Error): void => {
this.emit(IOProviderEvents.Error, error);
}
});
}

/** Write new data to this IOProvider */
public write(data: Buffer): void {
this.manual.emit("data", data);
}

/** Close this IOProvider */
public close(): void {
this.manual.emit("close");
}
}
59 changes: 59 additions & 0 deletions src/stream/providers/TCPIOProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* node-subdata-2 - SubData 2 wrapper for Node.js
* Copyright (C) 2022 LogN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import type { NetConnectOpts } from "node:net";
import { type Socket, createConnection } from "node:net";

import SafeEventEmitter from "../../lib/SafeEventEmitter";
import type IOProvider from "./IOProvider";
import type { IOProviderEventArguments } from "./IOProvider";
import { IOProviderEvents } from "./IOProvider";

/**
* A simple {@link IOProvider} for TCP connections
*/
export default class TCPIOProvider extends SafeEventEmitter<IOProviderEventArguments> implements IOProvider {
/** The underlying TCP connection */
private _socket: Socket;

public constructor(options: NetConnectOpts) {
super();
this._socket = createConnection(options);
this._socket.on("data", this._handleData).on("close", this._handleClose);
}

/** Write new data to this IOProvider */
public write(data: Buffer): void {
this._socket.write(data);
}

/** Handle new data from the underlying connection */
private _handleData(data: Buffer): void {
this.emit(IOProviderEvents.Data, data);
}

/** Handle the underlying connection being closed */
private _handleClose(): void {
this.emit(IOProviderEvents.Close);
}

/** Close the underlying connection */
public close(): void {
this._socket.end();
}
}
Loading

0 comments on commit b9a85e1

Please sign in to comment.