Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: Extend esm examples, document and lint import forms for process + Buffer #39043

Closed
wants to merge 9 commits into from
205 changes: 198 additions & 7 deletions doc/api/async_context.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ in other languages.
The `AsyncLocalStorage` and `AsyncResource` classes are part of the
`async_hooks` module:

```js
```mjs
import async_hooks from 'async_hooks';
```

```cjs
const async_hooks = require('async_hooks');
```

Expand All @@ -40,7 +44,39 @@ The following example uses `AsyncLocalStorage` to build a simple logger
that assigns IDs to incoming HTTP requests and includes them in messages
logged within each request.

```js
```mjs
import http from 'http';
import { AsyncLocalStorage } from 'async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
```

```cjs
const http = require('http');
const { AsyncLocalStorage } = require('async_hooks');

Expand Down Expand Up @@ -299,7 +335,35 @@ The `init` hook will trigger when an `AsyncResource` is instantiated.

The following is an overview of the `AsyncResource` API.

```js
```mjs
import { AsyncResource, executionAsyncId } from 'async_hooks';

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false }
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
```

```cjs
const { AsyncResource, executionAsyncId } = require('async_hooks');

// AsyncResource() is meant to be extended. Instantiating a
Expand Down Expand Up @@ -446,7 +510,14 @@ database connection pools, can follow a similar model.
Assuming that the task is adding two numbers, using a file named
`task_processor.js` with the following content:

```js
```mjs
import { parentPort } from 'worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
```

```cjs
const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
Expand All @@ -455,7 +526,95 @@ parentPort.on('message', (task) => {

a Worker pool around it could use the following structure:

```js
```mjs
import { AsyncResource } from 'async_hooks';
import { EventEmitter } from 'events';
import path from 'path';
import { Worker } from 'worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}

done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}

export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];

for (let i = 0; i < numThreads; i++)
this.addNewWorker();

// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}

addNewWorker() {
const worker = new Worker(new URL('task_processer.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}

runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}

const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}

close() {
for (const worker of this.workers) worker.terminate();
}
}
```

```cjs
const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
Expand Down Expand Up @@ -553,7 +712,23 @@ were scheduled.

This pool could be used as follows:

```js
```mjs
import WorkerPool from './worker_pool.js';
import os from 'os';

const pool = new WorkerPool(os.cpus().length);

let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
```

```cjs
const WorkerPool = require('./worker_pool.js');
const os = require('os');

Expand All @@ -579,7 +754,22 @@ The following example shows how to use the `AsyncResource` class to properly
associate an event listener with the correct execution context. The same
approach can be applied to a [`Stream`][] or a similar event-driven class.

```js
```mjs
import { createServer } from 'http';
import { AsyncResource, executionAsyncId } from 'async_hooks';

const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);
```

```cjs
const { createServer } = require('http');
const { AsyncResource, executionAsyncId } = require('async_hooks');

Expand All @@ -593,6 +783,7 @@ const server = createServer((req, res) => {
res.end();
}).listen(3000);
```

[`AsyncResource`]: #async_context_class_asyncresource
[`EventEmitter`]: events.md#events_class_eventemitter
[`Stream`]: stream.md#stream_stream
Expand Down
Loading