Skip to content

Commit

Permalink
feat: Add Piscina#close API (#396)
Browse files Browse the repository at this point in the history
Co-authored-by: Carlos Fuentes <me@metcoder.dev>
  • Loading branch information
andersonjoseph and metcoder95 committed Sep 10, 2023
1 parent e59f88c commit 5378e4c
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 7 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ This class extends [`EventEmitter`][] from Node.js.
`fs.close()`, and will close them automatically when the Worker exits.
Defaults to `true`. (This option is only supported on Node.js 12.19+ and
all Node.js versions higher than 14.6.0).
* `closeTimeout`: (`number`) An optional time (in milliseconds) to wait for the pool to
complete all in-flight tasks when `close()` is called. The default is `30000`

Use caution when setting resource limits. Setting limits that are too low may
result in the `Piscina` worker threads being unusable.
Expand Down Expand Up @@ -420,6 +422,21 @@ Stops all Workers and rejects all `Promise`s for pending tasks.

This returns a `Promise` that is fulfilled once all threads have stopped.

### Method: `close([options])`

* `options`:
* `force`: A `boolean` value that indicates whether to abort all tasks that
are enqueued but not started yet. The default is `false`.

Stops all Workers gracefully.

This returns a `Promise` that is fulfilled once all tasks that were started
have completed and all threads have stopped.

This method is similar to `destroy()`, but with the difference that `close()`
will wait for the worker tasks to finish, while `destroy()`
will abort them immediately.

### Event: `'error'`

An `'error'` event is emitted by instances of this class when:
Expand All @@ -440,7 +457,7 @@ A `'drain'` event is emitted whenever the `queueSize` reaches `0`.

Similar to [`Piscina#needsDrain`](#property-needsdrain-readonly);
this event is triggered once the total capacity of the pool is exceeded
by number of tasks enequeued that are pending of execution.
by number of tasks enqueued that are pending of execution.

### Event: `'message'`

Expand Down
125 changes: 119 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
kValue
} from './common';
import { version } from '../package.json';
import { setTimeout as sleep } from 'timers/promises';

