diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 5253be1b393460..7dd3733a986686 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -108,6 +108,7 @@ const { writeGeneric, writevGeneric, onStreamRead, + kAfterAsyncWrite, kMaybeDestroy, kUpdateTimer } = require('internal/stream_base_commons'); @@ -1515,21 +1516,6 @@ function trackWriteState(stream, bytes) { session[kHandle].chunksSentSinceLastWrite = 0; } -function afterDoStreamWrite(status, handle) { - const stream = handle[kOwner]; - const session = stream[kSession]; - - stream[kUpdateTimer](); - - const { bytes } = this; - stream[kState].writeQueueSize -= bytes; - - if (session !== undefined) - session[kState].writeQueueSize -= bytes; - if (typeof this.callback === 'function') - this.callback(null); -} - function streamOnResume() { if (!this.destroyed) this[kHandle].readStart(); @@ -1782,6 +1768,13 @@ class Http2Stream extends Duplex { 'bug in Node.js'); } + [kAfterAsyncWrite]({ bytes }) { + this[kState].writeQueueSize -= bytes; + + if (this.session !== undefined) + this.session[kState].writeQueueSize -= bytes; + } + [kWriteGeneric](writev, data, encoding, cb) { // When the Http2Stream is first created, it is corked until the // handle and the stream ID is assigned. However, if the user calls @@ -1808,7 +1801,7 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const req = createWriteWrap(this[kHandle], afterDoStreamWrite); + const req = createWriteWrap(this[kHandle]); req.stream = this[kID]; if (writev) diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 709395fa910cb2..31291e751d57a6 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -16,6 +16,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols; const kMaybeDestroy = Symbol('kMaybeDestroy'); const kUpdateTimer = Symbol('kUpdateTimer'); +const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); function handleWriteReq(req, data, encoding) { const { handle } = req; @@ -52,11 +53,33 @@ function handleWriteReq(req, data, encoding) { } } -function createWriteWrap(handle, oncomplete) { +function onWriteComplete(status) { + const stream = this.handle[owner_symbol]; + + if (stream.destroyed) { + if (typeof this.callback === 'function') + this.callback(null); + return; + } + + if (status < 0) { + const ex = errnoException(status, 'write', this.error); + stream.destroy(ex, this.callback); + return; + } + + stream[kUpdateTimer](); + stream[kAfterAsyncWrite](this); + + if (typeof this.callback === 'function') + this.callback(null); +} + +function createWriteWrap(handle) { var req = new WriteWrap(); req.handle = handle; - req.oncomplete = oncomplete; + req.oncomplete = onWriteComplete; req.async = false; req.bytes = 0; req.buffer = null; @@ -160,6 +183,7 @@ module.exports = { writevGeneric, writeGeneric, onStreamRead, + kAfterAsyncWrite, kMaybeDestroy, kUpdateTimer, }; diff --git a/lib/net.js b/lib/net.js index 503946047d1e4f..d4a56ceee7f2f5 100644 --- a/lib/net.js +++ b/lib/net.js @@ -62,6 +62,7 @@ const { writevGeneric, writeGeneric, onStreamRead, + kAfterAsyncWrite, kUpdateTimer } = require('internal/stream_base_commons'); const errors = require('internal/errors'); @@ -693,6 +694,10 @@ protoGetter('localPort', function localPort() { }); +Socket.prototype[kAfterAsyncWrite] = function() { + this[kLastWriteQueueSize] = 0; +}; + Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { // If we are still connecting, then buffer this for later. // The Writable logic will buffer up any more writes while @@ -715,7 +720,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { this._unrefTimer(); - var req = createWriteWrap(this._handle, afterWrite); + var req = createWriteWrap(this._handle); if (writev) writevGeneric(this, req, data, cb); else @@ -779,39 +784,6 @@ protoGetter('bytesWritten', function bytesWritten() { }); -function afterWrite(status, handle, err) { - var self = handle[owner_symbol]; - if (self !== process.stderr && self !== process.stdout) - debug('afterWrite', status); - - if (this.async) - self[kLastWriteQueueSize] = 0; - - // callback may come after call to destroy. - if (self.destroyed) { - debug('afterWrite destroyed'); - if (this.callback) - this.callback(null); - return; - } - - if (status < 0) { - var ex = errnoException(status, 'write', this.error); - debug('write failure', ex); - self.destroy(ex, this.callback); - return; - } - - self._unrefTimer(); - - if (self !== process.stderr && self !== process.stdout) - debug('afterWrite call cb'); - - if (this.callback) - this.callback.call(undefined); -} - - function checkBindError(err, port, handle) { // EADDRINUSE may not be reported until we call listen() or connect(). // To complicate matters, a failed bind() followed by listen() or connect() diff --git a/src/node_http2.cc b/src/node_http2.cc index 20b634f90fc00d..cb37d2940f927a 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1570,8 +1570,12 @@ void Http2Session::ClearOutgoing(int status) { current_outgoing_buffers_.swap(outgoing_buffers_); for (const nghttp2_stream_write& wr : current_outgoing_buffers_) { WriteWrap* wrap = wr.req_wrap; - if (wrap != nullptr) - wrap->Done(status); + if (wrap != nullptr) { + // TODO(addaleax): Pass `status` instead of 0, so that we actually error + // out with the error from the write to the underlying protocol, + // if one occurred. + wrap->Done(0); + } } }