Skip to content

Commit

Permalink
refactor!: deprecate DirectStream, prefer Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
thetayloredman committed Mar 6, 2023
1 parent dace779 commit f4161e8
Show file tree
Hide file tree
Showing 6 changed files with 677 additions and 704 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"cSpell.words": ["bufncmp", "TCPIO"]
"cSpell.words": ["bufncmp", "subdata", "TCPIO"]
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import DirectStream, { DirectStreamEventArguments, DirectStreamEvents } from "./
export {
bytes,
ControlCharacters,
// eslint-disable-next-line deprecation/deprecation
DirectStream,
DirectStreamEventArguments,
DirectStreamEvents,
Expand Down
286 changes: 25 additions & 261 deletions src/stream/DirectStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import { PassThrough } from "node:stream";

import debug from "debug";
import { Emitter } from "strict-event-emitter";

import { bytes, kb, mb } from "../lib/sizeHelpers";
import { type SizedControlCharacters, ControlCharacters } from "./controlCharacters";
import Stream, { StreamEvents } from ".";
import type { ControlCharacters } from "./controlCharacters";
import { type SizedControlCharacters } from "./controlCharacters";

const log = debug("node-subdata-2:stream:DirectStream");

Expand Down Expand Up @@ -55,283 +58,44 @@ export type DirectStreamEventArguments = {
*
* This stream does not connect with an actual socket, it is just a wrapper around a buffer. You probably
* want to use {@link Stream} for this purpose.
*
* @deprecated This class is deprecated and will be removed in a future version. The implementation from this class
* has been moved to {@link Stream}.
*/
// TODO: Remove this class in a future release
export default class DirectStream extends Emitter<DirectStreamEventArguments> {
/** The buffer of the stream */
private _buffer: Buffer;
/** The size of the buffer */
private _bufferSize: number;
/** The buffer of the current read packet */
private _packet: Buffer;
/** The size of the current read packet */
private _packetSize: number;
private _stream: Stream;
private _passThrough: PassThrough;

public constructor() {
super();
log("initializing");
this._buffer = Buffer.alloc(0);
this._bufferSize = 0;
this._packet = Buffer.alloc(0);
this._packetSize = 0;
log("initializing deprecated pass-through class");
this._passThrough = new PassThrough();
this._stream = new Stream(this._passThrough);
this._stream.on(StreamEvents.Read, (data) => {
this.emit(DirectStreamEvents.Read, data.length, Math.floor(data.length / 4), data);
});
this._stream.on(StreamEvents.Packet, (data) => {
this.emit(DirectStreamEvents.Packet, data);
});
this._stream.on(StreamEvents.Reset, () => {
this.emit(DirectStreamEvents.ReadReset);
});
}

/**
* Feed new data to this Stream.
* @param data The data to feed
*/
public feed(data: Buffer): void {
log("feeding data", data, "size", data.length);
this._buffer = Buffer.concat([this._buffer, data]);
this._bufferSize += data.length;
log("new data", this._buffer, "size", this._bufferSize);
this._handleNewData();
}

/**
* Invalidates all previous data. Called internally when a {@link ControlCharacters.ReadReset} is encountered.
*/
private _readReset(): void {
log("triggering read reset");
this._buffer = Buffer.alloc(0);
this._bufferSize = 0;
this._packet = Buffer.alloc(0);
this._packetSize = 0;
this.emit(DirectStreamEvents.ReadReset);
}

/**
* Command the stream to attempt to process new data. Called internally after {@link DirectStream.feed} is called.
*/
private _handleNewData(): void {
if (this._bufferSize === 0) return;
const char = this._buffer[0];

if (!(char.toString() in ControlCharacters))
throw new Error(`I don't know what I am looking at! Encountered unknown control character 0x${this._buffer[0].toString(16)} in stream.`);

log("handling control character", ControlCharacters[char]);

switch (this._buffer[0]) {
case ControlCharacters.ReadReset:
const postReset = this._buffer.subarray(1);
log("triggering read reset, following data", postReset);
this._readReset();
// Re-insert all data that follows the reset
log("re-feeding data");
this.feed(postReset);
break;
case ControlCharacters.ReadByte:
if (this._bufferSize < 2) return;
this._emitRead({ controlCharacter: ControlCharacters.ReadByte, bytes: 1, numberOfType: 1 }, this._buffer[1]);
this._shiftBuffer(2);
break;
case ControlCharacters.ReadBytes:
if (this._bufferSize < 2) return;
const byteSize = (this._buffer[1] + 1) * 4;
if (this._bufferSize < byteSize + 2) return;
this._emitRead(
{ controlCharacter: ControlCharacters.ReadBytes, bytes: byteSize, numberOfType: this._buffer[1] },
this._buffer.subarray(2, byteSize + 2)
);
this._shiftBuffer(byteSize + 2);
break;
case ControlCharacters.ReadKB:
if (this._bufferSize < 2) return;
const kbSize = kb(this._buffer[1] + 1) * 4;
if (this._bufferSize < kbSize + 2) return;
this._emitRead(
{ controlCharacter: ControlCharacters.ReadKB, bytes: kbSize, numberOfType: this._buffer[1] },
this._buffer.subarray(2, kbSize + 2)
);
this._shiftBuffer(kbSize + 2);
break;
case ControlCharacters.ReadMB:
if (this._bufferSize < 2) return;
const mbSize = mb(this._buffer[1] + 1) * 4;
if (this._bufferSize < mbSize + 2) return;
this._emitRead(
{ controlCharacter: ControlCharacters.ReadMB, bytes: mbSize, numberOfType: this._buffer[1] },
this._buffer.subarray(2, mbSize + 2)
);
this._shiftBuffer(mbSize + 2);
break;
case ControlCharacters.ReadGB:
case ControlCharacters.ReadTB:
case ControlCharacters.ReadPB:
// TODO: The SubData Java implementation does not yet support these either. This is a placeholder until we make sizes BigInts.
throw new Error("Internal error: ReadGB, ReadTB, and ReadPB are not yet implemented.");
case ControlCharacters.KeepAlive:
this._shiftBuffer(1);
break;
case ControlCharacters.EndOfPacket:
this._emitPacket();
this._shiftBuffer(1);
break;
/* istanbul ignore next: this should never happen */
default:
throw new Error(
`Internal bug: Unhandled control character 0x${this._buffer[0].toString(16)}. Please report this as an issue on our GitHub.`
);
}

if (this._bufferSize > 0) this._handleNewData();
}

/**
* Called internally to add to the packet buffer and emit the {@link DirectStreamEvents.Read} event.
*
* The object in the first parameter accepts:
* - controlCharacter: The control character that was read
* - bytes: The number of bytes that were read
* - numberOfType: The number of the type (e.g. for readKB(1), this should be 1)
*
* @param info See above
* @param data The actual data that was read
*/
private _emitRead(info: { controlCharacter: ControlCharacters.ReadByte; bytes: 1; numberOfType: 1 }, data: number): void;
private _emitRead(info: { controlCharacter: SizedControlCharacters; bytes: number; numberOfType: number }, data: Buffer): void;
private _emitRead(
{
controlCharacter,
bytes,
numberOfType
}: { controlCharacter: SizedControlCharacters | ControlCharacters.ReadByte; bytes: number; numberOfType: number },
data: Buffer | number
): void {
log("triggering read event of type", ControlCharacters[controlCharacter], "with data", data, "and size", bytes, "bytes");
this._packet = Buffer.concat([this._packet, data instanceof Buffer ? data : Buffer.from([data])]);
this._packetSize += bytes;
/* istanbul ignore if: this should never reasonably happen & is untestable without a LOT of work */
if (this._packetSize >= Number.MAX_SAFE_INTEGER - 1000) {
this._packet = Buffer.alloc(0);
this._packetSize = 0;
throw new Error("Packet size exceeded or got very close to maximum safe integer. This is probably a bug.");
}
if (controlCharacter === ControlCharacters.ReadByte) {
// TODO: Remove this cast. Probably need to use some sorta weird discrimination over the types.
this.emit(DirectStreamEvents.Read, ControlCharacters.ReadByte, 1, data as number);
return;
} else this.emit(DirectStreamEvents.Read, controlCharacter, numberOfType, data as Buffer);
}

/** Called internally to clear and emit a packet. */
private _emitPacket(): void {
debug("triggering packet");
this.emit(DirectStreamEvents.Packet, this._packet);
this._packet = Buffer.alloc(0);
this._packetSize = 0;
}

/**
* Shift the buffer by a certain amount.
* @param count The amount to shift the buffer by
*/
private _shiftBuffer(count: number): void {
log("shifting internal buffer by", count);
this._buffer = this._buffer.subarray(count);
this._bufferSize -= count;
}

/**
* Do some funny math to calculate the return value for {@link DirectStream._bestControlCharacter}.
*
* @param controlCharacter The control character being suggested
* @param size The size in bytes
* @param sizeFn The size function to use (probably one of {@link kb}, {@link mb}, or {@link bytes})
*/
private _calculateControlCharacterRemainder(
controlCharacter: SizedControlCharacters,
size: number,
sizeFn: (s: number) => number
): { controlCharacter: SizedControlCharacters; number: number; remainder: number } {
const result = {
controlCharacter,
number: Math.min(255, Math.floor(size / sizeFn(4)) - 1),
remainder: size - sizeFn(4) * (Math.min(255, Math.floor(size / sizeFn(4)) - 1) + 1)
};
log("calculated control character data for size", size, "bytes and control character", ControlCharacters[controlCharacter], "as", result);
return result;
}

/**
* Determine the best suitable control character for a given size, along with the remainder after
* using that control character.
* @param size The size in bytes
*/
private _bestControlCharacter(
size: number
):
| { controlCharacter: ControlCharacters.ReadByte; number: -1; remainder: number }
| { controlCharacter: SizedControlCharacters; number: number; remainder: number }
| false {
let result:
| { controlCharacter: ControlCharacters.ReadByte; number: -1; remainder: number }
| { controlCharacter: SizedControlCharacters; number: number; remainder: number }
| false;
if (size < 1) result = false;
else if (size < 4) result = { controlCharacter: ControlCharacters.ReadByte, number: -1, remainder: size - 1 };
else if (size < kb(4)) result = this._calculateControlCharacterRemainder(ControlCharacters.ReadBytes, size, bytes);
else if (size < mb(4)) result = this._calculateControlCharacterRemainder(ControlCharacters.ReadKB, size, kb);
else result = this._calculateControlCharacterRemainder(ControlCharacters.ReadMB, size, mb);
log("best control character for size", size, "is", result);
return result;
this._passThrough.write(data);
}

/**
* Encode a Buffer into Layer 1 stream data.
* @param data The data to encode
*/
public encode(input: Buffer): Buffer {
log("encoding data", input);
const size = input.length;
let remaining = size;
// TODO: Change this when GB/TB/PB support is added.

// The algorithm here is as follows:
// We repeatedly call to _bestControlCharacter and start populating an array with
// the results of each along with the data it contains, then we flatten the array
// and convert it to a Buffer.
const results = [];
while (remaining > 0) {
log("remaining size", remaining);
const data = this._bestControlCharacter(remaining);
/* istanbul ignore next: this should never happen */
if (data === false) throw new Error("This should never happen");
const { controlCharacter, number, remainder } = data;
if (controlCharacter === ControlCharacters.ReadByte) {
results.push([controlCharacter, input.subarray(size - remaining, size - remaining + 1)]);
remaining -= 1;
continue;
}

/* istanbul ignore else */
if ([ControlCharacters.ReadBytes, ControlCharacters.ReadKB, ControlCharacters.ReadMB].includes(controlCharacter)) {
let byteSize;
switch (controlCharacter) {
case ControlCharacters.ReadBytes:
byteSize = (number + 1) * 4;
break;
case ControlCharacters.ReadKB:
byteSize = kb(number + 1) * 4;
break;
case ControlCharacters.ReadMB:
byteSize = mb(number + 1) * 4;
break;
/* istanbul ignore next: this should never happen */
default:
throw new Error("This should never happen");
}

results.push([controlCharacter, number, input.subarray(size - remaining, size - remaining + byteSize)]);
remaining -= byteSize;
continue;
}

/* istanbul ignore next */
throw new Error(`Unrecognized control character returned by _bestControlCharacter for size ${remainder}`);
}
const result = Buffer.from(results.flat().flatMap((x) => (x instanceof Buffer ? Array.from(x.values()) : x)));
log("final result", result);
return result;
return this._stream.encode(input);
}
}
Loading

0 comments on commit f4161e8

Please sign in to comment.