Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spawn a new worker when an old one is stopped #62

Merged
merged 6 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
build:
strategy:
matrix:
node-version: [12.x, 16.x, 18.x, 20.x]
node-version: [14.x, 16.x, 18.x, 20.x]
platform: [ubuntu-latest, macos-latest, windows-latest]

runs-on: ${{ matrix.platform }}
Expand Down
9 changes: 7 additions & 2 deletions e2e/call-timeout.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ test('should kill the worker that was involved in processing the task', async t
await workerNodes.ready();

// when
await t.throwsAsync(workerNodes.call.task500ms, { instanceOf: errors.TimeoutError });
let executingWorkerId;
await t.throwsAsync(async () => {
const p = workerNodes.call.task500ms();
workerNodes.workersQueue.storage.forEach(worker => executingWorkerId = worker.id);
await p;
}, { instanceOf: errors.TimeoutError });
await wait(200);

// then
t.is(workerNodes.workersQueue.storage.length, 0);
t.is(workerNodes.workersQueue.storage.filter(worker => worker.id === executingWorkerId).length, 0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion wasn't entirely accurate IMO, since

should kill the worker that was involved in processing the task

Previous assertion checks that pool is empty, while the correct assertion is that the worker in question was replaced.

});

test('should result with rejection of all the calls that the worker was processing at the moment', async t => {
Expand Down
45 changes: 45 additions & 0 deletions e2e/spawn-new-workers-on-stop.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
const test = require('ava');

const WorkerNodes = require('..');
const { fixture, repeatCall, eventually } = require('./utils');


test('should spawn new workers when old workers exit even if no items in the queue', async (t) => {
// given
const maxWorkers = 4;
const minWorkers = 2;
const workerNodes = new WorkerNodes(fixture('process-info'), { maxWorkers, minWorkers, workerEndurance: 1, autoStart: true });

await workerNodes.ready();

const operationWorkersCountBefore = workerNodes.workersQueue.filter((worker) => worker.isOperational()).length;
t.is(operationWorkersCountBefore, minWorkers);

// when
await repeatCall(workerNodes.call.noop, maxWorkers);
t.is(workerNodes.workersQueue.filter((worker) => worker.isOperational()).length, 0);


const getOperationalWorkersCount = () => workerNodes.workersQueue.filter((worker) => worker.isOperational()).length;

// then
// we're waiting to all workers to be operational, the time for that might variate greatly between machines (Developer machine vs CI machine)
await eventually(() => getOperationalWorkersCount() === maxWorkers);
t.is(getOperationalWorkersCount(), maxWorkers);
});


test('should shutdown fine', async (t) => {
// given
const maxWorkers = 4;
const minWorkers = 2;
const workerNodes = new WorkerNodes(fixture('process-info'), { maxWorkers, minWorkers, workerEndurance: 1, autoStart: true });

await workerNodes.ready();
await t.notThrowsAsync(() => workerNodes.terminate());

const getAliveWorkersCount = () => workerNodes.workersQueue.filter((worker) => worker.isProcessAlive).length;

await eventually(() => getAliveWorkersCount() === 0);
t.is(getAliveWorkersCount(), 0);
});
37 changes: 36 additions & 1 deletion e2e/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,39 @@ module.exports.unique = elements => [...new Set(elements)];

module.exports.repeatCall = (call, count) => Promise.all(new Array(count).fill().map(async () => await call()));

module.exports.wait = delay => new Promise(resolve => setTimeout(resolve, delay));
module.exports.wait = delay => new Promise(resolve => setTimeout(resolve, delay));

function getPromiseDescriptor() {
let promiseResolve, promiseReject
const promise = new Promise((resolve, reject) => {
promiseResolve = resolve;
promiseReject = reject
});

return {
promise,
resolve: promiseResolve,
reject: promiseReject,
};
}

module.exports.eventually = async (predicate, timeout = 5000) => {
const promiseDescriptor = getPromiseDescriptor();
const startTime = Date.now();

do {
try {
const result = await predicate();
if (result) {
promiseDescriptor.resolve(result);
}
} catch (ex) {
console.warn(`eventually(): provided predicate throwed: ${ex.message}`);
}
await this.wait(100);
} while (Date.now() < startTime + timeout);

promiseDescriptor.reject(new Error(`Timed out waiting ${timeout}ms for predicate`));

return promiseDescriptor.promise;
}
6 changes: 5 additions & 1 deletion lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ class WorkerNodes extends EventEmitter {

this.workersQueue.remove(worker);

setImmediate(() => {
if (this.canStartWorker()) this.startWorker();
});

this.processQueue();
}

Expand All @@ -139,7 +143,7 @@ class WorkerNodes extends EventEmitter {
* @returns {boolean} true if it's possible to spawn a new worker
*/
canStartWorker() {
return !this.workersQueue.isFull();
return !this.isTerminationStarted && !this.workersQueue.isFull();
}

/**
Expand Down