diff --git a/lib/internal/cluster/primary.js b/lib/internal/cluster/primary.js index ed5b06d798868c..0c5e2bf71d75ec 100644 --- a/lib/internal/cluster/primary.js +++ b/lib/internal/cluster/primary.js @@ -42,6 +42,7 @@ cluster.SCHED_RR = SCHED_RR; // Primary distributes connections. let ids = 0; let debugPortOffset = 1; let initialized = false; +let scheduler = RoundRobinHandle.scheduler; // XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings? let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY; @@ -58,6 +59,19 @@ else if (process.platform === 'win32') { cluster.schedulingPolicy = schedulingPolicy; +function validateAndReturnScheduler(scheduler, schedulingPolicy) { + if (scheduler !== undefined) { + if (typeof scheduler !== 'function') { + throw new TypeError('scheduler must be a function'); + } + return scheduler; + } else if (schedulingPolicy === SCHED_RR) { + return RoundRobinHandle.scheduler; + } else if (schedulingPolicy !== SCHED_NONE) { + assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`); + } +} + cluster.setupPrimary = function(options) { const settings = { args: ArrayPrototypeSlice(process.argv, 2), @@ -89,6 +103,7 @@ cluster.setupPrimary = function(options) { assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`); + scheduler = validateAndReturnScheduler(cluster.settings.scheduler, schedulingPolicy); process.nextTick(setupSettingsNT, settings); process.on('internalMessage', (message) => { @@ -310,6 +325,8 @@ function queryServer(worker, message) { message.addressType === 'udp6') { handle = new SharedHandle(key, address, message); } else { + // FIXME: Better structure to code + message.scheduler = scheduler; handle = new RoundRobinHandle(key, address, message); } diff --git a/lib/internal/cluster/round_robin_handle.js b/lib/internal/cluster/round_robin_handle.js index 9d242cc60ad7c1..ed77fa29743e98 100644 --- a/lib/internal/cluster/round_robin_handle.js +++ b/lib/internal/cluster/round_robin_handle.js @@ -15,12 +15,13 @@ const { constants } = internalBinding('tcp_wrap'); module.exports = RoundRobinHandle; -function RoundRobinHandle(key, address, { port, fd, flags, backlog }) { +function RoundRobinHandle(key, address, { port, fd, flags, backlog, scheduler }) { this.key = key; this.all = new SafeMap(); - this.free = new SafeMap(); + this.workers = new SafeMap(); this.handles = init(ObjectCreate(null)); this.handle = null; + this.scheduler = scheduler; this.server = net.createServer(assert.fail); if (fd >= 0) @@ -47,6 +48,7 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) { RoundRobinHandle.prototype.add = function(worker, send) { assert(this.all.has(worker.id) === false); this.all.set(worker.id, worker); + this.workers.set(worker.id, worker); const done = () => { if (this.handle.getsockname) { @@ -58,7 +60,7 @@ RoundRobinHandle.prototype.add = function(worker, send) { send(null, null, null); // UNIX socket. } - this.handoff(worker); // In case there are connections pending. + this.handoff(); // In case there are connections pending. }; if (this.server === null) @@ -77,7 +79,7 @@ RoundRobinHandle.prototype.remove = function(worker) { if (!existed) return false; - this.free.delete(worker.id); + this.workers.delete(worker.id); if (this.all.size !== 0) return false; @@ -95,28 +97,32 @@ RoundRobinHandle.prototype.remove = function(worker) { RoundRobinHandle.prototype.distribute = function(err, handle) { append(this.handles, handle); - // eslint-disable-next-line node-core/no-array-destructuring - const [ workerEntry ] = this.free; // this.free is a SafeMap + this.handoff(); +}; + +RoundRobinHandle.scheduler = function(workers) { + const [ workerEntry ] = workers; if (ArrayIsArray(workerEntry)) { const { 0: workerId, 1: worker } = workerEntry; - this.free.delete(workerId); - this.handoff(worker); + workers.delete(workerId); + workers.set(workerId, worker) + return worker; } }; -RoundRobinHandle.prototype.handoff = function(worker) { - if (!this.all.has(worker.id)) { - return; // Worker is closing (or has closed) the server. - } - +RoundRobinHandle.prototype.handoff = function () { const handle = peek(this.handles); if (handle === null) { - this.free.set(worker.id, worker); // Add to ready queue again. return; } + const worker = this.scheduler(this.workers); + if (typeof worker === 'undefined' || !this.all.has(worker.id)) { + return; // Worker is closing (or has closed) the server. + } + remove(handle); const message = { act: 'newconn', key: this.key }; @@ -127,6 +133,6 @@ RoundRobinHandle.prototype.handoff = function(worker) { else this.distribute(0, handle); // Worker is shutting down. Send to another. - this.handoff(worker); + this.handoff(); }); };