const cpuCount : number = (() => {
try {
Expand Down Expand Up @@ -119,6 +120,7 @@ interface Options {
taskQueue? : TaskQueue,
niceIncrement? : number,
trackUnmanagedFds? : boolean,
closeTimeout?: number
}

interface FilledOptions extends Options {
Expand All @@ -131,7 +133,8 @@ interface FilledOptions extends Options {
concurrentTasksPerWorker : number,
useAtomics: boolean,
taskQueue : TaskQueue,
niceIncrement : number
niceIncrement : number,
closeTimeout : number
}

const kDefaultOptions : FilledOptions = {
Expand All @@ -145,7 +148,8 @@ const kDefaultOptions : FilledOptions = {
useAtomics: true,
taskQueue: new ArrayTaskQueue(),
niceIncrement: 0,
trackUnmanagedFds: true
trackUnmanagedFds: true,
closeTimeout: 30000
};

interface RunOptions {
Expand All @@ -169,6 +173,14 @@ const kDefaultRunOptions : FilledRunOptions = {
name: null
};

interface CloseOptions {
force?: boolean,
}

const kDefaultCloseOptions : Required<CloseOptions> = {
force: false
};

class DirectlyTransferable implements Transferable {
#value : object;
constructor (value : object) {
Expand Down Expand Up @@ -386,7 +398,9 @@ const Errors = {
TaskQueueAtLimit:
() => new Error('Task queue is at limit'),
NoTaskQueueAvailable:
() => new Error('No task queue available and all Workers are busy')
() => new Error('No task queue available and all Workers are busy'),
CloseTimeout:
() => new Error('Close operation timed out')
};

class WorkerInfo extends AsynchronouslyCreatedResource {
Expand Down Expand Up @@ -525,6 +539,7 @@ class ThreadPool {
start : number = performance.now();
inProcessPendingMessages : boolean = false;
startingUp : boolean = false;
closingUp : boolean = false;
workerFailsDuringBootstrap : boolean = false;

constructor (publicInterface : Piscina, options : Options) {
Expand Down Expand Up @@ -562,6 +577,9 @@ class ThreadPool {
}

_ensureMinimumWorkers () : void {
if (this.closingUp) {
return;
}
while (this.workers.size < this.options.minThreads) {
this._addNewWorker();
}
Expand Down Expand Up @@ -751,8 +769,7 @@ class ThreadPool {
name
} = options;
const {
transferList = [],
signal = null
transferList = []
} = options;
if (filename == null) {
filename = this.options.filename;
Expand All @@ -765,6 +782,16 @@ class ThreadPool {
}
filename = maybeFileURLToPath(filename);

let signal: AbortSignalAny | null;
if (this.closingUp) {
const closingUpAbortController = new AbortController();
closingUpAbortController.abort('queue is closing up');

signal = closingUpAbortController.signal;
} else {
signal = options.signal ?? null;
}

let resolve : (result : any) => void;
let reject : (err : Error) => void;
// eslint-disable-next-line
Expand Down Expand Up @@ -913,6 +940,71 @@ class ThreadPool {

await Promise.all(exitEvents);
}

async close (options : Required<CloseOptions>) {
this.closingUp = true;

if (options.force) {
const skipQueueLength = this.skipQueue.length;
for (let i = 0; i < skipQueueLength; i++) {
const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError());
} else {
this.skipQueue.push(taskInfo);
}
}

const taskQueueLength = this.taskQueue.size;
for (let i = 0; i < taskQueueLength; i++) {
const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError());
} else {
this.taskQueue.push(taskInfo);
}
}
}

const onPoolFlushed = () => new Promise<void>((resolve) => {
const numberOfWorkers = this.workers.size;
let numberOfWorkersDone = 0;

const checkIfWorkerIsDone = (workerInfo: WorkerInfo) => {
if (workerInfo.taskInfos.size === 0) {
numberOfWorkersDone++;
}

if (numberOfWorkers === numberOfWorkersDone) {
resolve();
}
};

for (const workerInfo of this.workers) {
checkIfWorkerIsDone(workerInfo);

workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo));
}
});

const throwOnTimeOut = async (timeout: number) => {
await sleep(timeout);
throw Errors.CloseTimeout();
};

try {
await Promise.race([
onPoolFlushed(),
throwOnTimeOut(this.options.closeTimeout)
]);
} catch (error) {
this.publicInterface.emit('error', error);
} finally {
await this.destroy();
this.publicInterface.emit('close');
this.closingUp = false;
}
}
}

class Piscina extends EventEmitterAsyncResource {
Expand Down Expand Up @@ -945,7 +1037,7 @@ class Piscina extends EventEmitterAsyncResource {
}
if (options.maxQueue !== undefined &&
options.maxQueue !== 'auto' &&
(typeof options.maxQueue !== 'number' || options.maxQueue < 0)) {
(typeof options.maxQueue !== 'number' || options.maxQueue < 0)) {
throw new TypeError('options.maxQueue must be a non-negative integer');
}
if (options.concurrentTasksPerWorker !== undefined &&
Expand Down Expand Up @@ -974,6 +1066,9 @@ class Piscina extends EventEmitterAsyncResource {
typeof options.trackUnmanagedFds !== 'boolean') {
throw new TypeError('options.trackUnmanagedFds must be a boolean value');
}
if (options.closeTimeout !== undefined && (typeof options.closeTimeout !== 'number' || options.closeTimeout < 0)) {
throw new TypeError('options.closeTimeout must be a non-negative integer');
}

this.#pool = new ThreadPool(this, options);
}
Expand Down Expand Up @@ -1055,6 +1150,24 @@ class Piscina extends EventEmitterAsyncResource {
return this.#pool.runTask(task, { transferList, filename, name, signal });
}

async close (options : CloseOptions = kDefaultCloseOptions) {
if (options === null || typeof options !== 'object') {
throw TypeError('options must be an object');
}

let { force } = options;

if (force !== undefined && typeof force !== 'boolean') {
return Promise.reject(
new TypeError('force argument must be a boolean'));
}
force ??= kDefaultCloseOptions.force;

return this.#pool.close({
force
});
}

destroy () {
return this.#pool.destroy();
}
Expand Down
93 changes: 93 additions & 0 deletions test/pool-close.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { test } from 'tap';
import Piscina from '..';
import { resolve } from 'path';
import { once } from 'events';

test('close()', async (t) => {
t.test('no pending tasks', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js') });
await pool.close();
t.pass('pool closed successfully');
});

t.test('queued tasks waits for all tasks to complete', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });

const task1 = pool.run({ time: 100 });
const task2 = pool.run({ time: 100 });

setImmediate(() => t.resolves(pool.close(), 'close is resolved when all running tasks are completed'));

await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.resolves(task2, 'complete running task')
]);
});

t.test('abort any task enqueued during closing up', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });

setImmediate(() => {
t.resolves(pool.close(), 'close is resolved when running tasks are completed');
t.rejects(pool.run({ time: 1000 }), /The task has been aborted/, 'abort any task enqueued during close');
});

await t.resolves(pool.run({ time: 100 }), 'complete running task');
});
});

test('close({force: true})', async (t) => {
t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });

const task1 = pool.run({ time: 100 });
const task2 = pool.run({ time: 200 });

setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));

await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.rejects(task2, /The task has been aborted/, 'abort task that are not started yet')
]);
});

t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 2 });

const task1 = pool.run({ time: 500 });
const task2 = pool.run({ time: 100 });
const task3 = pool.run({ time: 100 });
const task4 = pool.run({ time: 100 });

setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));

await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.resolves(task2, 'complete running task'),
t.rejects(task3, /The task has been aborted/, 'abort task that are not started yet'),
t.rejects(task4, /The task has been aborted/, 'abort task that are not started yet')
]);
});
});

test('timed out close operation destroys the pool', async (t) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/sleep.js'),
maxThreads: 1,
closeTimeout: 500
});

const task1 = pool.run({ time: 5000 });
const task2 = pool.run({ time: 5000 });

setImmediate(() => t.resolves(pool.close(), 'close is resolved on timeout'));

await Promise.all([
t.resolves(once(pool, 'error'), 'error handler is called on timeout'),
t.rejects(task1, /Terminating worker thread/, 'task is aborted due to timeout'),
t.rejects(task2, /Terminating worker thread/, 'task is aborted due to timeout')
]);
});

0 comments on commit 5378e4c

Please sign in to comment.