diff --git a/README.md b/README.md index de800369..68c57fbd 100644 --- a/README.md +++ b/README.md @@ -356,6 +356,8 @@ This class extends [`EventEmitter`][] from Node.js. `fs.close()`, and will close them automatically when the Worker exits. Defaults to `true`. (This option is only supported on Node.js 12.19+ and all Node.js versions higher than 14.6.0). + * `closeTimeout`: (`number`) An optional time (in milliseconds) to wait for the pool to + complete all in-flight tasks when `close()` is called. The default is `30000` Use caution when setting resource limits. Setting limits that are too low may result in the `Piscina` worker threads being unusable. @@ -420,6 +422,21 @@ Stops all Workers and rejects all `Promise`s for pending tasks. This returns a `Promise` that is fulfilled once all threads have stopped. +### Method: `close([options])` + +* `options`: + * `force`: A `boolean` value that indicates whether to abort all tasks that + are enqueued but not started yet. The default is `false`. + +Stops all Workers gracefully. + +This returns a `Promise` that is fulfilled once all tasks that were started +have completed and all threads have stopped. + +This method is similar to `destroy()`, but with the difference that `close()` +will wait for the worker tasks to finish, while `destroy()` +will abort them immediately. + ### Event: `'error'` An `'error'` event is emitted by instances of this class when: @@ -440,7 +457,7 @@ A `'drain'` event is emitted whenever the `queueSize` reaches `0`. Similar to [`Piscina#needsDrain`](#property-needsdrain-readonly); this event is triggered once the total capacity of the pool is exceeded -by number of tasks enequeued that are pending of execution. +by number of tasks enqueued that are pending of execution. ### Event: `'message'` diff --git a/src/index.ts b/src/index.ts index 89b67df9..38ad7780 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,6 +31,7 @@ import { kValue } from './common'; import { version } from '../package.json'; +import { setTimeout as sleep } from 'timers/promises'; const cpuCount : number = (() => { try { @@ -119,6 +120,7 @@ interface Options { taskQueue? : TaskQueue, niceIncrement? : number, trackUnmanagedFds? : boolean, + closeTimeout?: number } interface FilledOptions extends Options { @@ -131,7 +133,8 @@ interface FilledOptions extends Options { concurrentTasksPerWorker : number, useAtomics: boolean, taskQueue : TaskQueue, - niceIncrement : number + niceIncrement : number, + closeTimeout : number } const kDefaultOptions : FilledOptions = { @@ -145,7 +148,8 @@ const kDefaultOptions : FilledOptions = { useAtomics: true, taskQueue: new ArrayTaskQueue(), niceIncrement: 0, - trackUnmanagedFds: true + trackUnmanagedFds: true, + closeTimeout: 30000 }; interface RunOptions { @@ -169,6 +173,14 @@ const kDefaultRunOptions : FilledRunOptions = { name: null }; +interface CloseOptions { + force?: boolean, +} + +const kDefaultCloseOptions : Required = { + force: false +}; + class DirectlyTransferable implements Transferable { #value : object; constructor (value : object) { @@ -386,7 +398,9 @@ const Errors = { TaskQueueAtLimit: () => new Error('Task queue is at limit'), NoTaskQueueAvailable: - () => new Error('No task queue available and all Workers are busy') + () => new Error('No task queue available and all Workers are busy'), + CloseTimeout: + () => new Error('Close operation timed out') }; class WorkerInfo extends AsynchronouslyCreatedResource { @@ -525,6 +539,7 @@ class ThreadPool { start : number = performance.now(); inProcessPendingMessages : boolean = false; startingUp : boolean = false; + closingUp : boolean = false; workerFailsDuringBootstrap : boolean = false; constructor (publicInterface : Piscina, options : Options) { @@ -562,6 +577,9 @@ class ThreadPool { } _ensureMinimumWorkers () : void { + if (this.closingUp) { + return; + } while (this.workers.size < this.options.minThreads) { this._addNewWorker(); } @@ -751,8 +769,7 @@ class ThreadPool { name } = options; const { - transferList = [], - signal = null + transferList = [] } = options; if (filename == null) { filename = this.options.filename; @@ -765,6 +782,16 @@ class ThreadPool { } filename = maybeFileURLToPath(filename); + let signal: AbortSignalAny | null; + if (this.closingUp) { + const closingUpAbortController = new AbortController(); + closingUpAbortController.abort('queue is closing up'); + + signal = closingUpAbortController.signal; + } else { + signal = options.signal ?? null; + } + let resolve : (result : any) => void; let reject : (err : Error) => void; // eslint-disable-next-line @@ -913,6 +940,71 @@ class ThreadPool { await Promise.all(exitEvents); } + + async close (options : Required) { + this.closingUp = true; + + if (options.force) { + const skipQueueLength = this.skipQueue.length; + for (let i = 0; i < skipQueueLength; i++) { + const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo; + if (taskInfo.workerInfo === null) { + taskInfo.done(new AbortError()); + } else { + this.skipQueue.push(taskInfo); + } + } + + const taskQueueLength = this.taskQueue.size; + for (let i = 0; i < taskQueueLength; i++) { + const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo; + if (taskInfo.workerInfo === null) { + taskInfo.done(new AbortError()); + } else { + this.taskQueue.push(taskInfo); + } + } + } + + const onPoolFlushed = () => new Promise((resolve) => { + const numberOfWorkers = this.workers.size; + let numberOfWorkersDone = 0; + + const checkIfWorkerIsDone = (workerInfo: WorkerInfo) => { + if (workerInfo.taskInfos.size === 0) { + numberOfWorkersDone++; + } + + if (numberOfWorkers === numberOfWorkersDone) { + resolve(); + } + }; + + for (const workerInfo of this.workers) { + checkIfWorkerIsDone(workerInfo); + + workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo)); + } + }); + + const throwOnTimeOut = async (timeout: number) => { + await sleep(timeout); + throw Errors.CloseTimeout(); + }; + + try { + await Promise.race([ + onPoolFlushed(), + throwOnTimeOut(this.options.closeTimeout) + ]); + } catch (error) { + this.publicInterface.emit('error', error); + } finally { + await this.destroy(); + this.publicInterface.emit('close'); + this.closingUp = false; + } + } } class Piscina extends EventEmitterAsyncResource { @@ -945,7 +1037,7 @@ class Piscina extends EventEmitterAsyncResource { } if (options.maxQueue !== undefined && options.maxQueue !== 'auto' && - (typeof options.maxQueue !== 'number' || options.maxQueue < 0)) { + (typeof options.maxQueue !== 'number' || options.maxQueue < 0)) { throw new TypeError('options.maxQueue must be a non-negative integer'); } if (options.concurrentTasksPerWorker !== undefined && @@ -974,6 +1066,9 @@ class Piscina extends EventEmitterAsyncResource { typeof options.trackUnmanagedFds !== 'boolean') { throw new TypeError('options.trackUnmanagedFds must be a boolean value'); } + if (options.closeTimeout !== undefined && (typeof options.closeTimeout !== 'number' || options.closeTimeout < 0)) { + throw new TypeError('options.closeTimeout must be a non-negative integer'); + } this.#pool = new ThreadPool(this, options); } @@ -1055,6 +1150,24 @@ class Piscina extends EventEmitterAsyncResource { return this.#pool.runTask(task, { transferList, filename, name, signal }); } + async close (options : CloseOptions = kDefaultCloseOptions) { + if (options === null || typeof options !== 'object') { + throw TypeError('options must be an object'); + } + + let { force } = options; + + if (force !== undefined && typeof force !== 'boolean') { + return Promise.reject( + new TypeError('force argument must be a boolean')); + } + force ??= kDefaultCloseOptions.force; + + return this.#pool.close({ + force + }); + } + destroy () { return this.#pool.destroy(); } diff --git a/test/pool-close.ts b/test/pool-close.ts new file mode 100644 index 00000000..c6734440 --- /dev/null +++ b/test/pool-close.ts @@ -0,0 +1,93 @@ +import { test } from 'tap'; +import Piscina from '..'; +import { resolve } from 'path'; +import { once } from 'events'; + +test('close()', async (t) => { + t.test('no pending tasks', async (t) => { + const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js') }); + await pool.close(); + t.pass('pool closed successfully'); + }); + + t.test('queued tasks waits for all tasks to complete', async (t) => { + const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 }); + + const task1 = pool.run({ time: 100 }); + const task2 = pool.run({ time: 100 }); + + setImmediate(() => t.resolves(pool.close(), 'close is resolved when all running tasks are completed')); + + await Promise.all([ + t.resolves(once(pool, 'close'), 'handler is called when pool is closed'), + t.resolves(task1, 'complete running task'), + t.resolves(task2, 'complete running task') + ]); + }); + + t.test('abort any task enqueued during closing up', async (t) => { + const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 }); + + setImmediate(() => { + t.resolves(pool.close(), 'close is resolved when running tasks are completed'); + t.rejects(pool.run({ time: 1000 }), /The task has been aborted/, 'abort any task enqueued during close'); + }); + + await t.resolves(pool.run({ time: 100 }), 'complete running task'); + }); +}); + +test('close({force: true})', async (t) => { + t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => { + const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 }); + + const task1 = pool.run({ time: 100 }); + const task2 = pool.run({ time: 200 }); + + setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed')); + + await Promise.all([ + t.resolves(once(pool, 'close'), 'handler is called when pool is closed'), + t.resolves(task1, 'complete running task'), + t.rejects(task2, /The task has been aborted/, 'abort task that are not started yet') + ]); + }); + + t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => { + const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 2 }); + + const task1 = pool.run({ time: 500 }); + const task2 = pool.run({ time: 100 }); + const task3 = pool.run({ time: 100 }); + const task4 = pool.run({ time: 100 }); + + setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed')); + + await Promise.all([ + t.resolves(once(pool, 'close'), 'handler is called when pool is closed'), + t.resolves(task1, 'complete running task'), + t.resolves(task2, 'complete running task'), + t.rejects(task3, /The task has been aborted/, 'abort task that are not started yet'), + t.rejects(task4, /The task has been aborted/, 'abort task that are not started yet') + ]); + }); +}); + +test('timed out close operation destroys the pool', async (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/sleep.js'), + maxThreads: 1, + closeTimeout: 500 + }); + + const task1 = pool.run({ time: 5000 }); + const task2 = pool.run({ time: 5000 }); + + setImmediate(() => t.resolves(pool.close(), 'close is resolved on timeout')); + + await Promise.all([ + t.resolves(once(pool, 'error'), 'error handler is called on timeout'), + t.rejects(task1, /Terminating worker thread/, 'task is aborted due to timeout'), + t.rejects(task2, /Terminating worker thread/, 'task is aborted due to timeout') + ]); +});