Skip to content

Commit

Permalink
lib: implement async_hooks API in core
Browse files Browse the repository at this point in the history
Implement async_hooks support in the following:

* fatalException handler
* process.nextTick
* Timers
* net/dgram/http

PR-URL: nodejs#12892
Ref: nodejs#11883
Ref: nodejs#8531
Reviewed-By: Andreas Madsen <amwebdk@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Sam Roberts <vieuxtech@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Refael Ackermann <refack@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
  • Loading branch information
trevnorris authored and Olivier Martin committed May 19, 2017
1 parent 959eb7e commit 5695596
Show file tree
Hide file tree
Showing 13 changed files with 399 additions and 40 deletions.
9 changes: 7 additions & 2 deletions lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const net = require('net');
const util = require('util');
const EventEmitter = require('events');
const debug = util.debuglog('http');
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

// New Agent code.

Expand Down Expand Up @@ -93,6 +95,7 @@ function Agent(options) {
self.freeSockets[name] = freeSockets;
socket.setKeepAlive(true, self.keepAliveMsecs);
socket.unref();
socket[async_id_symbol] = -1;
socket._httpMessage = null;
self.removeSocket(socket, options);
freeSockets.push(socket);
Expand Down Expand Up @@ -163,6 +166,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
if (freeLen) {
// we have a free socket, so use that.
var socket = this.freeSockets[name].shift();
// Assign the handle a new asyncId and run any init() hooks.
socket._handle.asyncReset();
debug('have free socket');

// don't leak
Expand All @@ -177,7 +182,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
// If we are under maxSockets create a new one.
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
Expand Down Expand Up @@ -290,7 +295,7 @@ Agent.prototype.removeSocket = function removeSocket(s, options) {
// If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
Expand Down
6 changes: 5 additions & 1 deletion lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const Agent = require('_http_agent');
const Buffer = require('buffer').Buffer;
const urlToOptions = require('internal/url').urlToOptions;
const outHeadersKey = require('internal/http').outHeadersKey;
const nextTick = require('internal/process/next_tick').nextTick;

// The actual list of disallowed characters in regexp form is more like:
// /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/
Expand Down Expand Up @@ -587,9 +588,12 @@ function responseKeepAlive(res, req) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
socket.once('error', freeSocketErrorListener);
// There are cases where _handle === null. Avoid those. Passing null to
// nextTick() will call initTriggerId() to retrieve the id.
const asyncId = socket._handle ? socket._handle.getAsyncId() : null;
// Mark this socket as available, AFTER user-added end
// handlers have a chance to run.
process.nextTick(emitFreeNT, socket);
nextTick(asyncId, emitFreeNT, socket);
}
}

Expand Down
8 changes: 7 additions & 1 deletion lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const HTTPParser = binding.HTTPParser;
const FreeList = require('internal/freelist');
const ondrain = require('internal/http').ondrain;
const incoming = require('_http_incoming');
const emitDestroy = require('async_hooks').emitDestroy;
const IncomingMessage = incoming.IncomingMessage;
const readStart = incoming.readStart;
const readStop = incoming.readStop;
Expand Down Expand Up @@ -211,8 +212,13 @@ function freeParser(parser, req, socket) {
parser.incoming = null;
parser.outgoing = null;
parser[kOnExecute] = null;
if (parsers.free(parser) === false)
if (parsers.free(parser) === false) {
parser.close();
} else {
// Since the Parser destructor isn't going to run the destroy() callbacks
// it needs to be triggered manually.
emitDestroy(parser.getAsyncId());
}
}
if (req) {
req.parser = null;
Expand Down
12 changes: 9 additions & 3 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
const outHeadersKey = require('internal/http').outHeadersKey;
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

const CRLF = common.CRLF;
const debug = common.debug;
Expand Down Expand Up @@ -264,8 +266,9 @@ function _writeRaw(data, encoding, callback) {
if (this.output.length) {
this._flushOutput(conn);
} else if (!data.length) {
if (typeof callback === 'function')
process.nextTick(callback);
if (typeof callback === 'function') {
nextTick(this.socket[async_id_symbol], callback);
}
return true;
}
// Directly write to socket.
Expand Down Expand Up @@ -623,7 +626,10 @@ const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
if (this.finished) {
var err = new Error('write after end');
process.nextTick(writeAfterEndNT.bind(this), err, callback);
nextTick(this.socket[async_id_symbol],
writeAfterEndNT.bind(this),
err,
callback);

return true;
}
Expand Down
6 changes: 3 additions & 3 deletions lib/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var processing_hook = false;
// Use to temporarily store and updated active_hooks_array if the user enables
// or disables a hook while hooks are being processed.
var tmp_active_hooks_array = null;
// Keep track of the field counds held in tmp_active_hooks_array.
// Keep track of the field counts held in tmp_active_hooks_array.
var tmp_async_hook_fields = null;

// Each constant tracks how many callbacks there are for any given step of
Expand All @@ -41,9 +41,9 @@ var tmp_async_hook_fields = null;
const { kInit, kBefore, kAfter, kDestroy, kCurrentAsyncId, kCurrentTriggerId,
kAsyncUidCntr, kInitTriggerId } = async_wrap.constants;

const { async_id_symbol, trigger_id_symbol } = async_wrap;

// Used in AsyncHook and AsyncEvent.
const async_id_symbol = Symbol('_asyncId');
const trigger_id_symbol = Symbol('_triggerId');
const init_symbol = Symbol('init');
const before_symbol = Symbol('before');
const after_symbol = Symbol('after');
Expand Down
12 changes: 10 additions & 2 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ const assert = require('assert');
const Buffer = require('buffer').Buffer;
const util = require('util');
const EventEmitter = require('events');
const setInitTriggerId = require('async_hooks').setInitTriggerId;
const UV_UDP_REUSEADDR = process.binding('constants').os.UV_UDP_REUSEADDR;
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

const UDP = process.binding('udp_wrap').UDP;
const SendWrap = process.binding('udp_wrap').SendWrap;
Expand Down Expand Up @@ -111,6 +114,7 @@ function Socket(type, listener) {
this._handle = handle;
this._receiving = false;
this._bindState = BIND_STATE_UNBOUND;
this[async_id_symbol] = this._handle.getAsyncId();
this.type = type;
this.fd = null; // compatibility hack

Expand Down Expand Up @@ -432,6 +436,10 @@ function doSend(ex, self, ip, list, address, port, callback) {
req.callback = callback;
req.oncomplete = afterSend;
}
// node::SendWrap isn't instantiated and attached to the JS instance of
// SendWrap above until send() is called. So don't set the init trigger id
// until now.
setInitTriggerId(self[async_id_symbol]);
var err = self._handle.send(req,
list,
list.length,
Expand All @@ -441,7 +449,7 @@ function doSend(ex, self, ip, list, address, port, callback) {
if (err && callback) {
// don't emit as error, dgram_legacy.js compatibility
const ex = exceptionWithHostPort(err, 'send', address, port);
process.nextTick(callback, ex);
nextTick(self[async_id_symbol], callback, ex);
}
}

Expand All @@ -468,7 +476,7 @@ Socket.prototype.close = function(callback) {
this._stopReceiving();
this._handle.close();
this._handle = null;
process.nextTick(socketCloseNT, this);
nextTick(this[async_id_symbol], socketCloseNT, this);

return this;
};
Expand Down
24 changes: 23 additions & 1 deletion lib/internal/bootstrap_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,20 @@
}

function setupProcessFatal() {
const async_wrap = process.binding('async_wrap');
// Arrays containing hook flags and ids for async_hook calls.
const { async_hook_fields, async_uid_fields } = async_wrap;
// Internal functions needed to manipulate the stack.
const { clearIdStack, popAsyncIds } = async_wrap;
const { kAfter, kCurrentAsyncId, kInitTriggerId } = async_wrap.constants;

process._fatalException = function(er) {
var caught;

// It's possible that kInitTriggerId was set for a constructor call that
// threw and was never cleared. So clear it now.
async_uid_fields[kInitTriggerId] = 0;

if (process.domain && process.domain._errorHandler)
caught = process.domain._errorHandler(er);

Expand All @@ -314,9 +324,21 @@
// nothing to be done about it at this point.
}

// if we handled an error, then make sure any ticks get processed
} else {
// If we handled an error, then make sure any ticks get processed
NativeModule.require('timers').setImmediate(process._tickCallback);

// Emit the after() hooks now that the exception has been handled.
if (async_hook_fields[kAfter] > 0) {
do {
NativeModule.require('async_hooks').emitAfter(
async_uid_fields[kCurrentAsyncId]);
// popAsyncIds() returns true if there are more ids on the stack.
} while (popAsyncIds(async_uid_fields[kCurrentAsyncId]));
// Or completely empty the id stack.
} else {
clearIdStack();
}
}

return caught;
Expand Down
Loading

0 comments on commit 5695596

Please sign in to comment.