Skip to content

Commit

Permalink
add: scheduler config option for cluster
Browse files Browse the repository at this point in the history
scheduler let's use a custom scheduler function for scheduling workers.
  • Loading branch information
yashLadha committed Mar 12, 2022
1 parent 457567f commit 645b540
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
17 changes: 17 additions & 0 deletions lib/internal/cluster/primary.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ cluster.SCHED_RR = SCHED_RR; // Primary distributes connections.
let ids = 0;
let debugPortOffset = 1;
let initialized = false;
let scheduler = RoundRobinHandle.scheduler;

// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
Expand All @@ -58,6 +59,19 @@ else if (process.platform === 'win32') {

cluster.schedulingPolicy = schedulingPolicy;

function validateAndReturnScheduler(scheduler, schedulingPolicy) {
if (scheduler !== undefined) {
if (typeof scheduler !== 'function') {
throw new TypeError('scheduler must be a function');
}
return scheduler;
} else if (schedulingPolicy === SCHED_RR) {
return RoundRobinHandle.scheduler;
} else if (schedulingPolicy !== SCHED_NONE) {
assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
}
}

cluster.setupPrimary = function(options) {
const settings = {
args: ArrayPrototypeSlice(process.argv, 2),
Expand Down Expand Up @@ -89,6 +103,7 @@ cluster.setupPrimary = function(options) {
assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
`Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

scheduler = validateAndReturnScheduler(cluster.settings.scheduler, schedulingPolicy);
process.nextTick(setupSettingsNT, settings);

process.on('internalMessage', (message) => {
Expand Down Expand Up @@ -310,6 +325,8 @@ function queryServer(worker, message) {
message.addressType === 'udp6') {
handle = new SharedHandle(key, address, message);
} else {
// FIXME: Better structure to code
message.scheduler = scheduler;
handle = new RoundRobinHandle(key, address, message);
}

Expand Down
36 changes: 21 additions & 15 deletions lib/internal/cluster/round_robin_handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ const { constants } = internalBinding('tcp_wrap');

module.exports = RoundRobinHandle;

function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
function RoundRobinHandle(key, address, { port, fd, flags, backlog, scheduler }) {
this.key = key;
this.all = new SafeMap();
this.free = new SafeMap();
this.workers = new SafeMap();
this.handles = init(ObjectCreate(null));
this.handle = null;
this.scheduler = scheduler;
this.server = net.createServer(assert.fail);

if (fd >= 0)
Expand All @@ -47,6 +48,7 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
RoundRobinHandle.prototype.add = function(worker, send) {
assert(this.all.has(worker.id) === false);
this.all.set(worker.id, worker);
this.workers.set(worker.id, worker);

const done = () => {
if (this.handle.getsockname) {
Expand All @@ -58,7 +60,7 @@ RoundRobinHandle.prototype.add = function(worker, send) {
send(null, null, null); // UNIX socket.
}

this.handoff(worker); // In case there are connections pending.
this.handoff(); // In case there are connections pending.
};

if (this.server === null)
Expand All @@ -77,7 +79,7 @@ RoundRobinHandle.prototype.remove = function(worker) {
if (!existed)
return false;

this.free.delete(worker.id);
this.workers.delete(worker.id);

if (this.all.size !== 0)
return false;
Expand All @@ -95,28 +97,32 @@ RoundRobinHandle.prototype.remove = function(worker) {

RoundRobinHandle.prototype.distribute = function(err, handle) {
append(this.handles, handle);
// eslint-disable-next-line node-core/no-array-destructuring
const [ workerEntry ] = this.free; // this.free is a SafeMap
this.handoff();
};

RoundRobinHandle.scheduler = function(workers) {
const [ workerEntry ] = workers;

if (ArrayIsArray(workerEntry)) {
const { 0: workerId, 1: worker } = workerEntry;
this.free.delete(workerId);
this.handoff(worker);
workers.delete(workerId);
workers.set(workerId, worker)
return worker;
}
};

RoundRobinHandle.prototype.handoff = function(worker) {
if (!this.all.has(worker.id)) {
return; // Worker is closing (or has closed) the server.
}

RoundRobinHandle.prototype.handoff = function () {
const handle = peek(this.handles);

if (handle === null) {
this.free.set(worker.id, worker); // Add to ready queue again.
return;
}

const worker = this.scheduler(this.workers);
if (typeof worker === 'undefined' || !this.all.has(worker.id)) {
return; // Worker is closing (or has closed) the server.
}

remove(handle);

const message = { act: 'newconn', key: this.key };
Expand All @@ -127,6 +133,6 @@ RoundRobinHandle.prototype.handoff = function(worker) {
else
this.distribute(0, handle); // Worker is shutting down. Send to another.

this.handoff(worker);
this.handoff();
});
};

0 comments on commit 645b540

Please sign in to comment.