Skip to content

Commit

Permalink
stream: pipeline should only destroy un-finished streams
Browse files Browse the repository at this point in the history
This PR logically reverts nodejs#31940 which
has caused lots of unnecessary breakage in the ecosystem.

This PR also aligns better with the actual documented behavior:

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
  * `Readable` streams which have emitted `'end'` or `'close'`.
  * `Writable` streams which have emitted `'finish'` or `'close'`.

The behavior introduced in nodejs#31940
was much more aggressive in terms of destroying streams. This was
good for avoiding potential resources leaks however breaks some
common assumputions in legacy streams.

Furthermore, it makes the code simpler and removes some hacks.

Fixes: nodejs#32954
Fixes: nodejs#32955

PR-URL: nodejs#32968
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Mathias Buus <mathiasbuus@gmail.com>
Backport-PR-URL: nodejs#32980
  • Loading branch information
ronag committed Apr 23, 2020
1 parent 947ddec commit 9e7f255
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 44 deletions.
51 changes: 11 additions & 40 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,52 +25,23 @@ let EE;
let PassThrough;
let createReadableStreamAsyncIterator;

function isIncoming(stream) {
return (
stream.socket &&
typeof stream.complete === 'boolean' &&
ArrayIsArray(stream.rawTrailers) &&
ArrayIsArray(stream.rawHeaders)
);
}

function isOutgoing(stream) {
return (
stream.socket &&
typeof stream.setHeader === 'function'
);
}

function destroyer(stream, reading, writing, final, callback) {
function destroyer(stream, reading, writing, callback) {
callback = once(callback);
let destroyed = false;

let finished = false;
stream.on('close', () => {
finished = true;
});

if (eos === undefined) eos = require('internal/streams/end-of-stream');
eos(stream, { readable: reading, writable: writing }, (err) => {
if (destroyed) return;
destroyed = true;

if (!err && (isIncoming(stream) || isOutgoing(stream))) {
// http/1 request objects have a coupling to their response and should
// not be prematurely destroyed. Assume they will handle their own
// lifecycle.
return callback();
}

if (!err && reading && !writing && stream.writable) {
return callback();
}

if (err || !final || !stream.readable) {
destroyImpl.destroyer(stream, err);
}

finished = !err;
callback(err);
});

return (err) => {
if (destroyed) return;
destroyed = true;
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err);
callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
Expand Down Expand Up @@ -192,7 +163,7 @@ function pipeline(...streams) {

if (isStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, !reading, finish));
destroys.push(destroyer(stream, reading, writing, finish));
}

if (i === 0) {
Expand Down Expand Up @@ -250,7 +221,7 @@ function pipeline(...streams) {
ret = pt;

finishCount++;
destroys.push(destroyer(ret, false, true, true, finish));
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
Expand Down
142 changes: 138 additions & 4 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ const {
Readable,
Transform,
pipeline,
PassThrough
PassThrough,
Duplex
} = require('stream');
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const net = require('net');

{
let finished = false;
Expand Down Expand Up @@ -917,7 +919,7 @@ const { promisify } = require('util');
const src = new PassThrough({ autoDestroy: false });
const dst = new PassThrough({ autoDestroy: false });
pipeline(src, dst, common.mustCall(() => {
assert.strictEqual(src.destroyed, true);
assert.strictEqual(src.destroyed, false);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
Expand Down Expand Up @@ -959,8 +961,8 @@ const { promisify } = require('util');
}

{
const src = new PassThrough();
const dst = new PassThrough();
const src = new PassThrough({ autoDestroy: true });
const dst = new PassThrough({ autoDestroy: true });
dst.readable = false;
pipeline(src, dst, common.mustCall((err) => {
assert(!err);
Expand Down Expand Up @@ -1061,3 +1063,135 @@ const { promisify } = require('util');
assert.ifError(err);
}));
}

{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
src.on('close', () => {
closed = true;
});
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}

{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Duplex({});
src.on('close', common.mustCall(() => {
closed = true;
}));
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}

{
const server = net.createServer(common.mustCall((socket) => {
// echo server
pipeline(socket, socket, common.mustCall());
// 13 force destroys the socket before it has a chance to emit finish
socket.on('finish', common.mustCall(() => {
server.close();
}));
})).listen(0, common.mustCall(() => {
const socket = net.connect(server.address().port);
socket.end();
}));
}

{
const d = new Duplex({
autoDestroy: false,
write: common.mustCall((data, enc, cb) => {
d.push(data);
cb();
}),
read: common.mustCall(() => {
d.push(null);
}),
final: common.mustCall((cb) => {
setTimeout(() => {
assert.strictEqual(d.destroyed, false);
cb();
}, 1000);
}),
destroy: common.mustNotCall()
});

const sink = new Writable({
write: common.mustCall((data, enc, cb) => {
cb();
})
});

pipeline(d, sink, common.mustCall());

d.write('test');
d.end();
}

{
const server = net.createServer(common.mustCall((socket) => {
// echo server
pipeline(socket, socket, common.mustCall());
socket.on('finish', common.mustCall(() => {
server.close();
}));
})).listen(0, common.mustCall(() => {
const socket = net.connect(server.address().port);
socket.end();
}));
}

{
const d = new Duplex({
autoDestroy: false,
write: common.mustCall((data, enc, cb) => {
d.push(data);
cb();
}),
read: common.mustCall(() => {
d.push(null);
}),
final: common.mustCall((cb) => {
setTimeout(() => {
assert.strictEqual(d.destroyed, false);
cb();
}, 1000);
}),
// `destroy()` won't be invoked by pipeline since
// the writable side has not completed when
// the pipeline has completed.
destroy: common.mustNotCall()
});

const sink = new Writable({
write: common.mustCall((data, enc, cb) => {
cb();
})
});

pipeline(d, sink, common.mustCall());

d.write('test');
d.end();
}

0 comments on commit 9e7f255

Please sign in to comment.