diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 1eac2ddf9413fd..924d07a986a906 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -1,18 +1,23 @@ 'use strict'; +const assert = require('assert'); const util = require('util'); const Socket = require('net').Socket; const JSStream = process.binding('js_stream').JSStream; const uv = process.binding('uv'); +const debug = util.debuglog('stream_wrap'); function StreamWrap(stream) { - var handle = new JSStream(); + const handle = new JSStream(); this.stream = stream; - var self = this; + this._list = null; + + const self = this; handle.close = function(cb) { - cb(); + debug('close'); + self.doClose(cb); }; handle.isAlive = function() { return self.isAlive(); @@ -27,21 +32,25 @@ function StreamWrap(stream) { return self.readStop(); }; handle.onshutdown = function(req) { - return self.shutdown(req); + return self.doShutdown(req); }; handle.onwrite = function(req, bufs) { - return self.write(req, bufs); + return self.doWrite(req, bufs); }; this.stream.pause(); + this.stream.on('error', function(err) { + self.emit('error', err); + }); this.stream.on('data', function(chunk) { - self._handle.readBuffer(chunk); + debug('data', chunk.length); + if (self._handle) + self._handle.readBuffer(chunk); }); this.stream.once('end', function() { - self._handle.emitEOF(); - }); - this.stream.on('error', function(err) { - self.emit('error', err); + debug('end'); + if (self._handle) + self._handle.emitEOF(); }); Socket.call(this, { @@ -55,11 +64,11 @@ module.exports = StreamWrap; StreamWrap.StreamWrap = StreamWrap; StreamWrap.prototype.isAlive = function isAlive() { - return this.readable && this.writable; + return true; }; StreamWrap.prototype.isClosing = function isClosing() { - return !this.isAlive(); + return !this.readable || !this.writable; }; StreamWrap.prototype.readStart = function readStart() { @@ -72,21 +81,31 @@ StreamWrap.prototype.readStop = function readStop() { return 0; }; -StreamWrap.prototype.shutdown = function shutdown(req) { - var self = this; +StreamWrap.prototype.doShutdown = function doShutdown(req) { + const self = this; + const handle = this._handle; + const item = this._enqueue('shutdown', req); this.stream.end(function() { // Ensure that write was dispatched setImmediate(function() { - self._handle.finishShutdown(req, 0); + if (!self._dequeue(item)) + return; + + handle.finishShutdown(req, 0); }); }); return 0; }; -StreamWrap.prototype.write = function write(req, bufs) { +StreamWrap.prototype.doWrite = function doWrite(req, bufs) { + const self = this; + const handle = self._handle; + var pending = bufs.length; - var self = this; + + // Queue the request to be able to cancel it + const item = self._enqueue('write', req); self.stream.cork(); bufs.forEach(function(buf) { @@ -103,6 +122,10 @@ StreamWrap.prototype.write = function write(req, bufs) { // Ensure that write was dispatched setImmediate(function() { + // Do not invoke callback twice + if (!self._dequeue(item)) + return; + var errCode = 0; if (err) { if (err.code && uv['UV_' + err.code]) @@ -111,10 +134,83 @@ StreamWrap.prototype.write = function write(req, bufs) { errCode = uv.UV_EPIPE; } - self._handle.doAfterWrite(req); - self._handle.finishWrite(req, errCode); + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); }); } return 0; }; + +function QueueItem(type, req) { + this.type = type; + this.req = req; + this.prev = this; + this.next = this; +} + +StreamWrap.prototype._enqueue = function enqueue(type, req) { + const item = new QueueItem(type, req); + if (this._list === null) { + this._list = item; + return item; + } + + item.next = this._list.next; + item.prev = this._list; + item.next.prev = item; + item.prev.next = item; + + return item; +}; + +StreamWrap.prototype._dequeue = function dequeue(item) { + assert(item instanceof QueueItem); + + var next = item.next; + var prev = item.prev; + + if (next === null && prev === null) + return false; + + item.next = null; + item.prev = null; + + if (next === item) { + prev = null; + next = null; + } else { + prev.next = next; + next.prev = prev; + } + + if (this._list === item) + this._list = next; + + return true; +}; + +StreamWrap.prototype.doClose = function doClose(cb) { + const self = this; + const handle = self._handle; + + setImmediate(function() { + while (self._list !== null) { + const item = self._list; + const req = item.req; + self._dequeue(item); + + const errCode = uv.UV_ECANCELED; + if (item.type === 'write') { + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + } else if (item.type === 'shutdown') { + handle.finishShutdown(req, errCode); + } + } + + // Should be already set by net.js + assert(self._handle === null); + cb(); + }); +}; diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 8ccee2379d4309..b96e577a76005b 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -254,7 +254,7 @@ function TLSSocket(socket, options) { this.encrypted = true; net.Socket.call(this, { - handle: this._wrapHandle(wrap && wrap._handle), + handle: this._wrapHandle(wrap), allowHalfOpen: socket && socket.allowHalfOpen, readable: false, writable: false @@ -279,7 +279,7 @@ util.inherits(TLSSocket, net.Socket); exports.TLSSocket = TLSSocket; var proxiedMethods = [ - 'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6', + 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6', 'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive', 'setSimultaneousAccepts', 'setBlocking', @@ -295,8 +295,20 @@ proxiedMethods.forEach(function(name) { }; }); -TLSSocket.prototype._wrapHandle = function(handle) { +tls_wrap.TLSWrap.prototype.close = function closeProxy(cb) { + if (this._parentWrap && this._parentWrap._handle === this._parent) { + setImmediate(cb); + return this._parentWrap.destroy(); + } + return this._parent.close(cb); +}; + +TLSSocket.prototype._wrapHandle = function(wrap) { var res; + var handle; + + if (wrap) + handle = wrap._handle; var options = this._tlsOptions; if (!handle) { @@ -310,6 +322,7 @@ TLSSocket.prototype._wrapHandle = function(handle) { tls.createSecureContext(); res = tls_wrap.wrap(handle, context.context, options.isServer); res._parent = handle; + res._parentWrap = wrap; res._secureContext = context; res.reading = handle.reading; Object.defineProperty(handle, 'reading', { @@ -355,7 +368,13 @@ TLSSocket.prototype._init = function(socket, wrap) { // represent real writeQueueSize during regular writes. ssl.writeQueueSize = 1; - this.server = options.server || null; + this.server = options.server; + + // Move the server to TLSSocket, otherwise both `socket.destroy()` and + // `TLSSocket.destroy()` will decrement number of connections of the TLS + // server, leading to misfiring `server.close()` callback + if (socket && socket.server === this.server) + socket.server = null; // For clients, we will always have either a given ca list or be using // default one @@ -418,6 +437,7 @@ TLSSocket.prototype._init = function(socket, wrap) { // set `.onsniselect` callback. if (process.features.tls_sni && options.isServer && + options.SNICallback && options.server && (options.SNICallback !== SNICallback || options.server._contexts.length)) { @@ -554,6 +574,10 @@ TLSSocket.prototype._start = function() { return; } + // Socket was destroyed before the connection was established + if (!this._handle) + return; + debug('start'); if (this._tlsOptions.requestOCSP) this._handle.requestOCSP(); diff --git a/src/js_stream.cc b/src/js_stream.cc index 6b7c4063e05a2a..91041d0201188d 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -163,7 +163,7 @@ template void JSStream::Finish(const FunctionCallbackInfo& args) { Wrap* w = Unwrap(args[0].As()); - w->Done(args[0]->Int32Value()); + w->Done(args[1]->Int32Value()); } diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index b8a648de923081..d4c7c9055d529d 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -320,6 +320,10 @@ void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { TLSWrap* wrap = req_wrap->wrap()->Cast(); req_wrap->Dispose(); + // We should not be getting here after `DestroySSL`, because all queued writes + // must be invoked with UV_ECANCELED + CHECK_NE(wrap->ssl_, nullptr); + // Handle error if (status) { // Ignore errors after shutdown @@ -331,9 +335,6 @@ void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { return; } - if (wrap->ssl_ == nullptr) - return; - // Commit NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); diff --git a/test/parallel/test-stream-wrap.js b/test/parallel/test-stream-wrap.js new file mode 100644 index 00000000000000..e7a7ecddd2385d --- /dev/null +++ b/test/parallel/test-stream-wrap.js @@ -0,0 +1,39 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const StreamWrap = require('_stream_wrap'); +const Duplex = require('stream').Duplex; +const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap; + +var done = false; + +function testShutdown(callback) { + var stream = new Duplex({ + read: function() { + }, + write: function() { + } + }); + + var wrap = new StreamWrap(stream); + + var req = new ShutdownWrap(); + req.oncomplete = function(code) { + assert(code < 0); + callback(); + }; + req.handle = wrap._handle; + + // Close the handle to simulate + wrap.destroy(); + req.handle.shutdown(req); +} + +testShutdown(function() { + done = true; +}); + +process.on('exit', function() { + assert(done); +}); diff --git a/test/parallel/test-tls-connect-given-socket.js b/test/parallel/test-tls-connect-given-socket.js index 9e8170b13af1b7..902b67aa515c02 100644 --- a/test/parallel/test-tls-connect-given-socket.js +++ b/test/parallel/test-tls-connect-given-socket.js @@ -43,16 +43,33 @@ var server = tls.createServer(options, function(socket) { }); assert(client.readable); assert(client.writable); + + return client; } - // Already connected socket - var connected = net.connect(common.PORT, function() { - establish(connected); + // Immediate death socket + var immediateDeath = net.connect(common.PORT); + establish(immediateDeath).destroy(); + + // Outliving + var outlivingTCP = net.connect(common.PORT); + outlivingTCP.on('connect', function() { + outlivingTLS.destroy(); + next(); }); + var outlivingTLS = establish(outlivingTCP); + + function next() { + // Already connected socket + var connected = net.connect(common.PORT, function() { + establish(connected); + }); + + // Connecting socket + var connecting = net.connect(common.PORT); + establish(connecting); - // Connecting socket - var connecting = net.connect(common.PORT); - establish(connecting); + } }); process.on('exit', function() { diff --git a/test/parallel/test-tls-destroy-whilst-write.js b/test/parallel/test-tls-destroy-whilst-write.js new file mode 100644 index 00000000000000..8b865fab178365 --- /dev/null +++ b/test/parallel/test-tls-destroy-whilst-write.js @@ -0,0 +1,29 @@ +'use strict'; +var assert = require('assert'); +var common = require('../common'); + +if (!common.hasCrypto) { + console.log('1..0 # Skipped: missing crypto'); + process.exit(); +} +var tls = require('tls'); +var stream = require('stream'); + +var delay = new stream.Duplex({ + read: function read() { + }, + write: function write(data, enc, cb) { + console.log('pending'); + setTimeout(function() { + console.log('done'); + cb(); + }, 200); + } +}); + +var secure = tls.connect({ + socket: delay +}); +setImmediate(function() { + secure.destroy(); +});