From 44246251ab003b1b3d15ce0b3e637893b7a62721 Mon Sep 17 00:00:00 2001 From: Richard Lau Date: Fri, 27 Sep 2019 20:32:40 -0400 Subject: [PATCH] Revert "stream: invoke callback before emitting error always" This reverts commit 3de5eae6dbe503485b95bdeb8bddbd67e4613d59. --- doc/api/stream.md | 3 +- lib/_stream_writable.js | 37 +++++------- lib/internal/streams/destroy.js | 13 +---- test/parallel/test-http2-reset-flood.js | 5 +- test/parallel/test-stream-writable-destroy.js | 14 ----- .../test-stream-writable-write-cb-error.js | 58 ------------------- .../test-wrap-js-stream-exceptions.js | 6 +- test/parallel/test-zlib-write-after-close.js | 14 +++-- 8 files changed, 28 insertions(+), 122 deletions(-) delete mode 100644 test/parallel/test-stream-writable-write-cb-error.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 48b478a58eaa58..d4dbe54dbc2b52 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -571,8 +571,7 @@ The `writable.write()` method writes some data to the stream, and calls the supplied `callback` once the data has been fully handled. If an error occurs, the `callback` *may or may not* be called with the error as its first argument. To reliably detect write errors, add a listener for the -`'error'` event. If `callback` is called with an error, it will be called -before the `'error'` event is emitted. +`'error'` event. The return value is `true` if the internal buffer is less than the `highWaterMark` configured when the stream was created after admitting `chunk`. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9b75b672cbd843..c6d895ff5df10b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -158,11 +158,6 @@ function WritableState(options, stream, isDuplex) { // Should .destroy() be called after 'finish' (and potentially 'end') this.autoDestroy = !!(options && options.autoDestroy); - // Indicates whether the stream has errored. When true all write() calls - // should return false. This is needed since when autoDestroy - // is disabled we need a way to tell whether the stream has failed. - this.errored = false; - // Count buffered requests this.bufferedRequestCount = 0; @@ -406,7 +401,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { if (!ret) state.needDrain = true; - if (state.writing || state.corked || state.errored) { + if (state.writing || state.corked) { var last = state.lastBufferedRequest; state.lastBufferedRequest = { chunk, @@ -425,9 +420,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { doWrite(stream, state, false, len, chunk, encoding, cb); } - // Return false if errored or destroyed in order to break - // any synchronous while(stream.write(data)) loops. - return ret && !state.errored && !state.destroyed; + return ret; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { @@ -444,11 +437,18 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.sync = false; } -function onwriteError(stream, state, er, cb) { +function onwriteError(stream, state, sync, er, cb) { --state.pendingcb; - cb(er); - // This can emit error, but error must always follow cb. + if (sync) { + // Defer the callback if we are being called synchronously + // to avoid piling up things on the stack + process.nextTick(cb, er); + } else { + // The caller expect this to happen before if + // it is async + cb(er); + } errorOrDestroy(stream, er); } @@ -465,14 +465,9 @@ function onwrite(stream, er) { state.length -= state.writelen; state.writelen = 0; - if (er) { - state.errored = true; - if (sync) { - process.nextTick(onwriteError, stream, state, er, cb); - } else { - onwriteError(stream, state, er, cb); - } - } else { + if (er) + onwriteError(stream, state, sync, er, cb); + else { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(state) || stream.destroyed; @@ -627,7 +622,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', { function needFinish(state) { return (state.ending && state.length === 0 && - !state.errored && + !state.errorEmitted && state.bufferedRequest === null && !state.finished && !state.writing); diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 8708ca022c7f8d..27985482cee70b 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -27,10 +27,6 @@ function destroy(err, cb) { const r = this._readableState; const w = this._writableState; - if (w && err) { - w.errored = true; - } - if ((w && w.destroyed) || (r && r.destroyed)) { if (cb) { cb(err); @@ -54,12 +50,10 @@ function destroy(err, cb) { this._destroy(err || null, (err) => { const emitClose = (w && w.emitClose) || (r && r.emitClose); if (cb) { - // Invoke callback before scheduling emitClose so that callback - // can schedule before. - cb(err); if (emitClose) { process.nextTick(emitCloseNT, this); } + cb(err); } else if (needError(this, err)) { process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err); } else if (emitClose) { @@ -97,7 +91,6 @@ function undestroy() { if (w) { w.destroyed = false; - w.errored = false; w.ended = false; w.ending = false; w.finalCalled = false; @@ -117,10 +110,6 @@ function errorOrDestroy(stream, err) { const r = stream._readableState; const w = stream._writableState; - if (w & err) { - w.errored = true; - } - if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); else if (needError(stream, err)) diff --git a/test/parallel/test-http2-reset-flood.js b/test/parallel/test-http2-reset-flood.js index 9977bfd1a3e669..a6553401fbb6e7 100644 --- a/test/parallel/test-http2-reset-flood.js +++ b/test/parallel/test-http2-reset-flood.js @@ -67,10 +67,7 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => { h2header.writeIntBE(1, 0, 3); // Length: 1 h2header.writeIntBE(i, 5, 4); // Stream ID // 0x88 = :status: 200 - if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) { - process.nextTick(writeRequests); - break; - } + conn.write(Buffer.concat([h2header, Buffer.from([0x88])])); } } diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index c4a96788ab24dd..ac107ecbb7034b 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -16,20 +16,6 @@ const assert = require('assert'); assert.strictEqual(write.destroyed, true); } -{ - const write = new Writable({ - write(chunk, enc, cb) { - this.destroy(new Error('asd')); - cb(); - } - }); - - write.on('error', common.mustCall()); - write.on('finish', common.mustNotCall()); - write.end('asd'); - assert.strictEqual(write.destroyed, true); -} - { const write = new Writable({ write(chunk, enc, cb) { cb(); } diff --git a/test/parallel/test-stream-writable-write-cb-error.js b/test/parallel/test-stream-writable-write-cb-error.js deleted file mode 100644 index 72db1b7e3ffe70..00000000000000 --- a/test/parallel/test-stream-writable-write-cb-error.js +++ /dev/null @@ -1,58 +0,0 @@ -'use strict'; -const common = require('../common'); -const { Writable } = require('stream'); -const assert = require('assert'); - -// Ensure callback is always invoked before -// error is emitted. Regardless if error was -// sync or async. - -{ - let callbackCalled = false; - // Sync Error - const writable = new Writable({ - write: common.mustCall((buf, enc, cb) => { - cb(new Error()); - }) - }); - writable.on('error', common.mustCall(() => { - assert.strictEqual(callbackCalled, true); - })); - writable.write('hi', common.mustCall(() => { - callbackCalled = true; - })); -} - -{ - let callbackCalled = false; - // Async Error - const writable = new Writable({ - write: common.mustCall((buf, enc, cb) => { - process.nextTick(cb, new Error()); - }) - }); - writable.on('error', common.mustCall(() => { - assert.strictEqual(callbackCalled, true); - })); - writable.write('hi', common.mustCall(() => { - callbackCalled = true; - })); -} - -{ - // Sync Error - const writable = new Writable({ - write: common.mustCall((buf, enc, cb) => { - cb(new Error()); - }) - }); - - writable.on('error', common.mustCall()); - - let cnt = 0; - // Ensure we don't live lock on sync error - while (writable.write('a')) - cnt++; - - assert.strictEqual(cnt, 0); -} diff --git a/test/parallel/test-wrap-js-stream-exceptions.js b/test/parallel/test-wrap-js-stream-exceptions.js index 2cc592a760c5c4..eeab26f525ae50 100644 --- a/test/parallel/test-wrap-js-stream-exceptions.js +++ b/test/parallel/test-wrap-js-stream-exceptions.js @@ -16,8 +16,4 @@ const socket = new JSStreamWrap(new Duplex({ }) })); -socket.end('foo'); -socket.on('error', common.expectsError({ - type: Error, - message: 'write EPROTO' -})); +assert.throws(() => socket.end('foo'), /Error: write EPROTO/); diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index 24d1e9b9901a94..160971b16bc30c 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -26,10 +26,12 @@ const zlib = require('zlib'); zlib.gzip('hello', common.mustCall(function(err, out) { const unzip = zlib.createGunzip(); unzip.close(common.mustCall()); - - unzip.write(out); - unzip.on('error', common.expectsError({ - code: 'ERR_STREAM_DESTROYED', - type: Error - })); + common.expectsError( + () => unzip.write(out), + { + code: 'ERR_STREAM_DESTROYED', + type: Error, + message: 'Cannot call write after a stream was destroyed' + } + ); }));