Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix multiple destroy() calls #29197

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ This is a destructive and immediate way to destroy a stream. Previous calls to
`write()` may not have drained, and may trigger an `ERR_STREAM_DESTROYED` error.
Use `end()` instead of destroy if data should flush before close, or wait for
the `'drain'` event before destroying the stream.

Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

Implementors should not override this method,
but instead implement [`writable._destroy()`][writable-_destroy].

Expand Down Expand Up @@ -953,6 +957,10 @@ Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`
event (unless `emitClose` is set to `false`). After this call, the readable
stream will release any internal resources and subsequent calls to `push()`
will be ignored.

Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].

Expand Down Expand Up @@ -1484,6 +1492,9 @@ Implementors should not override this method, but instead implement
The default implementation of `_destroy()` for `Transform` also emit `'close'`
unless `emitClose` is set in false.

Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

### `stream.finished(stream[, options], callback)`
<!-- YAML
added: v10.0.0
Expand Down
2 changes: 1 addition & 1 deletion lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -1611,7 +1611,7 @@ exports.connect = function connect(...args) {
tlssock._start();

tlssock.on('secure', onConnectSecure);
tlssock.once('end', onConnectEnd);
tlssock.prependListener('end', onConnectEnd);
ronag marked this conversation as resolved.
Show resolved Hide resolved

return tlssock;
};
2 changes: 1 addition & 1 deletion lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ WriteStream.prototype._writev = function(data, cb) {

if (er) {
if (this.autoClose) {
this.destroy();
this.destroy(er);
}
return cb(er);
}
Expand Down
33 changes: 17 additions & 16 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;

if ((w && w.destroyed) || (r && r.destroyed)) {
if (typeof cb === 'function') {
// TODO(ronag): Invoke with `'close'`/`'error'`.
ronag marked this conversation as resolved.
Show resolved Hide resolved
cb();
}

return this;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section was modified and moved above errored = true

if (err) {
if (w) {
w.errored = true;
Expand All @@ -16,16 +25,6 @@ function destroy(err, cb) {
}
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
} else if (err) {
process.nextTick(emitErrorNT, this, err);
}

return this;
}

// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks

Expand Down Expand Up @@ -53,13 +52,11 @@ function destroy(err, cb) {
r.closed = true;
}

if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
if (typeof cb === 'function') {
cb(err);
// Don't emit 'error' if passed a callback.
process.nextTick(emitCloseNT, this);
Copy link
Member Author

@ronag ronag Dec 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always need to emit 'error'. Before the callback would call destroy(er) in a roundabout way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cb route should be a purely internal route through an internal undocumented api

} else if (err) {
}

if (err) {
process.nextTick(emitErrorCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
Expand Down Expand Up @@ -138,6 +135,10 @@ function errorOrDestroy(stream, err, sync) {
const r = stream._readableState;
const w = stream._writableState;

if ((w && w.destroyed) || (r && r.destroyed)) {
return this;
}

if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (err) {
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-file-write-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ file

callbacks.close++;
console.error('write after end should not be allowed');
file.write('should not work anymore');
file.on('error', common.expectsError({
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is after 'close', thus no 'error'

file.write('should not work anymore', common.expectsError({
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
Expand Down
13 changes: 4 additions & 9 deletions test/parallel/test-file-write-stream2.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const filepath = path.join(tmpdir.path, 'write.txt');

const EXPECTED = '012345678910';

const cb_expected = 'write open drain write drain close error ';
const cb_expected = 'write open drain write drain close ';
let cb_occurred = '';

let countDrains = 0;
Expand Down Expand Up @@ -92,16 +92,11 @@ file.on('drain', function() {
file.on('close', function() {
cb_occurred += 'close ';
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);
file.write('should not work anymore');
file.write('should not work anymore', (err) => {
assert.ok(err.message.includes('write after end'));
});
});


file.on('error', function(err) {
cb_occurred += 'error ';
assert.ok(err.message.includes('write after end'));
});


for (let i = 0; i < 11; i++) {
const ret = file.write(String(i));
console.error(`${i} ${ret}`);
Expand Down
8 changes: 3 additions & 5 deletions test/parallel/test-http2-server-stream-session-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ server.on('stream', common.mustCall((stream) => {
name: 'Error'
}
);
stream.on('error', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_WRITE_AFTER_END',
message: 'write after end'
}));
// When session is detroyed all streams are destroyed and no further
// error should be emitted.
stream.on('error', common.mustNotCall());
assert.strictEqual(stream.write('data', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_WRITE_AFTER_END',
Expand Down
6 changes: 1 addition & 5 deletions test/parallel/test-net-socket-destroy-send.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ server.listen(0, common.mustCall(function() {
conn.on('connect', common.mustCall(function() {
// Test destroy returns this, even on multiple calls when it short-circuits.
assert.strictEqual(conn, conn.destroy().destroy());
conn.on('error', common.expectsError({
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed',
name: 'Error'
}));
conn.on('error', common.mustNotCall());

conn.write(Buffer.from('kaboom'), common.expectsError({
code: 'ERR_STREAM_DESTROYED',
Expand Down
5 changes: 2 additions & 3 deletions test/parallel/test-stream-catch-rejections.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ const assert = require('assert');
const r = new stream.Readable({
captureRejections: true,
read() {
this.push('hello');
this.push('world');
this.push(null);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

push(null) will cause destroy() to be called.

}
});
r.push('hello');
r.push('world');

const err = new Error('kaboom');

Expand Down
4 changes: 1 addition & 3 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,7 @@ const { promisify } = require('util');
res,
stream,
common.mustCall((err) => {
assert.ok(err);
// TODO(ronag):
// assert.strictEqual(err.message, 'oh no');
assert.strictEqual(err.message, 'oh no');
ronag marked this conversation as resolved.
Show resolved Hide resolved
server.close();
})
);
Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ const assert = require('assert');

let ticked = false;
read.on('close', common.mustCall(() => {
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(ticked, true);
}));
// 'error' should not be emitted since a callback is passed to
// destroy(err, callback);
read.on('error', common.mustNotCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));

assert.strictEqual(read._readableState.errored, false);
assert.strictEqual(read._readableState.errorEmitted, false);
Expand Down Expand Up @@ -217,7 +217,7 @@ const assert = require('assert');
}));
readable.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.message, 'kaboom 2');
assert.strictEqual(err.message, 'kaboom 1');
assert.strictEqual(readable._readableState.errorEmitted, true);
}));

Expand All @@ -230,7 +230,7 @@ const assert = require('assert');
// the `_destroy()` callback is called.
readable.destroy(new Error('kaboom 2'));
assert.strictEqual(readable._readableState.errorEmitted, false);
assert.strictEqual(readable._readableState.errored, true);
assert.strictEqual(readable._readableState.errored, false);

ticked = true;
}
Expand Down
22 changes: 8 additions & 14 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ const assert = require('assert');

let ticked = false;
writable.on('close', common.mustCall(() => {
writable.on('error', common.mustNotCall());
writable.destroy(new Error('hello'));
assert.strictEqual(ticked, true);
assert.strictEqual(writable._writableState.errorEmitted, true);
}));
writable.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.message, 'kaboom 2');
assert.strictEqual(err.message, 'kaboom 1');
assert.strictEqual(writable._writableState.errorEmitted, true);
}));

Expand All @@ -205,7 +207,7 @@ const assert = require('assert');
// the `_destroy()` callback is called.
writable.destroy(new Error('kaboom 2'));
assert.strictEqual(writable._writableState.errorEmitted, false);
assert.strictEqual(writable._writableState.errored, true);
assert.strictEqual(writable._writableState.errored, false);

ticked = true;
}
Expand Down Expand Up @@ -246,8 +248,8 @@ const assert = require('assert');

const expected = new Error('kaboom');

write.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(err, expected);
write.destroy(expected, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
}

Expand All @@ -271,11 +273,7 @@ const assert = require('assert');
const write = new Writable();

write.destroy();
write.on('error', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed'
}));
write.on('error', common.mustNotCall());
write.write('asd', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_DESTROYED',
Expand All @@ -288,11 +286,7 @@ const assert = require('assert');
write(chunk, enc, cb) { cb(); }
});

write.on('error', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed'
}));
write.on('error', common.mustNotCall());

write.cork();
write.write('asd', common.mustCall());
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-stream-writable-writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const { Writable } = require('stream');
w.write('asd');
assert.strictEqual(w.writable, false);
w.on('error', common.mustCall());
w.destroy();
}

{
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-stream-writable-write-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ function test(autoDestroy) {
_write() {}
});
w.destroy();
expectError(w, 'asd', 'ERR_STREAM_DESTROYED');
}

{
Expand Down
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset-localaddress.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const server = net.createServer((c) => {
}).listen(common.mustCall(() => {
const port = server.address().port;

let errored = false;
tls.connect({
port: port,
localAddress: common.localhostIPv4
Expand All @@ -24,5 +25,9 @@ const server = net.createServer((c) => {
assert.strictEqual(e.port, port);
assert.strictEqual(e.localAddress, common.localhostIPv4);
server.close();
errored = true;
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));
}));
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ if (process.argv[2] !== 'child') {
const server = net.createServer((c) => {
c.end();
}).listen(common.PIPE, common.mustCall(() => {
let errored = false;
tls.connect({ path: common.PIPE })
.once('error', common.mustCall((e) => {
assert.strictEqual(e.code, 'ECONNRESET');
Expand All @@ -39,5 +40,9 @@ const server = net.createServer((c) => {
assert.strictEqual(e.host, undefined);
assert.strictEqual(e.localAddress, undefined);
server.close();
errored = true;
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));
}));
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset-socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ const server = net.createServer((c) => {

const socket = new net.Socket();

let errored = false;
tls.connect({ socket })
.once('error', common.mustCall((e) => {
assert.strictEqual(e.code, 'ECONNRESET');
assert.strictEqual(e.path, undefined);
assert.strictEqual(e.host, undefined);
assert.strictEqual(e.port, undefined);
assert.strictEqual(e.localAddress, undefined);
errored = true;
server.close();
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));

socket.connect(port);
Expand Down
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const server = net.createServer((c) => {
}).listen(common.mustCall(() => {
const port = server.address().port;

let errored = false;
tls.connect(port, common.localhostIPv4)
.once('error', common.mustCall((e) => {
assert.strictEqual(e.code, 'ECONNRESET');
Expand All @@ -21,5 +22,9 @@ const server = net.createServer((c) => {
assert.strictEqual(e.port, port);
assert.strictEqual(e.localAddress, undefined);
server.close();
errored = true;
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));
}));
Loading