diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index cdd5bcb791f451..4b4a7d95338652 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -50,13 +50,30 @@ function destroyer(stream, reading, writing, final, callback) { return callback(); } - if (!err && reading && !writing && stream.writable) { - return callback(); - } + if (!err) { + const wState = stream._writableState; + + const writableEnded = stream.writableEnded || + (wState && wState.ended); + const writableFinished = stream.writableFinished || + (wState && wState.finished); + + const willFinish = stream.writable || + (writableEnded && !writableFinished); + const willEnd = stream.readable; - if (err || !final || !stream.readable) { - destroyImpl.destroyer(stream, err); + // First + if (reading && !writing && willFinish) { + return callback(); + } + + // Last + if (!reading && writing && willEnd) { + return callback(); + } } + + destroyImpl.destroyer(stream, err); callback(err); }); @@ -81,7 +98,9 @@ function destroyer(stream, reading, writing, final, callback) { .once('end', _destroy) .once('error', _destroy); } else { - _destroy(err); + // Do an extra tick so that 'finish' has a chance to be emitted if + // first stream is Duplex. + process.nextTick(_destroy, err); } }); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index b273fddfa3b613..6d9e2be5299422 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -13,6 +13,7 @@ const { const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); +const net = require('net'); { let finished = false; @@ -1118,3 +1119,51 @@ const { promisify } = require('util'); assert.strictEqual(closed, true); })); } + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + // 13 force destroys the socket before it has a chance to emit finish + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + // `destroy()` won't be invoked by pipeline since + // the writable side has not completed when + // the pipeline has completed. + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +}