diff --git a/build/replacements.mjs b/build/replacements.mjs index 8f32ff8ed..41d50de37 100644 --- a/build/replacements.mjs +++ b/build/replacements.mjs @@ -275,11 +275,21 @@ const duplexFromTestWebStreamNeutralizeWritable = [ 'makeATestWritableStreamOff(writeFunc) {' ] -const polyfillAddAbortSignal = [ +const polyfillAddAbortListener = [ 'require\\(\'events\'\\).addAbortListener', 'require(\'../../ours/util\').addAbortListener' ] +const abortSignalAny = [ + 'AbortSignal.any', + 'require(\'../../ours/util\').AbortSignalAny' +] + +const asyncDisposeTest = [ + 'Symbol.asyncDispose', + 'require(\'../../lib/ours/primordials\').SymbolAsyncDispose' +] + export const replacements = { 'lib/_stream.+': [legacyStreamsRequireStream], 'lib/internal/streams/duplexify.+': [ @@ -292,10 +302,11 @@ export const replacements = { internalStreamsAbortControllerPolyfill, internalStreamsNoRequireAbortController, internalStreamsNoRequireAbortController2, - internalStreamsWeakHandler2 + internalStreamsWeakHandler2, + abortSignalAny ], 'lib/internal/streams/add-abort-signal.js': [ - polyfillAddAbortSignal + polyfillAddAbortListener ], 'lib/internal/streams/readable.js': [ removefromWebReadableMethod, @@ -314,7 +325,8 @@ export const replacements = { internalStreamsRequireWebStream, internalStreamsRequireInternal, internalStreamsWeakHandler, - internalStreamsInspectCustom + internalStreamsInspectCustom, + polyfillAddAbortListener ], 'lib/internal/validators.js': [ internalValidatorsRequireAssert, @@ -362,6 +374,7 @@ export const replacements = { removeWebStreamsFromDuplexFromTest ], 'test/parallel/test-stream-finished.js': [testParallelFinishedEvent], + 'test/parallel/test-stream-readable-dispose.js': [asyncDisposeTest], 'test/parallel/test-stream-flatMap.js': [testParallelFlatMapWinLineSeparator], 'test/parallel/test-stream-preprocess.js': [testParallelPreprocessWinLineSeparator], 'test/parallel/test-stream-writable-samecb-singletick.js': [ diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 77c31616b..aa7fff34e 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -213,7 +213,7 @@ function eos(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort) } else { - addAbortListener ??= require('events').addAbortListener + addAbortListener ??= require('../../ours/util').addAbortListener const disposable = addAbortListener(options.signal, abort) const originalCallback = callback callback = once((...args) => { @@ -240,7 +240,7 @@ function eosWeb(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort) } else { - addAbortListener ??= require('events').addAbortListener + addAbortListener ??= require('../../ours/util').addAbortListener const disposable = addAbortListener(options.signal, abort) const originalCallback = callback callback = once((...args) => { diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index beaa64871..7eff11047 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -66,7 +66,7 @@ function map(fn, options) { validateInteger(highWaterMark, 'options.highWaterMark', 0) highWaterMark += concurrency return async function* map() { - const signal = AbortSignal.any( + const signal = require('../../ours/util').AbortSignalAny( [options === null || options === undefined ? undefined : options.signal].filter(Boolean) ) const stream = this diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index cb8d0bada..138d3686f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -186,7 +186,7 @@ function pipelineImpl(streams, callback, opts) { function abort() { finishImpl(new AbortError()) } - addAbortListener ??= require('events').addAbortListener + addAbortListener ??= require('../../ours/util').addAbortListener let disposable if (outerSignal) { disposable = addAbortListener(outerSignal, abort) diff --git a/lib/ours/util.js b/lib/ours/util.js index 5859d6128..62bdc17ab 100644 --- a/lib/ours/util.js +++ b/lib/ours/util.js @@ -2,6 +2,7 @@ const bufferModule = require('buffer') const { kResistStopPropagation, SymbolDispose } = require('./primordials') +const AbortSignal = global.AbortSignal || require('abort-controller').AbortSignal const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor const Blob = globalThis.Blob || bufferModule.Blob /* eslint-disable indent */ @@ -159,6 +160,15 @@ module.exports = { : _removeEventListener() } } + }, + AbortSignalAny: + AbortSignal.any || + function AbortSignalAny(signals) { + if (signals[0]) { + return signals[0] + } + const ac = new AbortController() + return ac.signal } } module.exports.promisify.custom = Symbol.for('nodejs.util.promisify.custom') diff --git a/src/util.js b/src/util.js index 3f697ce81..ad5c3f4e0 100644 --- a/src/util.js +++ b/src/util.js @@ -2,6 +2,7 @@ const bufferModule = require('buffer') const { kResistStopPropagation, SymbolDispose } = require('./primordials') +const AbortSignal = global.AbortSignal || require('abort-controller').AbortSignal const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor const Blob = globalThis.Blob || bufferModule.Blob @@ -160,6 +161,13 @@ module.exports = { removeEventListener?.(); }, }; + }, + AbortSignalAny: AbortSignal.any || function AbortSignalAny(signals) { + if (signals[0]) { + return signals[0] + } + const ac = new AbortController() + return ac.signal } } diff --git a/test/parallel/test-stream-pipe-deadlock.js b/test/parallel/test-stream-pipe-deadlock.js new file mode 100644 index 000000000..e5414f9f0 --- /dev/null +++ b/test/parallel/test-stream-pipe-deadlock.js @@ -0,0 +1,42 @@ +'use strict' + +const tap = require('tap') +const silentConsole = { + log() {}, + error() {} +} +const common = require('../common') +const { Readable, Writable } = require('../../lib/ours/index') + +// https://github.com/nodejs/node/issues/48666 +;(async () => { + // Prepare src that is internally ended, with buffered data pending + const src = new Readable({ + read() {} + }) + src.push(Buffer.alloc(100)) + src.push(null) + src.pause() + + // Give it time to settle + await new Promise((resolve) => setImmediate(resolve)) + const dst = new Writable({ + highWaterMark: 1000, + write(buf, enc, cb) { + process.nextTick(cb) + } + }) + dst.write(Buffer.alloc(1000)) // Fill write buffer + dst.on('finish', common.mustCall()) + src.pipe(dst) +})().then(common.mustCall()) + +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */ diff --git a/test/parallel/test-stream-readable-dispose.js b/test/parallel/test-stream-readable-dispose.js new file mode 100644 index 000000000..3ba69074b --- /dev/null +++ b/test/parallel/test-stream-readable-dispose.js @@ -0,0 +1,40 @@ +'use strict' + +const tap = require('tap') +const silentConsole = { + log() {}, + error() {} +} +const common = require('../common') +const { Readable } = require('../../lib/ours/index') +const assert = require('assert') +{ + const read = new Readable({ + read() {} + }) + read.resume() + read.on('end', common.mustNotCall('no end event')) + read.on('close', common.mustCall()) + read.on( + 'error', + common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError') + }) + ) + read[require('../../lib/ours/primordials').SymbolAsyncDispose]().then( + common.mustCall(() => { + assert.strictEqual(read.errored.name, 'AbortError') + assert.strictEqual(read.destroyed, true) + }) + ) +} + +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */ diff --git a/test/parallel/test-stream-set-default-hwm.js b/test/parallel/test-stream-set-default-hwm.js new file mode 100644 index 000000000..de954cbd9 --- /dev/null +++ b/test/parallel/test-stream-set-default-hwm.js @@ -0,0 +1,45 @@ +'use strict' + +const tap = require('tap') +const silentConsole = { + log() {}, + error() {} +} +require('../common') +const assert = require('node:assert') +const { + setDefaultHighWaterMark, + getDefaultHighWaterMark, + Writable, + Readable, + Transform +} = require('../../lib/ours/index') +assert.notStrictEqual(getDefaultHighWaterMark(false), 32 * 1000) +setDefaultHighWaterMark(false, 32 * 1000) +assert.strictEqual(getDefaultHighWaterMark(false), 32 * 1000) +assert.notStrictEqual(getDefaultHighWaterMark(true), 32) +setDefaultHighWaterMark(true, 32) +assert.strictEqual(getDefaultHighWaterMark(true), 32) +const w = new Writable({ + write() {} +}) +assert.strictEqual(w.writableHighWaterMark, 32 * 1000) +const r = new Readable({ + read() {} +}) +assert.strictEqual(r.readableHighWaterMark, 32 * 1000) +const t = new Transform({ + transform() {} +}) +assert.strictEqual(t.writableHighWaterMark, 32 * 1000) +assert.strictEqual(t.readableHighWaterMark, 32 * 1000) + +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */