Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: allow async createConnection() #4638

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions doc/api/http.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ options.agent = keepAliveAgent;
http.request(options, onResponseCallback);
```

### agent.createConnection(options[, callback])

Produces a socket/stream to be used for HTTP requests.

By default, this function is the same as [`net.createConnection()`][]. However,
custom Agents may override this method in case greater flexibility is desired.

A socket/stream can be supplied in one of two ways: by returning the
socket/stream from this function, or by passing the socket/stream to `callback`.

`callback` has a signature of `(err, stream)`.

### agent.destroy()

Destroy any sockets that are currently in use by the agent.
Expand Down Expand Up @@ -1086,6 +1098,10 @@ Options:
- `Agent` object: explicitly use the passed in `Agent`.
- `false`: opts out of connection pooling with an Agent, defaults request to
`Connection: close`.
- `createConnection`: A function that produces a socket/stream to use for the
request when the `agent` option is not used. This can be used to avoid
creating a custom Agent class just to override the default `createConnection`
function. See [`agent.createConnection()`][] for more details.

The optional `callback` parameter will be added as a one time listener for
the `'response'` event.
Expand Down Expand Up @@ -1161,6 +1177,7 @@ There are a few special headers that should be noted.
[`'listening'`]: net.html#net_event_listening
[`'response'`]: #http_event_response
[`Agent`]: #http_class_http_agent
[`agent.createConnection`]: #http_agent_createconnection
[`Buffer`]: buffer.html#buffer_buffer
[`destroy()`]: #http_agent_destroy
[`EventEmitter`]: events.html#events_class_events_eventemitter
Expand All @@ -1172,6 +1189,7 @@ There are a few special headers that should be noted.
[`http.Server`]: #http_class_http_server
[`http.ServerResponse`]: #http_class_http_serverresponse
[`message.headers`]: #http_message_headers
[`net.createConnection`]: net.html#net_net_createconnection_options_connectlistener
[`net.Server`]: net.html#net_class_net_server
[`net.Server.close()`]: net.html#net_server_close_callback
[`net.Server.listen()`]: net.html#net_server_listen_handle_callback
Expand Down
110 changes: 67 additions & 43 deletions lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function Agent(options) {
var name = self.getName(options);
debug('agent.on(free)', name);

if (!socket.destroyed &&
if (socket.writable &&
self.requests[name] && self.requests[name].length) {
self.requests[name].shift().onSocket(socket);
if (self.requests[name].length === 0) {
Expand All @@ -57,7 +57,7 @@ function Agent(options) {
var req = socket._httpMessage;
if (req &&
req.shouldKeepAlive &&
!socket.destroyed &&
socket.writable &&
self.keepAlive) {
var freeSockets = self.freeSockets[name];
var freeLen = freeSockets ? freeSockets.length : 0;
Expand Down Expand Up @@ -138,7 +138,15 @@ Agent.prototype.addRequest = function(req, options) {
} else if (sockLen < this.maxSockets) {
debug('call onSocket', sockLen, freeLen);
// If we are under maxSockets create a new one.
req.onSocket(this.createSocket(req, options));
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
req.emit('error', err);
});
return;
}
req.onSocket(newSocket);
});
} else {
debug('wait for socket');
// We are over limit so we'll add it to the queue.
Expand All @@ -149,18 +157,16 @@ Agent.prototype.addRequest = function(req, options) {
}
};

Agent.prototype.createSocket = function(req, options) {
Agent.prototype.createSocket = function(req, options, cb) {
var self = this;
options = util._extend({}, options);
options = util._extend(options, self.options);

if (!options.servername) {
options.servername = options.host;
if (req) {
var hostHeader = req.getHeader('host');
if (hostHeader) {
options.servername = hostHeader.replace(/:.*$/, '');
}
const hostHeader = req.getHeader('host');
if (hostHeader) {
options.servername = hostHeader.replace(/:.*$/, '');
}
}

Expand All @@ -169,48 +175,58 @@ Agent.prototype.createSocket = function(req, options) {

debug('createConnection', name, options);
options.encoding = null;
var s = self.createConnection(options);
if (!self.sockets[name]) {
self.sockets[name] = [];
}
this.sockets[name].push(s);
debug('sockets', name, this.sockets[name].length);
var called = false;
const newSocket = self.createConnection(options, oncreate);
if (newSocket)
oncreate(null, newSocket);
function oncreate(err, s) {
if (called)
return;
called = true;
if (err)
return cb(err);
if (!self.sockets[name]) {
self.sockets[name] = [];
}
self.sockets[name].push(s);
debug('sockets', name, self.sockets[name].length);

function onFree() {
self.emit('free', s, options);
}
s.on('free', onFree);

function onClose(err) {
debug('CLIENT socket onClose');
// This is the only place where sockets get removed from the Agent.
// If you want to remove a socket from the pool, just close it.
// All socket errors end in a close event anyway.
self.removeSocket(s, options);
}
s.on('close', onClose);

function onRemove() {
// We need this function for cases like HTTP 'upgrade'
// (defined by WebSockets) where we need to remove a socket from the
// pool because it'll be locked up indefinitely
debug('CLIENT socket onRemove');
self.removeSocket(s, options);
s.removeListener('close', onClose);
s.removeListener('free', onFree);
s.removeListener('agentRemove', onRemove);
function onFree() {
self.emit('free', s, options);
}
s.on('free', onFree);

function onClose(err) {
debug('CLIENT socket onClose');
// This is the only place where sockets get removed from the Agent.
// If you want to remove a socket from the pool, just close it.
// All socket errors end in a close event anyway.
self.removeSocket(s, options);
}
s.on('close', onClose);

function onRemove() {
// We need this function for cases like HTTP 'upgrade'
// (defined by WebSockets) where we need to remove a socket from the
// pool because it'll be locked up indefinitely
debug('CLIENT socket onRemove');
self.removeSocket(s, options);
s.removeListener('close', onClose);
s.removeListener('free', onFree);
s.removeListener('agentRemove', onRemove);
}
s.on('agentRemove', onRemove);
cb(null, s);
}
s.on('agentRemove', onRemove);
return s;
};

Agent.prototype.removeSocket = function(s, options) {
var name = this.getName(options);
debug('removeSocket', name, 'destroyed:', s.destroyed);
debug('removeSocket', name, 'writable:', s.writable);
var sets = [this.sockets];

// If the socket was destroyed, remove it from the free buffers too.
if (s.destroyed)
if (!s.writable)
sets.push(this.freeSockets);

for (var sk = 0; sk < sets.length; sk++) {
Expand All @@ -231,7 +247,15 @@ Agent.prototype.removeSocket = function(s, options) {
debug('removeSocket, have a request, make a socket');
var req = this.requests[name][0];
// If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options).emit('free');
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
req.emit('error', err);
});
return;
}
newSocket.emit('free');
});
}
};

Expand Down
41 changes: 37 additions & 4 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ function ClientRequest(options, cb) {
if (agent === false) {
agent = new defaultAgent.constructor();
} else if ((agent === null || agent === undefined) &&
!options.createConnection) {
typeof options.createConnection !== 'function') {
agent = defaultAgent;
}
self.agent = agent;
Expand Down Expand Up @@ -118,10 +118,20 @@ function ClientRequest(options, cb) {
self._renderHeaders());
}

var called = false;
if (self.socketPath) {
self._last = true;
self.shouldKeepAlive = false;
self.onSocket(self.agent.createConnection({ path: self.socketPath }));
const optionsPath = {
path: self.socketPath
};
const newSocket = self.agent.createConnection(optionsPath, oncreate);
if (newSocket && !called) {
called = true;
self.onSocket(newSocket);
} else {
return;
}
} else if (self.agent) {
// If there is an agent we should default to Connection:keep-alive,
// but only if the Agent will actually reuse the connection!
Expand All @@ -139,14 +149,37 @@ function ClientRequest(options, cb) {
// No agent, default to Connection:close.
self._last = true;
self.shouldKeepAlive = false;
if (options.createConnection) {
self.onSocket(options.createConnection(options));
if (typeof options.createConnection === 'function') {
const newSocket = options.createConnection(options, oncreate);
if (newSocket && !called) {
called = true;
self.onSocket(newSocket);
} else {
return;
}
} else {
debug('CLIENT use net.createConnection', options);
self.onSocket(net.createConnection(options));
}
}

function oncreate(err, socket) {
if (called)
return;
called = true;
if (err) {
process.nextTick(function() {
self.emit('error', err);
});
return;
}
self.onSocket(socket);
self._deferToConnect(null, null, function() {
self._flush();
self = null;
});
}

self._deferToConnect(null, null, function() {
self._flush();
self = null;
Expand Down
68 changes: 51 additions & 17 deletions test/parallel/test-http-createConnection.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,61 @@
'use strict';
var common = require('../common');
var assert = require('assert');
var http = require('http');
var net = require('net');
const common = require('../common');
const http = require('http');
const net = require('net');
const assert = require('assert');

var create = 0;
var response = 0;
process.on('exit', function() {
assert.equal(1, create, 'createConnection() http option was not called');
assert.equal(1, response, 'http server "request" callback was not called');
});

var server = http.createServer(function(req, res) {
const server = http.createServer(common.mustCall(function(req, res) {
res.end();
response++;
}).listen(common.PORT, '127.0.0.1', function() {
http.get({ createConnection: createConnection }, function(res) {
}, 4)).listen(common.PORT, '127.0.0.1', function() {
let fn = common.mustCall(createConnection);
http.get({ createConnection: fn }, function(res) {
res.resume();
server.close();
fn = common.mustCall(createConnectionAsync);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionBoth1);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionBoth2);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionError);
http.get({ createConnection: fn }, function(res) {
assert.fail(null, null, 'Unexpected response callback');
}).on('error', common.mustCall(function(err) {
assert.equal(err.message, 'Could not create socket');
server.close();
}));
});
});
});
});
});

function createConnection() {
create++;
return net.createConnection(common.PORT, '127.0.0.1');
}

function createConnectionAsync(options, cb) {
setImmediate(function() {
cb(null, net.createConnection(common.PORT, '127.0.0.1'));
});
}

function createConnectionBoth1(options, cb) {
const socket = net.createConnection(common.PORT, '127.0.0.1');
setImmediate(function() {
cb(null, socket);
});
return socket;
}

function createConnectionBoth2(options, cb) {
const socket = net.createConnection(common.PORT, '127.0.0.1');
cb(null, socket);
return socket;
}

function createConnectionError(options, cb) {
process.nextTick(cb, new Error('Could not create socket'));
}