From eebec50f707524e0166f9aa2af754ca17bb9b5bb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 26 Apr 2020 11:16:43 +0200 Subject: [PATCH 1/5] stream: implement WritableBase without buffering --- lib/_stream_writable.js | 529 +++++++++-------------------------- lib/internal/streams/base.js | 333 ++++++++++++++++++++++ node.gyp | 1 + 3 files changed, 470 insertions(+), 393 deletions(-) create mode 100644 lib/internal/streams/base.js diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 916e9b87d9c17a..a6fb8c568aea61 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -29,162 +29,115 @@ const { FunctionPrototype, ObjectDefineProperty, ObjectDefineProperties, - ObjectSetPrototypeOf, + ObjectGetOwnPropertyDescriptors, + ObjectCreate, + ObjectKeys, Symbol, SymbolHasInstance, } = primordials; module.exports = Writable; -Writable.WritableState = WritableState; -const EE = require('events'); const Stream = require('stream'); -const { Buffer } = require('buffer'); -const destroyImpl = require('internal/streams/destroy'); +const { WritableBase } = require('internal/streams/base'); + const { getHighWaterMark, getDefaultHighWaterMark } = require('internal/streams/state'); const { - ERR_INVALID_ARG_TYPE, ERR_METHOD_NOT_IMPLEMENTED, ERR_MULTIPLE_CALLBACK, - ERR_STREAM_CANNOT_PIPE, - ERR_STREAM_DESTROYED, - ERR_STREAM_ALREADY_FINISHED, - ERR_STREAM_NULL_VALUES, - ERR_STREAM_WRITE_AFTER_END, - ERR_UNKNOWN_ENCODING + ERR_STREAM_DESTROYED } = require('internal/errors').codes; -const { errorOrDestroy } = destroyImpl; +function nop() {} -ObjectSetPrototypeOf(Writable.prototype, Stream.prototype); -ObjectSetPrototypeOf(Writable, Stream); +const kFlush = Symbol('kFlush'); +class WritableState extends WritableBase.WritableState { + constructor(options, stream, isDuplex) { + super(options); -function nop() {} + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream, + // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. + if (typeof isDuplex !== 'boolean') + isDuplex = stream instanceof Stream.Duplex; + + if (isDuplex) + this.objectMode = this.objectMode || + !!(options && options.writableObjectMode); + + // The point at which write() starts returning false + // Note: 0 is a valid value, means that we always return false if + // the entire buffer is not flushed immediately on write(). + this.highWaterMark = options ? + getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : + getDefaultHighWaterMark(false); + + // if _final has been called. + this.finalCalled = false; + + // drain event flag. + this.needDrain = false; + + // Not an actual buffer we keep track of, but a measurement + // of how much we're waiting to get pushed to some underlying + // socket or file. + this.length = 0; + + // A flag to see when we're in the middle of a write. + this.writing = false; + + // When true all writes will be buffered until .uncork() call. + this.corked = 0; + + // A flag to be able to tell if the onwrite cb is called immediately, + // or on a later tick. We set this to true at first, because any + // actions that shouldn't happen until "later" should generally also + // not happen before the first write call. + this.sync = true; + + // A flag to know if we're processing previously buffered items, which + // may call the _write() callback in the same tick, so that we don't + // end up in an overlapped onwrite situation. + this.bufferProcessing = false; + + // The callback that's passed to _write(chunk, cb). + this.onwrite = onwrite.bind(undefined, stream); + + // The callback that the user supplies to write(chunk, encoding, cb). + this.writecb = null; + + // The amount that is being written when _write is called. + this.writelen = 0; -function WritableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream, - // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - - // Object stream flag to indicate whether or not this stream - // contains buffers or objects. - this.objectMode = !!(options && options.objectMode); - - if (isDuplex) - this.objectMode = this.objectMode || - !!(options && options.writableObjectMode); - - // The point at which write() starts returning false - // Note: 0 is a valid value, means that we always return false if - // the entire buffer is not flushed immediately on write(). - this.highWaterMark = options ? - getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : - getDefaultHighWaterMark(false); - - // if _final has been called. - this.finalCalled = false; - - // drain event flag. - this.needDrain = false; - // At the start of calling end() - this.ending = false; - // When end() has been called, and returned. - this.ended = false; - // When 'finish' is emitted. - this.finished = false; - - // Has it been destroyed - this.destroyed = false; - - // Should we decode strings into buffers before passing to _write? - // this is here so that some node-core streams can optimize string - // handling at a lower level. - const noDecode = !!(options && options.decodeStrings === false); - this.decodeStrings = !noDecode; - - // Crypto is kind of old and crusty. Historically, its default string - // encoding is 'binary' so we have to make this configurable. - // Everything else in the universe uses 'utf8', though. - this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; - - // Not an actual buffer we keep track of, but a measurement - // of how much we're waiting to get pushed to some underlying - // socket or file. - this.length = 0; - - // A flag to see when we're in the middle of a write. - this.writing = false; - - // When true all writes will be buffered until .uncork() call. - this.corked = 0; - - // A flag to be able to tell if the onwrite cb is called immediately, - // or on a later tick. We set this to true at first, because any - // actions that shouldn't happen until "later" should generally also - // not happen before the first write call. - this.sync = true; - - // A flag to know if we're processing previously buffered items, which - // may call the _write() callback in the same tick, so that we don't - // end up in an overlapped onwrite situation. - this.bufferProcessing = false; - - // The callback that's passed to _write(chunk, cb). - this.onwrite = onwrite.bind(undefined, stream); - - // The callback that the user supplies to write(chunk, encoding, cb). - this.writecb = null; - - // The amount that is being written when _write is called. - this.writelen = 0; - - // Storage for data passed to the afterWrite() callback in case of - // synchronous _write() completion. - this.afterWriteTickInfo = null; - - resetBuffer(this); - - // Number of pending user-supplied write callbacks - // this must be 0 before 'finish' can be emitted. - this.pendingcb = 0; - - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true; - - // Emit prefinish if the only thing we're waiting for is _write cbs - // This is relevant for synchronous Transform streams. - this.prefinished = false; - - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false; - - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false; - - // Should .destroy() be called after 'finish' (and potentially 'end'). - this.autoDestroy = !options || options.autoDestroy !== false; - - // Indicates whether the stream has errored. When true all write() calls - // should return false. This is needed since when autoDestroy - // is disabled we need a way to tell whether the stream has failed. - this.errored = false; - - // Indicates whether the stream has finished destroying. - this.closed = false; - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false; + // Storage for data passed to the afterWrite() callback in case of + // synchronous _write() completion. + this.afterWriteTickInfo = null; + + resetBuffer(this); + + // Number of pending user-supplied write callbacks + // this must be 0 before 'finish' can be emitted. + this.pendingcb = 0; + + // Emit prefinish if the only thing we're waiting for is _write cbs + // This is relevant for synchronous Transform streams. + this.prefinished = false; + + this[kFlush] = null; + } + + getBuffer() { + return this.buffered.slice(this.bufferedIndex); + } + + get bufferedRequestCount() { + return this.buffered.length - this.bufferedIndex; + } } function resetBuffer(state) { @@ -194,16 +147,6 @@ function resetBuffer(state) { state.allNoop = true; } -WritableState.prototype.getBuffer = function getBuffer() { - return this.buffered.slice(this.bufferedIndex); -}; - -ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { - get() { - return this.buffered.length - this.bufferedIndex; - } -}); - // Test _writableState for inheritance to account for Duplex streams, // whose prototype chain only points to Readable. let realHasInstance; @@ -241,8 +184,6 @@ function Writable(options) { if (!isDuplex && !realHasInstance.call(Writable, this)) return new Writable(options); - this._writableState = new WritableState(options, this, isDuplex); - if (options) { if (typeof options.write === 'function') this._write = options.write; @@ -260,73 +201,30 @@ function Writable(options) { this._construct = options.construct; } - Stream.call(this, options); + WritableBase.call(this, { + ...options, + start: (stream, state) => { + if (!state.writing) { + clearBuffer(stream, state); + } + finishMaybe(stream, state); + }, + write: writeOrBuffer, + flush: function(state, cb) { + // .end() fully uncorks. + if (state.corked) { + state.corked = 1; + this.uncork(); + } - destroyImpl.construct(this, () => { - const state = this._writableState; + state[kFlush] = cb; - if (!state.writing) { - clearBuffer(this, state); + finishMaybe(this, state); } - - finishMaybe(this, state); - }); + }, isDuplex, WritableState); } - -// Otherwise people can pipe Writable streams, which is just wrong. -Writable.prototype.pipe = function() { - errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); -}; - -Writable.prototype.write = function(chunk, encoding, cb) { - const state = this._writableState; - - if (typeof encoding === 'function') { - cb = encoding; - encoding = state.defaultEncoding; - } else { - if (!encoding) - encoding = state.defaultEncoding; - else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) - throw new ERR_UNKNOWN_ENCODING(encoding); - if (typeof cb !== 'function') - cb = nop; - } - - if (chunk === null) { - throw new ERR_STREAM_NULL_VALUES(); - } else if (!state.objectMode) { - if (typeof chunk === 'string') { - if (state.decodeStrings !== false) { - chunk = Buffer.from(chunk, encoding); - encoding = 'buffer'; - } - } else if (chunk instanceof Buffer) { - encoding = 'buffer'; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - encoding = 'buffer'; - } else { - throw new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); - } - } - - let err; - if (state.ending) { - err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED('write'); - } - - if (err) { - process.nextTick(cb, err); - errorOrDestroy(this, err, true); - return false; - } - state.pendingcb++; - return writeOrBuffer(this, state, chunk, encoding, cb); -}; +Writable.WritableState = WritableState; +Writable.prototype = ObjectCreate(WritableBase.prototype); Writable.prototype.cork = function() { this._writableState.corked++; @@ -343,24 +241,20 @@ Writable.prototype.uncork = function() { } }; -Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { - // node::ParseEncoding() requires lower case. - if (typeof encoding === 'string') - encoding = encoding.toLowerCase(); - if (!Buffer.isEncoding(encoding)) - throw new ERR_UNKNOWN_ENCODING(encoding); - this._writableState.defaultEncoding = encoding; - return this; -}; - // If we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. -function writeOrBuffer(stream, state, chunk, encoding, callback) { +function writeOrBuffer(state, chunk, encoding, callback) { + if (typeof callback !== 'function') + callback = nop; + + state.pendingcb++; + const len = state.objectMode ? 1 : chunk.length; state.length += len; + // TODO(ronag): Move errored handling to Base. if (state.writing || state.corked || state.errored || !state.constructed) { state.buffered.push({ chunk, encoding, callback }); if (state.allBuffers && encoding !== 'buffer') { @@ -374,7 +268,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.writecb = callback; state.writing = true; state.sync = true; - stream._write(chunk, encoding, state.onwrite); + this._write(chunk, encoding, state.onwrite); state.sync = false; } @@ -384,9 +278,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { if (!ret) state.needDrain = true; - // Return false if errored or destroyed in order to break - // any synchronous while(stream.write(data)) loops. - return ret && !state.errored && !state.destroyed; + return ret; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { @@ -413,7 +305,7 @@ function onwriteError(stream, state, er, cb) { // writes. errorBuffer(state, new ERR_STREAM_DESTROYED('write')); // This can emit error, but error must always follow cb. - errorOrDestroy(stream, er); + WritableBase.errorOrDestroy(stream, er); } function onwrite(stream, er) { @@ -422,7 +314,7 @@ function onwrite(stream, er) { const cb = state.writecb; if (typeof cb !== 'function') { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + WritableBase.errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); return; } @@ -432,6 +324,8 @@ function onwrite(stream, er) { state.writelen = 0; if (er) { + // TODO(ronag): Move errored handling to Base. + state.errored = true; // In case of duplex streams we need to notify the readable side of the @@ -574,54 +468,9 @@ Writable.prototype._write = function(chunk, encoding, cb) { Writable.prototype._writev = null; -Writable.prototype.end = function(chunk, encoding, cb) { - const state = this._writableState; - - if (typeof chunk === 'function') { - cb = chunk; - chunk = null; - encoding = null; - } else if (typeof encoding === 'function') { - cb = encoding; - encoding = null; - } - - if (chunk !== null && chunk !== undefined) - this.write(chunk, encoding); - - // .end() fully uncorks. - if (state.corked) { - state.corked = 1; - this.uncork(); - } - - // This is forgiving in terms of unnecessary calls to end() and can hide - // logic errors. However, usually such errors are harmless and causing a - // hard error can be disproportionately destructive. It is not always - // trivial for the user to determine whether end() needs to be called or not. - let err; - if (!state.errored && !state.ending) { - state.ending = true; - finishMaybe(this, state, true); - state.ended = true; - } else if (state.finished) { - err = new ERR_STREAM_ALREADY_FINISHED('end'); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED('end'); - } - - if (typeof cb === 'function') { - if (err || state.finished) - process.nextTick(cb, err); - else - onFinished(this, cb); - } - - return this; -}; - function needFinish(state) { - return (state.ending && + // TODO(ronag): Move errored and finished handling to Base. + return (state[kFlush] && state.constructed && state.length === 0 && !state.errored && @@ -631,23 +480,19 @@ function needFinish(state) { } function callFinal(stream, state) { - state.sync = true; state.pendingcb++; stream._final((err) => { state.pendingcb--; if (err) { - errorOrDestroy(stream, err, state.sync); + state[kFlush](err); } else if (needFinish(state)) { state.prefinished = true; stream.emit('prefinish'); - // Backwards compat. Don't check state.sync here. - // Some streams assume 'finish' will be emitted + // Backwards compat. Some streams assume 'finish' will be emitted // asynchronously relative to _final callback. - state.pendingcb++; - process.nextTick(finish, stream, state); + process.nextTick(state[kFlush]); } }); - state.sync = false; } function prefinish(stream, state) { @@ -662,108 +507,22 @@ function prefinish(stream, state) { } } -function finishMaybe(stream, state, sync) { +function finishMaybe(stream, state) { if (needFinish(state)) { prefinish(stream, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - process.nextTick(finish, stream, state); - } else { - finish(stream, state); - } + if (state.pendingcb === 0 && needFinish(state) && !state.finalCalled) { + state[kFlush](); } } } -function finish(stream, state) { - state.pendingcb--; - // TODO (ronag): Unify with needFinish. - if (state.errorEmitted || state.closeEmitted) - return; - - state.finished = true; - stream.emit('finish'); - - if (state.autoDestroy) { - // In case of duplex streams we need a way to detect - // if the readable side is ready for autoDestroy as well. - const rState = stream._readableState; - const autoDestroy = !rState || ( - rState.autoDestroy && - // We don't expect the readable to ever 'end' - // if readable is explicitly set to false. - (rState.endEmitted || rState.readable === false) - ); - if (autoDestroy) { - stream.destroy(); - } - } -} - -// TODO(ronag): Avoid using events to implement internal logic. -function onFinished(stream, cb) { - function onerror(err) { - stream.removeListener('finish', onfinish); - stream.removeListener('error', onerror); - cb(err); - if (stream.listenerCount('error') === 0) { - stream.emit('error', err); - } - } - - function onfinish() { - stream.removeListener('finish', onfinish); - stream.removeListener('error', onerror); - cb(); - } - stream.on('finish', onfinish); - stream.prependListener('error', onerror); +for (const method of ObjectKeys(WritableBase.prototype)) { + if (!Writable.prototype[method]) + Writable.prototype[method] = WritableBase.prototype[method]; } ObjectDefineProperties(Writable.prototype, { - - destroyed: { - get() { - return this._writableState ? this._writableState.destroyed : false; - }, - set(value) { - // Backward compatibility, the user is explicitly managing destroyed. - if (this._writableState) { - this._writableState.destroyed = value; - } - } - }, - - writable: { - get() { - const w = this._writableState; - // w.writable === false means that this is part of a Duplex stream - // where the writable side was disabled upon construction. - // Compat. The user might manually disable writable side through - // deprecated setter. - return !!w && w.writable !== false && !w.destroyed && !w.errored && - !w.ending && !w.ended; - }, - set(val) { - // Backwards compatible. - if (this._writableState) { - this._writableState.writable = !!val; - } - } - }, - - writableFinished: { - get() { - return this._writableState ? this._writableState.finished : false; - } - }, - - writableObjectMode: { - get() { - return this._writableState ? this._writableState.objectMode : false; - } - }, + ...ObjectGetOwnPropertyDescriptors(WritableBase.prototype), writableBuffer: { get() { @@ -771,12 +530,6 @@ ObjectDefineProperties(Writable.prototype, { } }, - writableEnded: { - get() { - return this._writableState ? this._writableState.ending : false; - } - }, - writableHighWaterMark: { get() { return this._writableState && this._writableState.highWaterMark; @@ -796,21 +549,11 @@ ObjectDefineProperties(Writable.prototype, { } }); -const destroy = destroyImpl.destroy; Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write')); } - destroy.call(this, err, cb); + WritableBase.prototype.destroy.call(this, err, cb); return this; }; - -Writable.prototype._undestroy = destroyImpl.undestroy; -Writable.prototype._destroy = function(err, cb) { - cb(err); -}; - -Writable.prototype[EE.captureRejectionSymbol] = function(err) { - this.destroy(err); -}; diff --git a/lib/internal/streams/base.js b/lib/internal/streams/base.js new file mode 100644 index 00000000000000..117b9cddf8a7f1 --- /dev/null +++ b/lib/internal/streams/base.js @@ -0,0 +1,333 @@ +'use strict'; + +const { + ObjectDefineProperties, + ObjectCreate, + Symbol +} = primordials; + +const EE = require('events'); +const Stream = require('stream'); +const { Buffer } = require('buffer'); +const destroyImpl = require('internal/streams/destroy'); + +const { + ERR_INVALID_ARG_TYPE, + ERR_STREAM_CANNOT_PIPE, + ERR_STREAM_DESTROYED, + ERR_STREAM_ALREADY_FINISHED, + ERR_STREAM_NULL_VALUES, + ERR_STREAM_WRITE_AFTER_END, + ERR_UNKNOWN_ENCODING +} = require('internal/errors').codes; + +const { errorOrDestroy } = destroyImpl; + +class WritableStateBase { + constructor(options) { + // Object stream flag to indicate whether or not this stream + // contains buffers or objects. + this.objectMode = !!(options && options.objectMode); + + // At the start of calling end() + this.ending = false; + + // When end() has been called, and returned. + this.ended = false; + + // When 'finish' is emitted. + this.finished = false; + + // Has it been destroyed + this.destroyed = false; + + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + this.constructed = true; + + // Should we decode strings into buffers before passing to _write? + // this is here so that some node-core streams can optimize string + // handling at a lower level. + const noDecode = !!(options && options.decodeStrings === false); + this.decodeStrings = !noDecode; + + // Crypto is kind of old and crusty. Historically, its default string + // encoding is 'binary' so we have to make this configurable. + // Everything else in the universe uses 'utf8', though. + this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; + + // True if the error was already emitted and should not be thrown again. + this.errorEmitted = false; + + // Should close be emitted on destroy. Defaults to true. + this.emitClose = !options || options.emitClose !== false; + + // Should .destroy() be called after 'finish' (and potentially 'end'). + this.autoDestroy = !options || options.autoDestroy !== false; + + // Indicates whether the stream has errored. When true all write() calls + // should return false. This is needed since when autoDestroy + // is disabled we need a way to tell whether the stream has failed. + this.errored = false; + + // Indicates whether the stream has finished destroying. + this.closed = false; + } +} + +const kWrite = Symbol('kWrite'); +const kFlush = Symbol('kFlush'); + +function WritableBase(options, isDuplex, State) { + if (typeof options.write !== 'function') { + throw new ERR_INVALID_ARG_TYPE('option.write', 'function', options.write); + } + if (typeof options.flush !== 'function') { + throw new ERR_INVALID_ARG_TYPE('option.flush', 'function', options.flush); + } + + Stream.call(this, options); + + this[kWrite] = options.write; + this[kFlush] = options.flush; + + this._writableState = new State(options, this, isDuplex); + + destroyImpl.construct(this, options.start); +} +WritableBase.prototype = ObjectCreate(Stream.prototype); +WritableBase.errorOrDestroy = errorOrDestroy; +WritableBase.WritableState = WritableStateBase; +WritableBase.prototype.write = function(chunk, encoding, cb) { + const state = this._writableState; + + if (typeof encoding === 'function') { + cb = encoding; + encoding = state.defaultEncoding; + } else if (!encoding) { + encoding = state.defaultEncoding; + } + + if (chunk === null) { + throw new ERR_STREAM_NULL_VALUES(); + } else if (!state.objectMode) { + if (typeof chunk === 'string') { + if (state.decodeStrings !== false) { + chunk = Buffer.from(chunk, encoding); + encoding = 'buffer'; + } + } else if (chunk instanceof Buffer) { + encoding = 'buffer'; + } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + encoding = 'buffer'; + } else { + throw new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + } + } + + let err; + if (state.ending) { + err = new ERR_STREAM_WRITE_AFTER_END(); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED('write'); + } + + if (err) { + if (typeof cb === 'function') { + process.nextTick(cb, err); + } + errorOrDestroy(this, err, true); + return false; + } + + // TODO(ronag): Move more logic from Writable into custom cb. + const ret = this[kWrite](state, chunk, encoding, cb); + // Return false if errored or destroyed in order to break + // any synchronous while(stream.write(data)) loops. + return ret && !state.errored && !state.destroyed; +}; + +// Otherwise people can pipe Writable streams, which is just wrong. +WritableBase.prototype.pipe = function() { + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); +}; + +WritableBase.prototype.setDefaultEncoding = function(encoding) { + // node::ParseEncoding() requires lower case. + if (typeof encoding === 'string') + encoding = encoding.toLowerCase(); + if (!Buffer.isEncoding(encoding)) + throw new ERR_UNKNOWN_ENCODING(encoding); + this._writableState.defaultEncoding = encoding; + return this; +}; + +WritableBase.prototype.end = function(chunk, encoding, cb) { + const state = this._writableState; + + if (typeof chunk === 'function') { + cb = chunk; + chunk = null; + encoding = null; + } else if (typeof encoding === 'function') { + cb = encoding; + encoding = null; + } + + if (chunk !== null && chunk !== undefined) + this.write(chunk, encoding); + + // This is forgiving in terms of unnecessary calls to end() and can hide + // logic errors. However, usually such errors are harmless and causing a + // hard error can be disproportionately destructive. It is not always + // trivial for the user to determine whether end() needs to be called or not. + let err; + if (!state.errored && !state.ending) { + state.ending = true; + let called = false; + this[kFlush](state, (err) => { + if (called) { + // TODO(ronag): ERR_MULTIPLE_CALLBACK? + return; + } + called = true; + + const sync = !state.ended; + if (err) { + errorOrDestroy(this, err, sync); + } else if (sync) { + process.nextTick(finish, this, state); + } else { + finish(this, state); + } + }); + state.ended = true; + } else if (state.finished) { + err = new ERR_STREAM_ALREADY_FINISHED('end'); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED('end'); + } + + if (typeof cb === 'function') { + if (err || state.finished) + process.nextTick(cb, err); + else + onFinished(this, cb); + } + + return this; +}; + +function finish(stream, state) { + // TODO(ronag): state.closed, state.errored, state.destroyed? + + if (state.errorEmitted) + return; + + // TODO(ronag): This could occur after 'close' is emitted. + + state.finished = true; + stream.emit('finish'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the readable side is ready for autoDestroy as well. + const rState = stream._readableState; + const autoDestroy = !rState || ( + rState.autoDestroy && + // We don't expect the readable to ever 'end' + // if readable is explicitly set to false. + (rState.endEmitted || rState.readable === false) + ); + if (autoDestroy) { + stream.destroy(); + } + } +} + +WritableBase.prototype[EE.captureRejectionSymbol] = function(err) { + this.destroy(err); +}; +WritableBase.prototype.destroy = destroyImpl.destroy; +WritableBase.prototype._undestroy = destroyImpl.undestroy; +WritableBase.prototype._destroy = function(err, cb) { + cb(err); +}; + +// TODO(ronag): Avoid using events to implement internal logic. +function onFinished(stream, cb) { + function onerror(err) { + stream.removeListener('finish', onfinish); + stream.removeListener('error', onerror); + cb(err); + if (stream.listenerCount('error') === 0) { + stream.emit('error', err); + } + } + + function onfinish() { + stream.removeListener('finish', onfinish); + stream.removeListener('error', onerror); + cb(); + } + stream.on('finish', onfinish); + stream.prependListener('error', onerror); +} + +ObjectDefineProperties(WritableBase.prototype, { + destroyed: { + get() { + return this._writableState ? this._writableState.destroyed : false; + }, + set(value) { + // Backward compatibility, the user is explicitly managing destroyed. + if (this._writableState) { + this._writableState.destroyed = value; + } + } + }, + + writable: { + get() { + const w = this._writableState; + // w.writable === false means that this is part of a Duplex stream + // where the writable side was disabled upon construction. + // Compat. The user might manually disable writable side through + // deprecated setter. + return !!w && w.writable !== false && !w.destroyed && !w.errored && + !w.ending && !w.ended; + }, + set(val) { + // Backwards compatible. + if (this._writableState) { + this._writableState.writable = !!val; + } + } + }, + + writableFinished: { + get() { + return this._writableState ? this._writableState.finished : false; + } + }, + + writableObjectMode: { + get() { + return this._writableState ? this._writableState.objectMode : false; + } + }, + + writableEnded: { + get() { + return this._writableState ? this._writableState.ending : false; + } + } +}); + +module.exports = { + WritableBase +}; diff --git a/node.gyp b/node.gyp index 0af72d48c25150..bedecabfca51ff 100644 --- a/node.gyp +++ b/node.gyp @@ -229,6 +229,7 @@ 'lib/internal/watchdog.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', + 'lib/internal/streams/base.js', 'lib/internal/streams/buffer_list.js', 'lib/internal/streams/duplexpair.js', 'lib/internal/streams/from.js', From fbe7a31967c54c6180832c9d7d1c97e11c0e6bdd Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 21 Jun 2020 13:45:49 +0200 Subject: [PATCH 2/5] fixup --- lib/_stream_writable.js | 19 ++++++++++------- lib/internal/streams/base.js | 21 +++++++++++++------ .../test-stream-writable-write-error.js | 10 ++++----- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index a6fb8c568aea61..60286877928646 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -39,7 +39,11 @@ const { module.exports = Writable; const Stream = require('stream'); -const { WritableBase } = require('internal/streams/base'); +const { + WritableBase, + WritableStateBase, + errorOrDestroy +} = require('internal/streams/base'); const { getHighWaterMark, @@ -54,7 +58,7 @@ const { function nop() {} const kFlush = Symbol('kFlush'); -class WritableState extends WritableBase.WritableState { +class WritableState extends WritableStateBase { constructor(options, stream, isDuplex) { super(options); @@ -203,11 +207,12 @@ function Writable(options) { WritableBase.call(this, { ...options, - start: (stream, state) => { + start: function() { + const state = this._writableState; if (!state.writing) { - clearBuffer(stream, state); + clearBuffer(this, state); } - finishMaybe(stream, state); + finishMaybe(this, state); }, write: writeOrBuffer, flush: function(state, cb) { @@ -305,7 +310,7 @@ function onwriteError(stream, state, er, cb) { // writes. errorBuffer(state, new ERR_STREAM_DESTROYED('write')); // This can emit error, but error must always follow cb. - WritableBase.errorOrDestroy(stream, er); + errorOrDestroy(stream, er); } function onwrite(stream, er) { @@ -314,7 +319,7 @@ function onwrite(stream, er) { const cb = state.writecb; if (typeof cb !== 'function') { - WritableBase.errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); return; } diff --git a/lib/internal/streams/base.js b/lib/internal/streams/base.js index 117b9cddf8a7f1..9cde737c30b905 100644 --- a/lib/internal/streams/base.js +++ b/lib/internal/streams/base.js @@ -16,6 +16,7 @@ const { ERR_STREAM_CANNOT_PIPE, ERR_STREAM_DESTROYED, ERR_STREAM_ALREADY_FINISHED, + ERR_MULTIPLE_CALLBACK, ERR_STREAM_NULL_VALUES, ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING @@ -74,6 +75,10 @@ class WritableStateBase { // Indicates whether the stream has finished destroying. this.closed = false; + + // True if close has been emitted or would have been emitted + // depending on emitClose. + this.closeEmitted = false; } } @@ -98,8 +103,6 @@ function WritableBase(options, isDuplex, State) { destroyImpl.construct(this, options.start); } WritableBase.prototype = ObjectCreate(Stream.prototype); -WritableBase.errorOrDestroy = errorOrDestroy; -WritableBase.WritableState = WritableStateBase; WritableBase.prototype.write = function(chunk, encoding, cb) { const state = this._writableState; @@ -108,6 +111,8 @@ WritableBase.prototype.write = function(chunk, encoding, cb) { encoding = state.defaultEncoding; } else if (!encoding) { encoding = state.defaultEncoding; + } else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) { + throw new ERR_UNKNOWN_ENCODING(encoding); } if (chunk === null) { @@ -178,8 +183,10 @@ WritableBase.prototype.end = function(chunk, encoding, cb) { encoding = null; } - if (chunk !== null && chunk !== undefined) + if (chunk !== null && chunk !== undefined) { + // TODO (ronag): Propagate callback error. this.write(chunk, encoding); + } // This is forgiving in terms of unnecessary calls to end() and can hide // logic errors. However, usually such errors are harmless and causing a @@ -191,7 +198,7 @@ WritableBase.prototype.end = function(chunk, encoding, cb) { let called = false; this[kFlush](state, (err) => { if (called) { - // TODO(ronag): ERR_MULTIPLE_CALLBACK? + errorOrDestroy(this, new ERR_MULTIPLE_CALLBACK()); return; } called = true; @@ -225,7 +232,7 @@ WritableBase.prototype.end = function(chunk, encoding, cb) { function finish(stream, state) { // TODO(ronag): state.closed, state.errored, state.destroyed? - if (state.errorEmitted) + if (state.errorEmitted || state.closeEmitted) return; // TODO(ronag): This could occur after 'close' is emitted. @@ -329,5 +336,7 @@ ObjectDefineProperties(WritableBase.prototype, { }); module.exports = { - WritableBase + WritableBase, + WritableStateBase, + errorOrDestroy }; diff --git a/test/parallel/test-stream-writable-write-error.js b/test/parallel/test-stream-writable-write-error.js index 069e32e1be8e3e..f9b6c1ea5a6aec 100644 --- a/test/parallel/test-stream-writable-write-error.js +++ b/test/parallel/test-stream-writable-write-error.js @@ -31,7 +31,7 @@ function test(autoDestroy) { { const w = new Writable({ autoDestroy, - _write() {} + write() {} }); w.end(); expectError(w, ['asd'], 'ERR_STREAM_WRITE_AFTER_END'); @@ -40,7 +40,7 @@ function test(autoDestroy) { { const w = new Writable({ autoDestroy, - _write() {} + write() {} }); w.destroy(); } @@ -48,7 +48,7 @@ function test(autoDestroy) { { const w = new Writable({ autoDestroy, - _write() {} + write() {} }); expectError(w, [null], 'ERR_STREAM_NULL_VALUES', true); } @@ -56,7 +56,7 @@ function test(autoDestroy) { { const w = new Writable({ autoDestroy, - _write() {} + write() {} }); expectError(w, [{}], 'ERR_INVALID_ARG_TYPE', true); } @@ -65,7 +65,7 @@ function test(autoDestroy) { const w = new Writable({ decodeStrings: false, autoDestroy, - _write() {} + write() {} }); expectError(w, ['asd', 'noencoding'], 'ERR_UNKNOWN_ENCODING', true); } From 6069c263ba27c6164a0708fe69a6ae2014391b46 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 21 Jun 2020 13:52:57 +0200 Subject: [PATCH 3/5] fixup --- lib/_stream_writable.js | 14 ++++++++++---- lib/internal/streams/base.js | 8 ++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 60286877928646..0ff6851df9bf27 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -59,9 +59,11 @@ function nop() {} const kFlush = Symbol('kFlush'); class WritableState extends WritableStateBase { - constructor(options, stream, isDuplex) { + constructor(options, stream) { super(options); + let isDuplex = options.isDuplex; + // Duplex streams are both readable and writable, but share // the same options object. // However, some cases require setting options to different @@ -207,6 +209,7 @@ function Writable(options) { WritableBase.call(this, { ...options, + isDuplex, start: function() { const state = this._writableState; if (!state.writing) { @@ -215,7 +218,8 @@ function Writable(options) { finishMaybe(this, state); }, write: writeOrBuffer, - flush: function(state, cb) { + flush: function(cb) { + const state = this._writableState; // .end() fully uncorks. if (state.corked) { state.corked = 1; @@ -226,7 +230,7 @@ function Writable(options) { finishMaybe(this, state); } - }, isDuplex, WritableState); + }, WritableState); } Writable.WritableState = WritableState; Writable.prototype = ObjectCreate(WritableBase.prototype); @@ -249,7 +253,9 @@ Writable.prototype.uncork = function() { // If we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. -function writeOrBuffer(state, chunk, encoding, callback) { +function writeOrBuffer(chunk, encoding, callback) { + const state = this._writableState; + if (typeof callback !== 'function') callback = nop; diff --git a/lib/internal/streams/base.js b/lib/internal/streams/base.js index 9cde737c30b905..c020f2d223eac5 100644 --- a/lib/internal/streams/base.js +++ b/lib/internal/streams/base.js @@ -85,7 +85,7 @@ class WritableStateBase { const kWrite = Symbol('kWrite'); const kFlush = Symbol('kFlush'); -function WritableBase(options, isDuplex, State) { +function WritableBase(options, State = WritableStateBase) { if (typeof options.write !== 'function') { throw new ERR_INVALID_ARG_TYPE('option.write', 'function', options.write); } @@ -98,7 +98,7 @@ function WritableBase(options, isDuplex, State) { this[kWrite] = options.write; this[kFlush] = options.flush; - this._writableState = new State(options, this, isDuplex); + this._writableState = new State(options, this); destroyImpl.construct(this, options.start); } @@ -150,7 +150,7 @@ WritableBase.prototype.write = function(chunk, encoding, cb) { } // TODO(ronag): Move more logic from Writable into custom cb. - const ret = this[kWrite](state, chunk, encoding, cb); + const ret = this[kWrite](chunk, encoding, cb); // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. return ret && !state.errored && !state.destroyed; @@ -196,7 +196,7 @@ WritableBase.prototype.end = function(chunk, encoding, cb) { if (!state.errored && !state.ending) { state.ending = true; let called = false; - this[kFlush](state, (err) => { + this[kFlush]((err) => { if (called) { errorOrDestroy(this, new ERR_MULTIPLE_CALLBACK()); return; From 37adf2b96146a95edf800298ff998292559b2154 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 21 Jun 2020 14:36:44 +0200 Subject: [PATCH 4/5] fixup --- lib/_stream_writable.js | 2 +- lib/internal/streams/{base.js => writable_base.js} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename lib/internal/streams/{base.js => writable_base.js} (100%) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 0ff6851df9bf27..a00e40c49ec89b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -43,7 +43,7 @@ const { WritableBase, WritableStateBase, errorOrDestroy -} = require('internal/streams/base'); +} = require('internal/streams/writable_base'); const { getHighWaterMark, diff --git a/lib/internal/streams/base.js b/lib/internal/streams/writable_base.js similarity index 100% rename from lib/internal/streams/base.js rename to lib/internal/streams/writable_base.js From 545b780f7db4487d8f60ed0545cbf78adbd68d41 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 21 Jun 2020 14:38:27 +0200 Subject: [PATCH 5/5] fixup --- node.gyp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node.gyp b/node.gyp index bedecabfca51ff..ea32bcff3a513b 100644 --- a/node.gyp +++ b/node.gyp @@ -229,7 +229,7 @@ 'lib/internal/watchdog.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', - 'lib/internal/streams/base.js', + 'lib/internal/streams/writable_base.js', 'lib/internal/streams/buffer_list.js', 'lib/internal/streams/duplexpair.js', 'lib/internal/streams/from.js',