diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index b30af8fdf5514a..bc151d739b9c7c 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -790,6 +790,8 @@ module.exports = { appendFile, readFile, watch, + + kHandle, }, FileHandle, diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js new file mode 100644 index 00000000000000..870804f0c4a1f8 --- /dev/null +++ b/lib/internal/webstreams/adapters.js @@ -0,0 +1,871 @@ +'use strict'; + +const { + ArrayPrototypeMap, + PromiseAll, + PromisePrototypeThen, + PromisePrototypeFinally, + PromiseResolve, + Uint8Array, +} = primordials; + +const { + ReadableStream, + isReadableStream, +} = require('internal/webstreams/readablestream'); + +const { + WritableStream, + isWritableStream, +} = require('internal/webstreams/writablestream'); + +const { + ByteLengthQueuingStrategy, + CountQueuingStrategy, +} = require('internal/webstreams/queuingstrategies'); + +const { + Writable, + Readable, + Duplex, +} = require('stream'); + +const { + Buffer, +} = require('buffer'); + +const { + errnoException, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_ARG_VALUE, + ERR_INVALID_STATE, + }, +} = require('internal/errors'); + +const { + createDeferredPromise, +} = require('internal/util'); + +const { + validateAbortSignal, + validateBoolean, + validateObject, +} = require('internal/validators'); + +const { + WriteWrap, + ShutdownWrap, + kReadBytesOrError, + kLastWriteWasAsync, + streamBaseState, +} = internalBinding('stream_wrap'); + +const { UV_EOF } = internalBinding('uv'); + +/** + * @typedef {import('../../stream').Writable} Writable + * @typedef {import('../../stream').Readable} Readable + * @typedef {import('./writablestream').WritableStream} WritableStream + * @typedef {import('./readablestream').ReadableStream} ReadableStream + * + * @typedef {import('../abort_controller').AbortSignal} AbortSignal + */ + +/** + * @param {Writable} streamWritable + * @returns {WritableStream} + */ +function newWritableStreamFromStreamWritable(streamWritable) { + if (typeof streamWritable?._writableState !== 'object') { + throw new ERR_INVALID_ARG_TYPE( + 'streamWritable', + 'stream.Writable', + streamWritable); + } + + const highWaterMark = streamWritable.writableHighWaterMark; + const strategy = + streamWritable.writableObjectMode ? + new CountQueuingStrategy({ highWaterMark }) : + new ByteLengthQueuingStrategy({ highWaterMark }); + + let controller; + let backpressurePromise; + let closed; + let errored = false; + + function onClose() { + streamWritable.off('close', onClose); + streamWritable.off('drain', onDrain); + streamWritable.off('error', onError); + streamWritable.off('finish', onFinish); + if (controller !== undefined) { + controller.error( + new ERR_INVALID_STATE('The stream.Writable closed unexpectedly')); + } + } + + function onDrain() { + if (backpressurePromise !== undefined) + backpressurePromise.resolve(); + } + + function onError(error) { + errored = true; + if (backpressurePromise !== undefined) + backpressurePromise.reject(error); + // If closed is not undefined, the error is happening + // after the WritableStream close has already started. + // We need to reject it here. + if (closed !== undefined) { + closed.reject(error); + closed = undefined; + } + controller.error(error); + controller = undefined; + } + + function onFinish() { + // If the finish event happens and closed is defined, that's + // expected and good. If the finish event happens and closed + // is not set, the the stream.Writable closed unexpectedly + // and we need to let the WritableStream know via the controller. + if (closed !== undefined) { + closed.resolve(); + closed = undefined; + return; + } + controller.error( + new ERR_INVALID_STATE('The stream.Writable closed unexpectedly')); + controller = undefined; + } + + streamWritable.once('close', onClose); + streamWritable.once('error', onError); + streamWritable.once('finish', onFinish); + streamWritable.on('drain', onDrain); + + return new WritableStream({ + start(c) { controller = c; }, + + async write(chunk) { + if (!streamWritable.write(chunk)) { + backpressurePromise = createDeferredPromise(); + return PromisePrototypeFinally( + backpressurePromise.promise, () => { + backpressurePromise = undefined; + }); + } + }, + + abort(reason) { + if (!streamWritable.destroyed && !errored) + streamWritable.destroy(reason); + }, + + close() { + if (closed === undefined && !streamWritable.writableEnded) { + closed = createDeferredPromise(); + streamWritable.end(); + return closed.promise; + } + + controller = undefined; + return PromiseResolve(); + }, + }, strategy); +} + +/** + * @param {WritableStream} writableStream + * @param {{ + * decodeStrings? : boolean, + * highWaterMark? : number, + * objectMode? : boolean, + * signal? : AbortSignal, + * }} [options] + * @returns {Writable} + */ +function newStreamWritableFromWritableStream(writableStream, options = {}) { + if (!isWritableStream(writableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'writableStream', + 'WritableStream', + writableStream); + } + + validateObject(options, 'options'); + const { + highWaterMark, + decodeStrings = true, + objectMode = false, + signal, + } = options; + + validateBoolean(objectMode, 'options.objectMode'); + validateBoolean(decodeStrings, 'options.decodeStrings'); + if (signal !== undefined) + validateAbortSignal(signal, 'options.signal'); + + const writer = writableStream.getWriter(); + let closed = false; + + const writable = new Writable({ + highWaterMark: highWaterMark || writer.desiredSize, + objectMode, + decodeStrings, + signal, + + writev(chunks, callback) { + if (closed) { + callback(new ERR_INVALID_STATE('The WritableStream is closed')); + return; + } + + function done(error) { + try { + callback(error); + } catch (error) { + process.nextTick(() => writable.destroy(error)); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + PromiseAll( + ArrayPrototypeMap( + chunks, + (chunk) => writer.write(chunk))), + done, + done); + }, + done); + }, + + write(chunk, encoding, callback) { + if (closed) { + callback(new ERR_INVALID_STATE('The WritableStream is closed')); + return; + } + if (typeof chunk === 'string' && decodeStrings && !objectMode) + chunk = new Uint8Array(Buffer.from(chunk, encoding)); + + function done(error) { + try { + callback(error); + } catch (error) { + writable.destroy(error); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + writer.write(chunk), + done, + done); + }, + done); + }, + + destroy(error, callback) { + function done() { + try { + callback(error); + } catch (error) { + process.nextTick(() => { throw error; }); + } + } + + if (!closed) { + if (error != null) { + PromisePrototypeThen( + writer.abort(error), + done, + done); + } else { + PromisePrototypeThen( + writer.close(), + done, + done); + } + return; + } + + done(); + }, + + final(callback) { + function done(error) { + try { + callback(error); + } catch (error) { + process.nextTick(() => writable.destroy(error)); + } + } + + if (!closed) { + PromisePrototypeThen( + writer.close(), + done, + done); + } + }, + }); + + PromisePrototypeThen( + writer.closed, + () => { + closed = true; + // If the WritableStream closes before the stream.Writable has been + // ended, we signal an error on the stream.Writable. + if (!writable.writableEnded) { + writable.destroy( + new ERR_INVALID_STATE('The WritableStream closed unexpectedly')); + } + }, + (error) => { + // If the WritableStream errors before the stream.Writable has been + // destroyed, signal an error on the stream.Writable. + closed = true; + if (!writable.destroyed) + writable.destroy(error); + }); + + return writable; +} + +/** + * @param {Readable} streamReadable + * @returns {ReadableStream} + */ +function newReadableStreamFromStreamReadable(streamReadable) { + if (typeof streamReadable?._readableState !== 'object') { + throw new ERR_INVALID_ARG_TYPE( + 'streamReadable', + 'stream.Readable', + streamReadable); + } + + const objectMode = streamReadable.readableObjectMode; + const highWaterMark = streamReadable.readableHighWaterMark; + const strategy = + objectMode ? + new CountQueuingStrategy({ highWaterMark }) : + { highWaterMark }; + + let controller; + let closed = false; + + function maybeCloseOrError(error) { + if (closed) return; + closed = true; + if (error) + return controller.error(error); + controller.close(); + } + + function onClose() { + streamReadable.off('close', onClose); + streamReadable.off('data', onData); + streamReadable.off('end', onEnd); + streamReadable.off('error', onError); + maybeCloseOrError(); + } + + function onError(error) { + // If closed is false, the stream.Readable errored + // before the ReadableStream was closed, we need to + // go ahead and error it out. + maybeCloseOrError(error); + } + + function onEnd() { + // If closed is false, the stream.Readable is ending + // before the ReadableStream, we need to go ahead an + // close the ReadableStream. + maybeCloseOrError(); + } + + function onData(chunk) { + // Copy the Buffer to detach it from the pool. + if (Buffer.isBuffer(chunk) && !objectMode) + chunk = new Uint8Array(chunk); + controller.enqueue(chunk); + } + + streamReadable.pause(); + + streamReadable.once('close', onClose); + streamReadable.once('end', onEnd); + streamReadable.once('error', onError); + streamReadable.on('data', onData); + + return new ReadableStream({ + start(c) { controller = c; }, + + pull() { streamReadable.resume(); }, + + cancel(reason) { + closed = true; + // If the streamReadable has not yet been destroyed, + // destroy it immediately with the given reason. + if (!streamReadable.destroyed) + streamReadable.destroy(reason); + }, + }, strategy); +} + +/** + * @param {ReadableStream} readableStream + * @param {{ + * highWaterMark? : number, + * encoding? : string, + * objectMode? : boolean, + * signal? : AbortSignal, + * }} [options] + * @returns {Readable} + */ +function newStreamReadableFromReadableStream(readableStream, options = {}) { + if (!isReadableStream(readableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'readableStream', + 'ReadableStream', + readableStream); + } + + validateObject(options, 'options'); + const { + highWaterMark, + encoding, + objectMode = false, + signal, + } = options; + + if (encoding !== undefined && !Buffer.isEncoding(encoding)) + throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding'); + validateBoolean(objectMode, 'options.objectMode'); + if (signal !== undefined) + validateAbortSignal(signal, 'options.signal'); + + const reader = readableStream.getReader(); + let closed = false; + + const readable = new Readable({ + objectMode, + highWaterMark, + encoding, + signal, + + read() { + PromisePrototypeThen( + reader.read(), + (chunk) => { + readable.push(chunk.value); + if (chunk.done) + readable.push(null); + }, + (error) => readable.destroy(error)); + }, + + destroy(error, callback) { + function done() { + try { + callback(error); + } catch (error) { + process.nextTick(() => { throw error; }); + } + } + + if (!closed) { + PromisePrototypeThen( + reader.cancel(error), + done, + done); + return; + } + done(error); + }, + }); + + PromisePrototypeThen( + reader.closed, + () => { + closed = true; + if (!readable.readableEnded) + readable.push(null); + }, + (error) => { + closed = true; + if (!readable.destroyed) { + readable.destroy(error); + } + }); + + return readable; +} + +/** + * @typedef {import('./readablestream').ReadableWritablePair + * } ReadableWritablePair + * @typedef {import('../../stream').Duplex} Duplex + * + * @param {Duplex} duplex + * @returns {ReadableWritablePair} + */ +function newReadableWritablePairFromDuplex(duplex) { + if (typeof duplex?._writableState !== 'object' || + typeof duplex?._readableState !== 'object') { + throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex); + } + + const writable = newWritableStreamFromStreamWritable(duplex); + const readable = newReadableStreamFromStreamReadable(duplex); + + return { + writable, + readable, + }; +} + +/** + * @param {ReadableWritablePair} pair + * @param {{ + * decodeStrings? : boolean, + * encoding? : string, + * highWaterMark? : number, + * objectMode? : boolean, + * signal? : AbortSignal, + * }} [options] + * @returns + */ +function newDuplexFromReadableWritablePair(pair = {}, options = {}) { + validateObject(pair, 'pair'); + const { + readable: readableStream, + writable: writableStream, + } = pair; + + if (!isReadableStream(readableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'pair.readable', + 'ReadableStream', + readableStream); + } + if (!isWritableStream(writableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'pair.writable', + 'WritableStream', + writableStream); + } + + validateObject(options, 'options'); + const { + objectMode = false, + encoding, + decodeStrings = true, + highWaterMark, + signal, + } = options; + + validateBoolean(objectMode, 'options.objectMode'); + if (signal !== undefined) + validateAbortSignal(signal, 'options.signal'); + if (encoding !== undefined && !Buffer.isEncoding(encoding)) + throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding'); + + const writer = writableStream.getWriter(); + const reader = readableStream.getReader(); + let writableClosed = false; + let readableClosed = false; + + const duplex = new Duplex({ + allowHalfOpen: false, + highWaterMark: highWaterMark || writer.desiredSize, + objectMode, + encoding, + decodeStrings, + signal, + + writev(chunks, callback) { + if (writableClosed) { + callback(new ERR_INVALID_STATE('The WritableStream is closed')); + return; + } + + function done(error) { + try { + callback(error); + } catch (error) { + process.nextTick(() => duplex.destroy(error)); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + PromiseAll( + ArrayPrototypeMap( + chunks, + (chunk) => writer.write(chunk))), + done, + done); + }, + done); + }, + + write(chunk, encoding, callback) { + if (writableClosed) { + callback(new ERR_INVALID_STATE('The WritableStream is closed')); + return; + } + if (typeof chunk === 'string' && decodeStrings && !objectMode) + chunk = new Uint8Array(Buffer.from(chunk, encoding)); + + function done(error) { + try { + callback(error); + } catch (error) { + duplex.destroy(error); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + writer.write(chunk), + done, + done); + }, + done); + }, + + final(callback) { + function done(error) { + try { + callback(error); + } catch (error) { + process.nextTick(() => duplex.destroy(error)); + } + } + + if (!writableClosed) { + PromisePrototypeThen( + writer.close(), + done, + done); + } + }, + + read() { + PromisePrototypeThen( + reader.read(), + (chunk) => { + duplex.push(chunk.value); + if (chunk.done) + duplex.push(null); + }, + (error) => duplex.destroy(error)); + }, + + destroy(error, callback) { + function done() { + try { + callback(error); + } catch (error) { + process.nextTick(() => { throw error; }); + } + } + + async function closeWriter() { + if (!writableClosed) + await writer.abort(error); + } + + async function closeReader() { + if (!readableClosed) + await reader.cancel(error); + } + + if (!writableClosed || !readableClosed) { + PromisePrototypeThen( + PromiseAll([ + closeWriter(), + closeReader(), + ]), + done, + done); + return; + } + + done(); + }, + }); + + PromisePrototypeThen( + writer.closed, + () => { + writableClosed = true; + if (!duplex.writableEnded) { + duplex.destroy( + new ERR_INVALID_STATE('The WritableStream closed unexpectedly')); + } + }, + (error) => { + writableClosed = true; + readableClosed = true; + duplex.destroy(error); + }); + + PromisePrototypeThen( + reader.closed, + () => { + readableClosed = true; + if (!duplex.readableEnded) + duplex.push(null); + }, + (error) => { + writableClosed = true; + readableClosed = true; + duplex.destroy(error); + }); + + return duplex; +} + +function newWritableStreamFromStreamBase(streamBase, strategy) { + validateObject(streamBase, 'streamBase'); + + let current; + + function createWriteWrap(controller, promise) { + const req = new WriteWrap(); + req.handle = streamBase; + req.oncomplete = onWriteComplete; + req.async = false; + req.bytes = 0; + req.buffer = null; + req.controller = controller; + req.promise = promise; + return req; + } + + function onWriteComplete(status) { + if (status < 0) { + const error = errnoException(status, 'write', this.error); + this.reject(error); + this.controller.error(error); + return; + } + this.resolve(); + } + + function doWrite(chunk, controller) { + const promise = createDeferredPromise(); + let ret; + let req; + try { + req = createWriteWrap(controller, promise); + ret = streamBase.writeBuffer(req, chunk); + if (streamBaseState[kLastWriteWasAsync]) + req.buffer = chunk; + req.async = !!streamBaseState[kLastWriteWasAsync]; + } catch (error) { + promise.reject(error); + } + + if (ret !== 0) + promise.reject(errnoException(ret, 'write', req?.error)); + else if (!req.async) + promise.resolve(); + + return promise.promise; + } + + return new WritableStream({ + write(chunk, controller) { + current = current !== undefined ? + PromisePrototypeThen( + current, + () => doWrite(chunk, controller), + (error) => controller.error(error)) : + doWrite(chunk, controller); + return current; + }, + + close() { + const promise = createDeferredPromise(); + const req = new ShutdownWrap(); + req.oncomplete = () => promise.resolve(); + const err = streamBase.shutdown(req); + if (err === 1) + promise.resolve(); + return promise.promise; + }, + }, strategy); +} + +function newReadableStreamFromStreamBase(streamBase, strategy) { + validateObject(streamBase, 'streamBase'); + + if (typeof streamBase.onread === 'function') + throw new ERR_INVALID_STATE('StreamBase already has a consumer'); + + let controller; + + streamBase.onread = (arrayBuffer) => { + const nread = streamBaseState[kReadBytesOrError]; + + if (nread === 0) + return; + + try { + if (nread === UV_EOF) { + controller.close(); + return; + } + + controller.enqueue(arrayBuffer); + + if (controller.desiredSize <= 0) + streamBase.readStop(); + } catch (error) { + controller.error(error); + } + }; + + return new ReadableStream({ + start(c) { controller = c; }, + + pull() { + streamBase.readStart(); + }, + + cancel() { + const promise = createDeferredPromise(); + const req = new ShutdownWrap(); + req.oncomplete = () => promise.resolve(); + const err = streamBase.shutdown(req); + if (err === 1) + promise.resolve(); + return promise.promise; + }, + }, strategy); +} + +module.exports = { + newWritableStreamFromStreamWritable, + newReadableStreamFromStreamReadable, + newStreamWritableFromWritableStream, + newStreamReadableFromReadableStream, + newReadableWritablePairFromDuplex, + newDuplexFromReadableWritablePair, + newWritableStreamFromStreamBase, + newReadableStreamFromStreamBase, +}; diff --git a/lib/stream/web.js b/lib/stream/web.js index 929abd19044458..fb05a7a91cc79d 100644 --- a/lib/stream/web.js +++ b/lib/stream/web.js @@ -31,6 +31,15 @@ const { CountQueuingStrategy, } = require('internal/webstreams/queuingstrategies'); +const { + newWritableStreamFromStreamWritable, + newReadableStreamFromStreamReadable, + newStreamWritableFromWritableStream, + newStreamReadableFromReadableStream, + newReadableWritablePairFromDuplex, + newDuplexFromReadableWritablePair, +} = require('internal/webstreams/adapters'); + module.exports = { ReadableStream, ReadableStreamDefaultReader, @@ -45,4 +54,12 @@ module.exports = { WritableStreamDefaultController, ByteLengthQueuingStrategy, CountQueuingStrategy, + + // Non-standard Node.js specific Adapter APIs + newWritableStreamFromStreamWritable, + newReadableStreamFromStreamReadable, + newStreamWritableFromWritableStream, + newStreamReadableFromReadableStream, + newReadableWritablePairFromDuplex, + newDuplexFromReadableWritablePair, }; diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js b/test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js new file mode 100644 index 00000000000000..974d4f9aa32aae --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js @@ -0,0 +1,181 @@ +// Flags: --no-warnings --expose-internals +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + newReadableStreamFromStreamReadable, +} = require('stream/web'); + +const { + Readable, +} = require('stream'); + +const { + kState, +} = require('internal/webstreams/util'); + +{ + // Canceling the readableStream closes the readable. + const readable = new Readable({ + read() { + readable.push('hello'); + readable.push(null); + } + }); + + readable.on('close', common.mustCall()); + readable.on('end', common.mustNotCall()); + readable.on('error', common.mustNotCall()); + readable.on('pause', common.mustCall()); + readable.on('resume', common.mustNotCall()); + + const readableStream = newReadableStreamFromStreamReadable(readable); + + readableStream.cancel().then(common.mustCall()); +} + +{ + // Destroying the readable without an error just closes the + // readableStream without an error. + const readable = new Readable({ + read() { + readable.push('hello'); + readable.push(null); + } + }); + + const readableStream = newReadableStreamFromStreamReadable(readable); + + assert(!readableStream.locked); + + const reader = readableStream.getReader(); + + reader.closed.then(common.mustCall()); + + readable.on('end', common.mustNotCall()); + readable.on('error', common.mustNotCall()); + + readable.on('close', common.mustCall(() => { + assert.strictEqual(readableStream[kState].state, 'closed'); + })); + + readable.destroy(); +} + +{ + // Ending the readable without an error just closes the + // readableStream without an error. + const readable = new Readable({ + read() { + readable.push('hello'); + readable.push(null); + } + }); + + const readableStream = newReadableStreamFromStreamReadable(readable); + + assert(!readableStream.locked); + + const reader = readableStream.getReader(); + + reader.closed.then(common.mustCall()); + + readable.on('end', common.mustCall()); + readable.on('error', common.mustNotCall()); + + readable.on('close', common.mustCall(() => { + assert.strictEqual(readableStream[kState].state, 'closed'); + })); + + readable.push(null); +} + +{ + // Destroying the readable with an error should error the readableStream + const error = new Error('boom'); + const readable = new Readable({ + read() { + readable.push('hello'); + readable.push(null); + } + }); + + const readableStream = newReadableStreamFromStreamReadable(readable); + + assert(!readableStream.locked); + + const reader = readableStream.getReader(); + + assert.rejects(reader.closed, error); + + readable.on('end', common.mustNotCall()); + readable.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + readable.on('close', common.mustCall(() => { + assert.strictEqual(readableStream[kState].state, 'errored'); + })); + + readable.destroy(error); +} + +{ + const readable = new Readable({ + encoding: 'utf8', + read() { + readable.push('hello'); + readable.push(null); + } + }); + + const readableStream = newReadableStreamFromStreamReadable(readable); + const reader = readableStream.getReader(); + + readable.on('data', common.mustCall()); + readable.on('end', common.mustCall()); + readable.on('close', common.mustCall()); + + (async () => { + assert.deepStrictEqual( + await reader.read(), + { value: 'hello', done: false }); + assert.deepStrictEqual( + await reader.read(), + { value: undefined, done: true }); + + })().then(common.mustCall()); +} + +{ + const data = {}; + const readable = new Readable({ + objectMode: true, + read() { + readable.push(data); + readable.push(null); + } + }); + + assert(readable.readableObjectMode); + + const readableStream = newReadableStreamFromStreamReadable(readable); + const reader = readableStream.getReader(); + + readable.on('data', common.mustCall()); + readable.on('end', common.mustCall()); + readable.on('close', common.mustCall()); + + (async () => { + assert.deepStrictEqual( + await reader.read(), + { value: data, done: false }); + assert.deepStrictEqual( + await reader.read(), + { value: undefined, done: true }); + + })().then(common.mustCall()); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-readablewritablepair.js b/test/parallel/test-whatwg-webstreams-adapters-to-readablewritablepair.js new file mode 100644 index 00000000000000..4bdeaac9ca6d95 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-readablewritablepair.js @@ -0,0 +1,203 @@ +// Flags: --no-warnings +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + newReadableWritablePairFromDuplex, +} = require('stream/web'); + +const { + PassThrough, +} = require('stream'); + +{ + // Destroying the duplex without an error should close + // the readable and error the writable. + + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + const reader = readable.getReader(); + const writer = writable.getWriter(); + + reader.closed.then(common.mustCall()); + assert.rejects(writer.closed, { + code: 'ERR_INVALID_STATE', + }); + + duplex.destroy(); + + duplex.on('close', common.mustCall()); +} + +{ + // Destroying the duplex with an error should error + // both the readable and writable + + const error = new Error('boom'); + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + duplex.on('close', common.mustCall()); + duplex.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + const reader = readable.getReader(); + const writer = writable.getWriter(); + + assert.rejects(reader.closed, error); + assert.rejects(writer.closed, error); + + duplex.destroy(error); +} + +{ + const error = new Error('boom'); + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + duplex.on('close', common.mustCall()); + duplex.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + const reader = readable.getReader(); + const writer = writable.getWriter(); + + reader.closed.then(common.mustCall()); + assert.rejects(writer.closed, error); + + reader.cancel(error).then(common.mustCall()); +} + +{ + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + duplex.on('close', common.mustCall()); + duplex.on('error', common.mustNotCall()); + + const reader = readable.getReader(); + const writer = writable.getWriter(); + + reader.closed.then(common.mustCall()); + writer.closed.then(common.mustCall()); + + writer.close().then(common.mustCall()); +} + +{ + const error = new Error('boom'); + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + duplex.on('close', common.mustCall()); + duplex.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + const reader = readable.getReader(); + const writer = writable.getWriter(); + + assert.rejects(reader.closed, error); + assert.rejects(writer.closed, error); + + writer.abort(error).then(common.mustCall()); +} + +{ + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + duplex.on('close', common.mustCall()); + duplex.on('error', common.mustNotCall()); + + const reader = readable.getReader(); + const writer = writable.getWriter(); + + assert.rejects(writer.closed, { + code: 'ERR_INVALID_STATE', + }); + + reader.cancel(); +} + +{ + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + duplex.on('close', common.mustCall()); + duplex.on('error', common.mustNotCall()); + + const reader = readable.getReader(); + const writer = writable.getWriter(); + + reader.closed.then(common.mustCall()); + assert.rejects(writer.closed, { + code: 'ERR_INVALID_STATE', + }); + + duplex.end(); +} + +{ + const duplex = new PassThrough(); + const { + readable, + writable, + } = newReadableWritablePairFromDuplex(duplex); + + duplex.on('data', common.mustCall(2)); + duplex.on('close', common.mustCall()); + duplex.on('end', common.mustCall()); + duplex.on('finish', common.mustCall()); + + const writer = writable.getWriter(); + const reader = readable.getReader(); + + const ec = new TextEncoder(); + const dc = new TextDecoder(); + + Promise.all([ + writer.write(ec.encode('hello')), + reader.read().then(common.mustCall(({ done, value }) => { + assert(!done); + assert.strictEqual(dc.decode(value), 'hello'); + })), + reader.read().then(common.mustCall(({ done, value }) => { + assert(!done); + assert.strictEqual(dc.decode(value), 'there'); + })), + writer.write(ec.encode('there')), + writer.close(), + reader.read().then(common.mustCall(({ done, value }) => { + assert(done); + assert.strictEqual(value, undefined); + })), + ]).then(common.mustCall()); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js new file mode 100644 index 00000000000000..d1dd82024616e3 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js @@ -0,0 +1,88 @@ +// Flags: --no-warnings --expose-internals +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + TransformStream, + newDuplexFromReadableWritablePair, +} = require('stream/web'); + +const { + kState, +} = require('internal/webstreams/util'); + +{ + const transform = new TransformStream(); + const duplex = newDuplexFromReadableWritablePair(transform); + + assert(transform.readable.locked); + assert(transform.writable.locked); + + duplex.destroy(); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'errored'); + })); +} + +{ + const error = new Error('boom'); + const transform = new TransformStream(); + const duplex = newDuplexFromReadableWritablePair(transform); + + assert(transform.readable.locked); + assert(transform.writable.locked); + + duplex.destroy(error); + duplex.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'errored'); + assert.strictEqual(transform.writable[kState].storedError, error); + })); +} + +{ + const transform = new TransformStream(); + const duplex = new newDuplexFromReadableWritablePair(transform); + + duplex.end(); + duplex.resume(); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'closed'); + })); +} + +{ + const ec = new TextEncoder(); + const dc = new TextDecoder(); + const transform = new TransformStream({ + transform(chunk, controller) { + const text = dc.decode(chunk); + controller.enqueue(ec.encode(text.toUpperCase())); + } + }); + const duplex = new newDuplexFromReadableWritablePair(transform, { + encoding: 'utf8', + }); + + duplex.end('hello'); + duplex.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'HELLO'); + })); + duplex.on('end', common.mustCall()); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'closed'); + })); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamreadable.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamreadable.js new file mode 100644 index 00000000000000..fc63ae3e2bc44b --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamreadable.js @@ -0,0 +1,156 @@ +// Flags: --expose-internals --no-warnings +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + ReadableStream, + WritableStream, + newStreamReadableFromReadableStream, +} = require('stream/web'); + +const { + kState, +} = require('internal/webstreams/util'); + +class MySource { + constructor(value = new Uint8Array(10)) { + this.value = value; + } + + start(c) { + this.started = true; + this.controller = c; + } + + pull(controller) { + controller.enqueue(this.value); + controller.close(); + } + + cancel(reason) { + this.canceled = true; + this.cancelReason = reason; + } +} + +{ + // Destroying the readable without an error closes + // the readableStream. + + const readableStream = new ReadableStream(); + const readable = newStreamReadableFromReadableStream(readableStream); + + assert(readableStream.locked); + + assert.rejects(readableStream.cancel(), { + code: 'ERR_INVALID_STATE', + }); + assert.rejects(readableStream.pipeTo(new WritableStream()), { + code: 'ERR_INVALID_STATE', + }); + assert.throws(() => readableStream.tee(), { + code: 'ERR_INVALID_STATE', + }); + assert.throws(() => readableStream.getReader(), { + code: 'ERR_INVALID_STATE', + }); + assert.throws(() => { + readableStream.pipeThrough({ + readable: new ReadableStream(), + writable: new WritableStream(), + }); + }, { + code: 'ERR_INVALID_STATE', + }); + + readable.destroy(); + + readable.on('close', common.mustCall(() => { + assert.strictEqual(readableStream[kState].state, 'closed'); + })); +} + +{ + // Destroying the readable with an error closes the readableStream + // without error but recors the cancel reason in the source. + const error = new Error('boom'); + const source = new MySource(); + const readableStream = new ReadableStream(source); + const readable = newStreamReadableFromReadableStream(readableStream); + + assert(readableStream.locked); + + readable.destroy(error); + + readable.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + readable.on('close', common.mustCall(() => { + assert.strictEqual(readableStream[kState].state, 'closed'); + assert.strictEqual(source.cancelReason, error); + })); +} + +{ + // An error in the source causes the readable to error. + const error = new Error('boom'); + const source = new MySource(); + const readableStream = new ReadableStream(source); + const readable = newStreamReadableFromReadableStream(readableStream); + + assert(readableStream.locked); + + source.controller.error(error); + + readable.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + readable.on('close', common.mustCall(() => { + assert.strictEqual(readableStream[kState].state, 'errored'); + })); +} + +{ + const readableStream = new ReadableStream(new MySource()); + const readable = newStreamReadableFromReadableStream(readableStream); + + readable.on('data', common.mustCall((chunk) => { + assert.deepStrictEqual(chunk, Buffer.alloc(10)); + })); + readable.on('end', common.mustCall()); + readable.on('close', common.mustCall()); + readable.on('error', common.mustNotCall()); +} + +{ + const readableStream = new ReadableStream(new MySource('hello')); + const readable = newStreamReadableFromReadableStream(readableStream, { + encoding: 'utf8', + }); + + readable.on('data', common.mustCall((chunk) => { + assert.deepStrictEqual(chunk, 'hello'); + })); + readable.on('end', common.mustCall()); + readable.on('close', common.mustCall()); + readable.on('error', common.mustNotCall()); +} + +{ + const readableStream = new ReadableStream(new MySource()); + const readable = newStreamReadableFromReadableStream(readableStream, { + objectMode: true + }); + + readable.on('data', common.mustCall((chunk) => { + assert.deepStrictEqual(chunk, new Uint8Array(10)); + })); + readable.on('end', common.mustCall()); + readable.on('close', common.mustCall()); + readable.on('error', common.mustNotCall()); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js new file mode 100644 index 00000000000000..bd8b0fb0387e24 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js @@ -0,0 +1,190 @@ +// Flags: --no-warnings --expose-internals +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + WritableStream, + newStreamWritableFromWritableStream, +} = require('stream/web'); + +const { + kState, +} = require('internal/webstreams/util'); + +class TestSource { + constructor() { + this.chunks = []; + } + + start(c) { + this.controller = c; + this.started = true; + } + + write(chunk) { + this.chunks.push(chunk); + } + + close() { + this.closed = true; + } + + abort(reason) { + this.abortReason = reason; + } +} + +[1, {}, false, []].forEach((arg) => { + assert.throws(() => newStreamWritableFromWritableStream(arg), { + code: 'ERR_INVALID_ARG_TYPE', + }); +}); + +{ + // Ending the stream.Writable should close the writableStream + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + writable.end('chunk'); + + writable.on('close', common.mustCall(() => { + assert(writableStream.locked); + assert.strictEqual(writableStream[kState].state, 'closed'); + assert.strictEqual(source.chunks.length, 1); + assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk')); + })); +} + +{ + // Destroying the stream.Writable without an error should close + // the writableStream with no error. + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + writable.destroy(); + + writable.on('close', common.mustCall(() => { + assert(writableStream.locked); + assert.strictEqual(writableStream[kState].state, 'closed'); + assert.strictEqual(source.chunks.length, 0); + })); +} + +{ + // Destroying the stream.Writable with an error should error + // the writableStream + const error = new Error('boom'); + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + writable.destroy(error); + + writable.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + writable.on('close', common.mustCall(() => { + assert(writableStream.locked); + assert.strictEqual(writableStream[kState].state, 'errored'); + assert.strictEqual(writableStream[kState].storedError, error); + assert.strictEqual(source.chunks.length, 0); + })); +} + +{ + // Attempting to close, abort, or getWriter on writableStream + // should fail because it is locked. An internal error in + // writableStream should error the writable. + const error = new Error('boom'); + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + assert.rejects(writableStream.close(), { + code: 'ERR_INVALID_STATE', + }); + + assert.rejects(writableStream.abort(), { + code: 'ERR_INVALID_STATE', + }); + + assert.throws(() => writableStream.getWriter(), { + code: 'ERR_INVALID_STATE', + }); + + writable.on('error', common.mustCall((reason) => { + assert.strictEqual(error, reason); + })); + + source.controller.error(error); +} + +{ + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + writable.on('error', common.mustNotCall()); + writable.on('finish', common.mustCall()); + writable.on('close', common.mustCall(() => { + assert.strictEqual(source.chunks.length, 1); + assert.deepStrictEqual(source.chunks[0], Buffer.from('hello')); + })); + + writable.write('hello', common.mustCall()); + writable.end(); +} + +{ + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = + newStreamWritableFromWritableStream(writableStream, { + decodeStrings: false, + }); + + writable.on('error', common.mustNotCall()); + writable.on('finish', common.mustCall()); + writable.on('close', common.mustCall(() => { + assert.strictEqual(source.chunks.length, 1); + assert.deepStrictEqual(source.chunks[0], 'hello'); + })); + + writable.write('hello', common.mustCall()); + writable.end(); +} + +{ + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = + newStreamWritableFromWritableStream( + writableStream, { + objectMode: true + }); + assert(writable.writableObjectMode); + + writable.on('error', common.mustNotCall()); + writable.on('finish', common.mustCall()); + writable.on('close', common.mustCall(() => { + assert.strictEqual(source.chunks.length, 1); + assert.strictEqual(source.chunks[0], 'hello'); + })); + + writable.write('hello', common.mustCall()); + writable.end(); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-writablestream.js b/test/parallel/test-whatwg-webstreams-adapters-to-writablestream.js new file mode 100644 index 00000000000000..14f4fbde84c214 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-writablestream.js @@ -0,0 +1,151 @@ +// Flags: --no-warnings + +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + newWritableStreamFromStreamWritable, +} = require('stream/web'); + +const { + Writable, + PassThrough, +} = require('stream'); + +class TestWritable extends Writable { + constructor(asyncWrite = false) { + super(); + this.chunks = []; + this.asyncWrite = asyncWrite; + } + + _write(chunk, encoding, callback) { + this.chunks.push({ chunk, encoding }); + if (this.asyncWrite) { + setImmediate(() => callback()); + return; + } + callback(); + } +} + +[1, {}, false, []].forEach((arg) => { + assert.throws(() => newWritableStreamFromStreamWritable(arg), { + code: 'ERR_INVALID_ARG_TYPE', + }); +}); + +{ + // Closing the WritableStream normally closes the stream.Writable + // without errors. + + const writable = new TestWritable(); + writable.on('error', common.mustNotCall()); + writable.on('finish', common.mustCall()); + writable.on('close', common.mustCall()); + + const writableStream = newWritableStreamFromStreamWritable(writable); + + writableStream.close().then(common.mustCall(() => { + assert(writable.destroyed); + })); +} + +{ + // Aborting the WritableStream errors the stream.Writable + + const error = new Error('boom'); + const writable = new TestWritable(); + writable.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + writable.on('finish', common.mustNotCall()); + writable.on('close', common.mustCall()); + + const writableStream = newWritableStreamFromStreamWritable(writable); + + writableStream.abort(error).then(common.mustCall(() => { + assert(writable.destroyed); + })); +} + +{ + // Destroying the stream.Writable prematurely errors the + // WritableStream + + const error = new Error('boom'); + const writable = new TestWritable(); + + const writableStream = newWritableStreamFromStreamWritable(writable); + assert.rejects(writableStream.close(), error); + writable.destroy(error); +} + +{ + // Ending the stream.Writable directly errors the WritableStream + const writable = new TestWritable(); + + const writableStream = newWritableStreamFromStreamWritable(writable); + + assert.rejects(writableStream.close(), { + code: 'ERR_INVALID_STATE' + }); + + writable.end(); +} + +{ + const writable = new TestWritable(); + const writableStream = newWritableStreamFromStreamWritable(writable); + const writer = writableStream.getWriter(); + const ec = new TextEncoder(); + writer.write(ec.encode('hello')).then(common.mustCall(() => { + assert.strictEqual(writable.chunks.length, 1); + assert.deepStrictEqual( + writable.chunks[0], + { + chunk: Buffer.from('hello'), + encoding: 'buffer' + }); + })); +} + +{ + const writable = new TestWritable(true); + + writable.on('error', common.mustNotCall()); + writable.on('close', common.mustCall()); + writable.on('finish', common.mustCall()); + + const writableStream = newWritableStreamFromStreamWritable(writable); + const writer = writableStream.getWriter(); + const ec = new TextEncoder(); + writer.write(ec.encode('hello')).then(common.mustCall(() => { + assert.strictEqual(writable.chunks.length, 1); + assert.deepStrictEqual( + writable.chunks[0], + { + chunk: Buffer.from('hello'), + encoding: 'buffer' + }); + writer.close().then(common.mustCall()); + })); +} + +{ + const duplex = new PassThrough(); + duplex.setEncoding('utf8'); + const writableStream = newWritableStreamFromStreamWritable(duplex); + const ec = new TextEncoder(); + writableStream + .getWriter() + .write(ec.encode('hello')) + .then(common.mustCall()); + + duplex.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'hello'); + })); +}