diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e212881c4ac555..34a21ed914911d 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -315,12 +315,7 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - - if (!state.writing && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) - clearBuffer(this, state); + clearBuffer(this, state); } }; @@ -468,15 +463,7 @@ function onwrite(stream, er) { if (er) onwriteError(stream, state, sync, er, cb); else { - // Check if we're actually ready to finish, but don't emit yet - var finished = needFinish(state) || stream.destroyed; - - if (!finished && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) { - clearBuffer(stream, state); - } + clearBuffer(stream, state); if (sync) { process.nextTick(afterWrite, stream, state, cb); @@ -500,6 +487,13 @@ function afterWrite(stream, state, cb) { // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { + if (state.destroyed || + state.writing || + state.corked || + state.bufferProcessing || + !state.bufferedRequest) + return; + state.bufferProcessing = true; var entry = state.bufferedRequest; @@ -620,7 +614,8 @@ function needFinish(state) { state.length === 0 && state.bufferedRequest === null && !state.finished && - !state.writing); + !state.writing && + !state.destroyed); } function callFinal(stream, state) { stream._final((err) => { diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 87d58a8be55f80..525eb5d2f2c37e 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -62,6 +62,8 @@ function ReadStream(path, options) { // For backwards compat do not emit close on destroy. options.emitClose = false; + options.autoDestroy = options.autoClose === undefined ? + true : options.autoClose; Readable.call(this, options); @@ -73,7 +75,7 @@ function ReadStream(path, options) { this.start = options.start; this.end = options.end; - this.autoClose = options.autoClose === undefined ? true : options.autoClose; + this.autoClose = options.autoDestroy; this.pos = undefined; this.bytesRead = 0; this.closed = false; @@ -100,12 +102,6 @@ function ReadStream(path, options) { if (typeof this.fd !== 'number') this.open(); - - this.on('end', function() { - if (this.autoClose) { - this.destroy(); - } - }); } Object.setPrototypeOf(ReadStream.prototype, Readable.prototype); Object.setPrototypeOf(ReadStream, Readable); @@ -238,6 +234,8 @@ function WriteStream(path, options) { // For backwards compat do not emit close on destroy. options.emitClose = false; + options.autoDestroy = options.autoClose === undefined ? + true : options.autoClose; Writable.call(this, options); @@ -248,7 +246,7 @@ function WriteStream(path, options) { this.mode = options.mode === undefined ? 0o666 : options.mode; this.start = options.start; - this.autoClose = options.autoClose === undefined ? true : !!options.autoClose; + this.autoClose = options.autoDestroy; this.pos = undefined; this.bytesWritten = 0; this.closed = false; @@ -268,14 +266,6 @@ function WriteStream(path, options) { Object.setPrototypeOf(WriteStream.prototype, Writable.prototype); Object.setPrototypeOf(WriteStream, Writable); -WriteStream.prototype._final = function(callback) { - if (this.autoClose) { - this.destroy(); - } - - callback(); -}; - WriteStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { if (er) { @@ -307,9 +297,6 @@ WriteStream.prototype._write = function(data, encoding, cb) { fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { if (er) { - if (this.autoClose) { - this.destroy(); - } return cb(er); } this.bytesWritten += bytes; @@ -342,7 +329,6 @@ WriteStream.prototype._writev = function(data, cb) { fs.writev(this.fd, chunks, this.pos, function(er, bytes) { if (er) { - self.destroy(); return cb(er); } self.bytesWritten += bytes; diff --git a/test/parallel/test-file-write-stream.js b/test/parallel/test-file-write-stream.js index d66657d690f1b2..3fb78a73a3af4b 100644 --- a/test/parallel/test-file-write-stream.js +++ b/test/parallel/test-file-write-stream.js @@ -65,17 +65,16 @@ file assert.strictEqual(file.bytesWritten, EXPECTED.length * 2); callbacks.close++; - common.expectsError( - () => { - console.error('write after end should not be allowed'); - file.write('should not work anymore'); - }, - { - code: 'ERR_STREAM_WRITE_AFTER_END', - type: Error, - message: 'write after end' - } - ); + file.on('error', common.expectsError({ + code: 'ERR_STREAM_WRITE_AFTER_END', + type: Error, + message: 'write after end' + })); + file.write('should not work anymore', common.expectsError({ + code: 'ERR_STREAM_WRITE_AFTER_END', + type: Error, + message: 'write after end' + })); fs.unlinkSync(fn); }); diff --git a/test/parallel/test-fs-write-stream-autoclose-option.js b/test/parallel/test-fs-write-stream-autoclose-option.js index e39f4d615ab7e0..81b1c344b56a7b 100644 --- a/test/parallel/test-fs-write-stream-autoclose-option.js +++ b/test/parallel/test-fs-write-stream-autoclose-option.js @@ -25,10 +25,11 @@ function next() { stream = fs.createWriteStream(null, { fd: stream.fd, start: 0 }); stream.write('Test2'); stream.end(); + stream.on('finish', common.mustCall(function() { assert.strictEqual(stream.closed, false); - assert.strictEqual(stream.fd, null); stream.on('close', common.mustCall(function() { + assert.strictEqual(stream.fd, null); assert.strictEqual(stream.closed, true); process.nextTick(next2); })); @@ -51,8 +52,8 @@ function next3() { stream.end(); stream.on('finish', common.mustCall(function() { assert.strictEqual(stream.closed, false); - assert.strictEqual(stream.fd, null); stream.on('close', common.mustCall(function() { + assert.strictEqual(stream.fd, null); assert.strictEqual(stream.closed, true); })); })); diff --git a/test/parallel/test-http2-compat-socket-set.js b/test/parallel/test-http2-compat-socket-set.js index 05beb09d548e91..65bbfb0e60c9c5 100644 --- a/test/parallel/test-http2-compat-socket-set.js +++ b/test/parallel/test-http2-compat-socket-set.js @@ -74,6 +74,7 @@ server.on('request', common.mustCall(function(request, response) { common.expectsError(() => request.socket.resume = noop, errMsg); request.stream.on('finish', common.mustCall(() => { + response.stream.destroy(); setImmediate(() => { request.socket.setTimeout = noop; assert.strictEqual(request.stream.setTimeout, noop); @@ -83,7 +84,7 @@ server.on('request', common.mustCall(function(request, response) { assert.strictEqual(request.stream._isProcessing, true); }); })); - response.stream.destroy(); + response.stream.end(); })); server.listen(0, common.mustCall(function() { diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 3c38d2c364051c..906d9c63068460 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -125,7 +125,7 @@ const assert = require('assert'); duplex.removeListener('end', fail); duplex.removeListener('finish', fail); duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('finish', common.mustNotCall()); assert.strictEqual(duplex.destroyed, true); } diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js index c594d9989ae4de..8750c65164e5e0 100644 --- a/test/parallel/test-stream-transform-destroy.js +++ b/test/parallel/test-stream-transform-destroy.js @@ -117,7 +117,7 @@ const assert = require('assert'); transform.removeListener('end', fail); transform.removeListener('finish', fail); transform.on('end', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('finish', common.mustNotCall()); } { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index a431d6d48d1c8e..191fa5dd8a38cd 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -111,7 +111,7 @@ const assert = require('assert'); write.destroy(); write.removeListener('finish', fail); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); assert.strictEqual(write.destroyed, true); } diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js index 83b329a6a8a7b3..58096c5635c9cf 100644 --- a/test/parallel/test-stream-write-destroy.js +++ b/test/parallel/test-stream-write-destroy.js @@ -48,12 +48,10 @@ for (const withPendingData of [ false, true ]) { w.destroy(); assert.strictEqual(chunksWritten, 1); callbacks.shift()(); - assert.strictEqual(chunksWritten, 2); + assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2); assert.strictEqual(callbacks.length, 0); assert.strictEqual(drains, 1); - // When we used `.end()`, we see the 'finished' event if and only if - // we actually finished processing the write queue. - assert.strictEqual(finished, !withPendingData && useEnd); + assert.strictEqual(finished, false); } }