From b64a2033cce76e43e93cd715169ef79460a56385 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Mon, 22 Jan 2024 14:35:59 -0800 Subject: [PATCH 01/19] Draft interface for priority-queuing feature Adds a second optional argument `nice` to the interface definitions for `acquire` and related functions. This is a non-compiling change created for discussion purposes. Not to be merged until the feature is complete. Some functions now have two optional arguments, making the interface a bit clunky. Consider altering it to accept a configuration object next time a breaking change is released. --- README.md | 19 ++++++++++++++++--- src/MutexInterface.ts | 6 +++--- src/SemaphoreInterface.ts | 6 +++--- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 2095172..3335b8e 100644 --- a/README.md +++ b/README.md @@ -291,10 +291,14 @@ the semaphore is released. `runExclusive` returns a promise that adopts the stat The semaphore is released and the result rejected if an exception occurs during execution of the callback. -`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the +`runExclusive` accepts a first optional argument `weight`. Specifying a `weight` will decrement the semaphore by the specified value, and the callback will only be invoked once the semaphore's value greater or equal to `weight`. +`runExclusive` accepts a second optional argument `nice`. Specifying a higher `nice` value will +cause the task to be scheduled after tasks with a lower `nice` value and before tasks with a higher +`nice` value. `nice` can be negative and the default is zero. + ### Manual locking / releasing Promise style: @@ -328,10 +332,14 @@ has completed. The `release` callback is idempotent. likely deadlock the application. Make sure to call `release` under all circumstances and handle exceptions accordingly. -`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the -semaphore by the specified value, and the semaphore will only be acquired once the its +`runExclusive` accepts a first optional argument `weight`. Specifying a `weight` will decrement the +semaphore by the specified value, and the semaphore will only be acquired once its value is greater or equal to `weight`. +`runExclusive` accepts a second optional argument `nice`. Specifying a higher `nice` value will +cause the task to be scheduled after tasks with a lower `nice` value and before tasks with a higher +`nice` value. `nice` can be negative and the default is zero. + ### Unscoped release As an alternative to calling the `release` callback returned by `acquire`, the semaphore @@ -447,6 +455,11 @@ await semaphore.waitForUnlock(); `waitForUnlock` accepts an optional argument `weight`. If `weight` is specified the promise will only resolve once the semaphore's value is greater or equal to `weight`; +`waitForUnlock` accepts a second optional argument `nice`. Specifying a higher `nice` value will +cause the promise to resolve after tasks with a lower `nice` value and before tasks with a higher +`nice` value. `nice` can be negative and the default is zero. + + ## Limiting the time waiting for a mutex or semaphore to become available Sometimes it is desirable to limit the time a program waits for a mutex or diff --git a/src/MutexInterface.ts b/src/MutexInterface.ts index 31e1a3e..10346de 100644 --- a/src/MutexInterface.ts +++ b/src/MutexInterface.ts @@ -1,9 +1,9 @@ interface MutexInterface { - acquire(): Promise; + acquire(nice?: number): Promise; - runExclusive(callback: MutexInterface.Worker): Promise; + runExclusive(callback: MutexInterface.Worker, nice?: number): Promise; - waitForUnlock(): Promise; + waitForUnlock(nice?: number): Promise; isLocked(): boolean; diff --git a/src/SemaphoreInterface.ts b/src/SemaphoreInterface.ts index 6398c24..7c1e2e0 100644 --- a/src/SemaphoreInterface.ts +++ b/src/SemaphoreInterface.ts @@ -1,9 +1,9 @@ interface SemaphoreInterface { - acquire(weight?: number): Promise<[number, SemaphoreInterface.Releaser]>; + acquire(weight?: number, nice?: number): Promise<[number, SemaphoreInterface.Releaser]>; - runExclusive(callback: SemaphoreInterface.Worker, weight?: number): Promise; + runExclusive(callback: SemaphoreInterface.Worker, weight?: number, nice?: number): Promise; - waitForUnlock(weight?: number): Promise; + waitForUnlock(weight?: number, nice?: number): Promise; isLocked(): boolean; From cdb2725eba525d3ec50bee517ff9422ee79b227a Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Mon, 22 Jan 2024 19:32:08 -0800 Subject: [PATCH 02/19] Stub unit tests for priority queue --- test/mutex.ts | 6 ++++++ test/semaphoreSuite.ts | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/test/mutex.ts b/test/mutex.ts index 241aafc..c40e102 100644 --- a/test/mutex.ts +++ b/test/mutex.ts @@ -36,6 +36,8 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); + test('acquire unblocks the nicest waiter last'); + test('runExclusive passes result (immediate)', async () => { assert.strictEqual(await mutex.runExclusive(() => 10), 10); }); @@ -81,6 +83,8 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); + test('runExclusive unblocks the nicest waiters last'); + test('exceptions during runExclusive do not leave mutex locked', async () => { let flag = false; @@ -265,6 +269,8 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert.strictEqual(flag, true); }); + + test('waitForUnlock unblocks the nicest waiters last'); }; suite('Mutex', () => mutexSuite((e) => new Mutex(e))); diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 4c9bc6c..909f945 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -50,6 +50,8 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values.sort(), [2, 2]); }); + test('acquire unblocks the nicest waiters last'); + test('acquire blocks when the semaphore has reached zero until it is released again', async () => { const values: Array = []; @@ -232,6 +234,8 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.strictEqual(semaphore.getValue(), 2); }); + test('runExclusive executes the nicest waiters last'); + test('new semaphore is unlocked', () => { assert(!semaphore.isLocked()); }); @@ -441,6 +445,8 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual([flag1, flag2], [true, true]); }); + test('waitForUnlock unblocks the nicest waiters last'); + test('waitForUnlock only unblocks when the semaphore can actually be acquired again', async () => { semaphore.acquire(2); semaphore.acquire(2); From f40a4650d18fd720160af676b99bcbc12013c898 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Mon, 22 Jan 2024 21:08:47 -0800 Subject: [PATCH 03/19] Add unused nice variable 'nice' to function definitions --- src/Mutex.ts | 6 +++--- src/Semaphore.ts | 15 +++++++++++++-- src/withTimeout.ts | 4 ++-- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/Mutex.ts b/src/Mutex.ts index 5298c22..341a008 100644 --- a/src/Mutex.ts +++ b/src/Mutex.ts @@ -6,13 +6,13 @@ class Mutex implements MutexInterface { this._semaphore = new Semaphore(1, cancelError); } - async acquire(): Promise { + async acquire(nice = 0): Promise { const [, releaser] = await this._semaphore.acquire(); return releaser; } - runExclusive(callback: MutexInterface.Worker): Promise { + runExclusive(callback: MutexInterface.Worker, nice = 0): Promise { return this._semaphore.runExclusive(() => callback()); } @@ -20,7 +20,7 @@ class Mutex implements MutexInterface { return this._semaphore.isLocked(); } - waitForUnlock(): Promise { + waitForUnlock(nice = 0): Promise { return this._semaphore.waitForUnlock(); } diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 90ae148..5c08e8a 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -9,7 +9,7 @@ interface QueueEntry { class Semaphore implements SemaphoreInterface { constructor(private _value: number, private _cancelError: Error = E_CANCELED) {} - acquire(weight = 1): Promise<[number, SemaphoreInterface.Releaser]> { + acquire(weight = 1, nice = 0): Promise<[number, SemaphoreInterface.Releaser]> { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); return new Promise((resolve, reject) => { @@ -30,7 +30,7 @@ class Semaphore implements SemaphoreInterface { } } - waitForUnlock(weight = 1): Promise { + waitForUnlock(weight = 1, nice = 0): Promise { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); return new Promise((resolve) => { @@ -107,4 +107,15 @@ class Semaphore implements SemaphoreInterface { private _weightedWaiters: Array void>> = []; } +function insertSorted(a: T[], v: T): void { + console.log(`insertSorted; a = ${JSON.stringify(a.map(o => o.nice))}; nice = ${v.nice}`); + const i = a.findIndex((other) => v.nice < other.nice); + console.log(`i = ${i}`); + if (i === -1) { + a.push(v); + } else { + a.splice(i, 0, v); + } +} + export default Semaphore; diff --git a/src/withTimeout.ts b/src/withTimeout.ts index 4e75352..c5af0fa 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -7,7 +7,7 @@ export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface; export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT): any { return { - acquire: (weight?: number): Promise => { + acquire: (weight?: number, nice?: number): Promise => { if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } @@ -21,7 +21,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: }, timeout); try { - const ticket = await sync.acquire(weight); + const ticket = await sync.acquire(weight, nice); if (isTimeout) { const release = Array.isArray(ticket) ? ticket[1] : ticket; From f50ee83674603c1ff51a193d2b5af43dc50078b5 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Mon, 22 Jan 2024 21:09:15 -0800 Subject: [PATCH 04/19] Implement basic queuing for 'Semaphore.acquire' --- src/Semaphore.ts | 11 ++++++++--- test/semaphoreSuite.ts | 27 ++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 5c08e8a..387d324 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -1,9 +1,15 @@ import { E_CANCELED } from './errors'; import SemaphoreInterface from './SemaphoreInterface'; + +interface Nice { + nice: number; +} + interface QueueEntry { resolve(result: [number, SemaphoreInterface.Releaser]): void; reject(error: unknown): void; + nice: number; } class Semaphore implements SemaphoreInterface { @@ -14,13 +20,12 @@ class Semaphore implements SemaphoreInterface { return new Promise((resolve, reject) => { if (!this._weightedQueues[weight - 1]) this._weightedQueues[weight - 1] = []; - this._weightedQueues[weight - 1].push({ resolve, reject }); - + insertSorted(this._weightedQueues[weight - 1], { resolve, reject, nice }); this._dispatch(); }); } - async runExclusive(callback: SemaphoreInterface.Worker, weight = 1): Promise { + async runExclusive(callback: SemaphoreInterface.Worker, weight = 1, nice = 0): Promise { const [value, release] = await this.acquire(weight); try { diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 909f945..d26e80d 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -50,7 +50,32 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values.sort(), [2, 2]); }); - test('acquire unblocks the nicest waiters last'); + test('acquire unblocks the nicest waiters last', async () => { + const values: Array = []; + + // nice=0; runs first because nothing else is waiting + semaphore.acquire(2, 0).then(([, release]) => { + values.push(0); + setTimeout(release, 100); + }); + + // nice=1; queues first + semaphore.acquire(2, 1).then(([, release]) => { + values.push(1); + setTimeout(release, 100); + }); + + // nice=-1; jumps ahead of nice=+1 + semaphore.acquire(2, -1).then(([, release]) => { + values.push(-1); + setTimeout(release, 100); + }); + + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, -1, 1]); + }); + + test('acquire de-prioritizes nice waiters even if they are lighter'); test('acquire blocks when the semaphore has reached zero until it is released again', async () => { const values: Array = []; From 3e34bcc89c1ef9197f9862c36f652637f68e2db6 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Mon, 22 Jan 2024 22:13:22 -0800 Subject: [PATCH 05/19] Use priority instead of weight to determine execution order The previous commit made all tasks of the same weight execute in order of niceness, but lighter items were still executed first. That obviously defeats the purpose of having a priority parameter. Now, the least nice items are scheduled first. At the same priority level, execution order is now FIFO. Weight is no longer a factor in execution order. The execution queue is now implemented using a single JavaScript array, rather than an array of arrays by weight. Performance will be noticeably worse when scheduling a very (very) large number of tasks, especially when they have diverse weights, but probably better with a large number of semaphores with few tasks due to fewer array allocations. If there are users for whom this is a concern, a good heap-based priority queue could be added as a dependency. --- src/Semaphore.ts | 64 ++++++++++++++++++------------------------ test/semaphoreSuite.ts | 31 ++++++++++++++++++-- 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 387d324..f3d8d4e 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -2,13 +2,10 @@ import { E_CANCELED } from './errors'; import SemaphoreInterface from './SemaphoreInterface'; -interface Nice { - nice: number; -} - interface QueueEntry { resolve(result: [number, SemaphoreInterface.Releaser]): void; reject(error: unknown): void; + weight: number; nice: number; } @@ -19,9 +16,17 @@ class Semaphore implements SemaphoreInterface { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); return new Promise((resolve, reject) => { - if (!this._weightedQueues[weight - 1]) this._weightedQueues[weight - 1] = []; - insertSorted(this._weightedQueues[weight - 1], { resolve, reject, nice }); - this._dispatch(); + const task = { resolve, reject, weight, nice }; + const i = this._queue.findIndex((other) => nice < other.nice); + if (i === 0 && weight <= this._value) { + // Needs immediate dispatch, skip the queue + this._dispatchItem(task); + } else if (i === -1) { + this._queue.push(task); + } else { + this._queue.splice(i, 0, task); + } + this._dispatchQueue(); }); } @@ -42,7 +47,7 @@ class Semaphore implements SemaphoreInterface { if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; this._weightedWaiters[weight - 1].push(resolve); - this._dispatch(); + this._dispatchQueue(); }); } @@ -56,38 +61,34 @@ class Semaphore implements SemaphoreInterface { setValue(value: number): void { this._value = value; - this._dispatch(); + this._dispatchQueue(); } release(weight = 1): void { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); this._value += weight; - this._dispatch(); + this._dispatchQueue(); } cancel(): void { - this._weightedQueues.forEach((queue) => queue.forEach((entry) => entry.reject(this._cancelError))); - this._weightedQueues = []; + this._queue.forEach((entry) => entry.reject(this._cancelError)); + this._queue = []; } - private _dispatch(): void { - for (let weight = this._value; weight > 0; weight--) { - const queueEntry = this._weightedQueues[weight - 1]?.shift(); - if (!queueEntry) continue; - - const previousValue = this._value; - const previousWeight = weight; - - this._value -= weight; - weight = this._value + 1; - - queueEntry.resolve([previousValue, this._newReleaser(previousWeight)]); + private _dispatchQueue(): void { + while (this._queue.length > 0 && this._queue[0].weight <= this._value) { + this._dispatchItem(this._queue.shift()!); } - this._drainUnlockWaiters(); } + private _dispatchItem(item: QueueEntry): void { + const previousValue = this._value; + this._value -= item.weight; + item.resolve([previousValue, this._newReleaser(item.weight)]); + } + private _newReleaser(weight: number): () => void { let called = false; @@ -108,19 +109,8 @@ class Semaphore implements SemaphoreInterface { } } - private _weightedQueues: Array> = []; + private _queue: Array = []; private _weightedWaiters: Array void>> = []; } -function insertSorted(a: T[], v: T): void { - console.log(`insertSorted; a = ${JSON.stringify(a.map(o => o.nice))}; nice = ${v.nice}`); - const i = a.findIndex((other) => v.nice < other.nice); - console.log(`i = ${i}`); - if (i === -1) { - a.push(v); - } else { - a.splice(i, 0, v); - } -} - export default Semaphore; diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index d26e80d..78d6822 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -75,7 +75,34 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values, [0, -1, 1]); }); - test('acquire de-prioritizes nice waiters even if they are lighter'); + test('acquire de-prioritizes nice waiters even if they are lighter', async () => { + const values: Array = []; + + // two items with weight 1; runs first because nothing else is waiting + semaphore.acquire(1, 0).then(([, release]) => { + values.push(0); + setTimeout(release, 100); + }); + semaphore.acquire(1, 0).then(([, release]) => { + values.push(0); + setTimeout(release, 100); + }); + + // nice item with weight 1 + semaphore.acquire(1, 1).then(([, release]) => { + values.push(1); + setTimeout(release, 100); + }); + + // high-priority item with weight 2; should run before the others + semaphore.acquire(2, -1).then(([, release]) => { + values.push(-1); + setTimeout(release, 100); + }); + + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, 0, -1, 1]); + }); test('acquire blocks when the semaphore has reached zero until it is released again', async () => { const values: Array = []; @@ -319,8 +346,8 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => test('setValue works fine with isolated weights', async () => { let flag = false; - semaphore.acquire(8); semaphore.acquire(4).then(() => (flag = true)); + semaphore.acquire(8); semaphore.setValue(4); await clock.tickAsync(1); From be76be287d7218f3bf3a1d396ef3550bd68d75e2 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Mon, 22 Jan 2024 22:19:35 -0800 Subject: [PATCH 06/19] Implement nice behavior for Semaphore.runExclusive --- src/Semaphore.ts | 2 +- src/withTimeout.ts | 4 ++-- test/semaphoreSuite.ts | 9 ++++++++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index f3d8d4e..9a24779 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -31,7 +31,7 @@ class Semaphore implements SemaphoreInterface { } async runExclusive(callback: SemaphoreInterface.Worker, weight = 1, nice = 0): Promise { - const [value, release] = await this.acquire(weight); + const [value, release] = await this.acquire(weight, nice); try { return await callback(value); diff --git a/src/withTimeout.ts b/src/withTimeout.ts index c5af0fa..9e44753 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -41,11 +41,11 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: }); }, - async runExclusive(callback: (value?: number) => Promise | T, weight?: number): Promise { + async runExclusive(callback: (value?: number) => Promise | T, weight?: number, nice?: number): Promise { let release: () => void = () => undefined; try { - const ticket = await this.acquire(weight); + const ticket = await this.acquire(weight, nice); if (Array.isArray(ticket)) { release = ticket[1]; diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 78d6822..60a9c4d 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -286,7 +286,14 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.strictEqual(semaphore.getValue(), 2); }); - test('runExclusive executes the nicest waiters last'); + test('runExclusive executes the nicest waiters last', async () => { + const values: number[] = []; + semaphore.runExclusive(() => { values.push(0) }, 2); + semaphore.runExclusive(() => { values.push(1) }, 2, 1); + semaphore.runExclusive(() => { values.push(-1) }, 2, -1); + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, -1, 1]); + }); test('new semaphore is unlocked', () => { assert(!semaphore.isLocked()); From fd097a7b27b7c75d7b9aabe4cc81806e96e16c86 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Mon, 22 Jan 2024 22:32:49 -0800 Subject: [PATCH 07/19] Add coverage for immediate dispatch case --- test/semaphoreSuite.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 60a9c4d..97d2d50 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -75,6 +75,17 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values, [0, -1, 1]); }); + test('acquire allows high-priority and light tasks to skip the line', async () => { + let executed = false; + semaphore.acquire(3, 1); + semaphore.acquire(1, 0).then(([, release]) => { + executed = true; + setTimeout(release, 100); + }); + await clock.runAllAsync(); + assert.strictEqual(executed, true); + }); + test('acquire de-prioritizes nice waiters even if they are lighter', async () => { const values: Array = []; From b1a38fcd7dab300860bf3a6cfe07651f03bc8104 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Tue, 23 Jan 2024 10:44:31 -0800 Subject: [PATCH 08/19] Implement Semaphore.waitForUnlock --- src/Semaphore.ts | 67 ++++++++++++++++++++++++++++++++++-------- src/withTimeout.ts | 4 +-- test/semaphoreSuite.ts | 10 ++++++- 3 files changed, 65 insertions(+), 16 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 9a24779..2185a8e 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -2,15 +2,28 @@ import { E_CANCELED } from './errors'; import SemaphoreInterface from './SemaphoreInterface'; +interface Nice { + nice: number; +} + interface QueueEntry { resolve(result: [number, SemaphoreInterface.Releaser]): void; + reject(error: unknown): void; + weight: number; nice: number; } +interface Waiter { + resolve(): void; + + nice: number; +} + class Semaphore implements SemaphoreInterface { - constructor(private _value: number, private _cancelError: Error = E_CANCELED) {} + constructor(private _value: number, private _cancelError: Error = E_CANCELED) { + } acquire(weight = 1, nice = 0): Promise<[number, SemaphoreInterface.Releaser]> { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); @@ -43,12 +56,15 @@ class Semaphore implements SemaphoreInterface { waitForUnlock(weight = 1, nice = 0): Promise { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); - return new Promise((resolve) => { - if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; - this._weightedWaiters[weight - 1].push(resolve); - - this._dispatchQueue(); - }); + if (this._couldLockImmediately(weight, nice)) { + return Promise.resolve(); + } else { + return new Promise((resolve) => { + if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; + insertSorted(this._weightedWaiters[weight - 1], { resolve, nice }); + this._dispatchQueue(); + }); + } } isLocked(): boolean { @@ -101,16 +117,41 @@ class Semaphore implements SemaphoreInterface { } private _drainUnlockWaiters(): void { - for (let weight = this._value; weight > 0; weight--) { - if (!this._weightedWaiters[weight - 1]) continue; - - this._weightedWaiters[weight - 1].forEach((waiter) => waiter()); - this._weightedWaiters[weight - 1] = []; + if (this._queue.length === 0) { + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + waiters.forEach((waiter) => waiter.resolve()); + this._weightedWaiters[weight - 1] = []; + } + } else { + const queuedNice = this._queue[0].nice; + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + const i = waiters.findIndex((waiter) => waiter.nice >= queuedNice); + (i === -1 ? waiters : waiters.splice(0, i)) + .forEach((waiter => { waiter.resolve(); })); + } } } + private _couldLockImmediately(weight: number, nice: number) { + return (this._queue.length === 0 || this._queue[0].nice > nice) && + weight <= this._value; + } + private _queue: Array = []; - private _weightedWaiters: Array void>> = []; + private _weightedWaiters: Array> = []; +} + +function insertSorted(a: T[], v: T) { + const i = a.findIndex((other) => v.nice < other.nice); + if (i === -1) { + a.push(v); + } else { + a.splice(i, 0, v); + } } export default Semaphore; diff --git a/src/withTimeout.ts b/src/withTimeout.ts index 9e44753..f8668d0 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -69,14 +69,14 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: return sync.cancel(); }, - waitForUnlock: (weight?: number): Promise => { + waitForUnlock: (weight?: number, nice?: number): Promise => { if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } return new Promise((resolve, reject) => { const handle = setTimeout(() => reject(timeoutError), timeout); - sync.waitForUnlock(weight).then(() => { + sync.waitForUnlock(weight, nice).then(() => { clearTimeout(handle); resolve(); }); diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 97d2d50..7d99452 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -515,7 +515,15 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual([flag1, flag2], [true, true]); }); - test('waitForUnlock unblocks the nicest waiters last'); + test('waitForUnlock unblocks the nicest waiters last', async () => { + const calledBack: number[] = []; + semaphore.acquire(3, 1); // A big heavy waiting task + semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // High priority + semaphore.waitForUnlock(1, 1).then(() => { calledBack.push(1); }); // Queued behind the heavy task + semaphore.waitForUnlock(1, 2).then(() => { calledBack.push(2); }); // Low priority + await clock.runAllAsync(); + assert.deepStrictEqual(calledBack, [0]); + }); test('waitForUnlock only unblocks when the semaphore can actually be acquired again', async () => { semaphore.acquire(2); From 87f7238c495a4b9a9f761ededf828457a8e81c52 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Tue, 23 Jan 2024 12:21:46 -0800 Subject: [PATCH 09/19] Fix bug in timing of mid-priority waiters --- src/Semaphore.ts | 5 +++-- test/semaphoreSuite.ts | 26 +++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 2185a8e..ff9384e 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -93,10 +93,11 @@ class Semaphore implements SemaphoreInterface { } private _dispatchQueue(): void { + this._drainUnlockWaiters(); while (this._queue.length > 0 && this._queue[0].weight <= this._value) { this._dispatchItem(this._queue.shift()!); + this._drainUnlockWaiters(); } - this._drainUnlockWaiters(); } private _dispatchItem(item: QueueEntry): void { @@ -131,7 +132,7 @@ class Semaphore implements SemaphoreInterface { if (!waiters) continue; const i = waiters.findIndex((waiter) => waiter.nice >= queuedNice); (i === -1 ? waiters : waiters.splice(0, i)) - .forEach((waiter => { waiter.resolve(); })); + .forEach((waiter => waiter.resolve())); } } } diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 7d99452..9dedc60 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -515,7 +515,7 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual([flag1, flag2], [true, true]); }); - test('waitForUnlock unblocks the nicest waiters last', async () => { + test('waitForUnlock unblocks only high-priority waiters immediately', async () => { const calledBack: number[] = []; semaphore.acquire(3, 1); // A big heavy waiting task semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // High priority @@ -525,6 +525,30 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(calledBack, [0]); }); + test('waitForUnlock unblocks nice waiters as the queue drains', async () => { + let calledBack = false; + let release: SemaphoreInterface.Releaser; + + semaphore.acquire(2, 0).then(([, r]) => { release = r; }); + semaphore.acquire(2, 2).then(([, r]) => { setTimeout(r, 100); }); + + semaphore.waitForUnlock(2, 1).then(() => { calledBack = true; }); + + await clock.tickAsync(0); + assert.strictEqual(calledBack, false); + release!(); + await clock.tickAsync(0); + assert.strictEqual(calledBack, true); + await clock.runAllAsync(); + }); + + test('waitForUnlock resolves immediately when the queue is empty', async () => { + let calledBack = false; + semaphore.waitForUnlock(1).then(() => { calledBack = true; }); + await clock.tickAsync(0); + assert.strictEqual(calledBack, true); + }); + test('waitForUnlock only unblocks when the semaphore can actually be acquired again', async () => { semaphore.acquire(2); semaphore.acquire(2); From 2b11b72f4427146f9b59b372b29c0c7c4b2c5069 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Tue, 23 Jan 2024 21:16:17 -0800 Subject: [PATCH 10/19] Rename 'nice' to 'priority' --- README.md | 23 +++++++--------- src/Mutex.ts | 6 ++--- src/MutexInterface.ts | 6 ++--- src/Semaphore.ts | 36 ++++++++++++------------- src/SemaphoreInterface.ts | 6 ++--- src/withTimeout.ts | 12 ++++----- test/mutex.ts | 6 ++--- test/semaphoreSuite.ts | 56 +++++++++++++++++++-------------------- 8 files changed, 74 insertions(+), 77 deletions(-) diff --git a/README.md b/README.md index 3335b8e..95c30b8 100644 --- a/README.md +++ b/README.md @@ -295,9 +295,9 @@ of the callback. semaphore by the specified value, and the callback will only be invoked once the semaphore's value greater or equal to `weight`. -`runExclusive` accepts a second optional argument `nice`. Specifying a higher `nice` value will -cause the task to be scheduled after tasks with a lower `nice` value and before tasks with a higher -`nice` value. `nice` can be negative and the default is zero. +`runExclusive` accepts a second optional argument `priority`. Specifying a greater value for `priority` +tells the scheduler to run this task before other tasks. `priority` can be any real number. The default +is zero. ### Manual locking / releasing @@ -332,13 +332,13 @@ has completed. The `release` callback is idempotent. likely deadlock the application. Make sure to call `release` under all circumstances and handle exceptions accordingly. -`runExclusive` accepts a first optional argument `weight`. Specifying a `weight` will decrement the +`acquire` accepts a first optional argument `weight`. Specifying a `weight` will decrement the semaphore by the specified value, and the semaphore will only be acquired once its value is greater or equal to `weight`. -`runExclusive` accepts a second optional argument `nice`. Specifying a higher `nice` value will -cause the task to be scheduled after tasks with a lower `nice` value and before tasks with a higher -`nice` value. `nice` can be negative and the default is zero. +`acquire` accepts a second optional argument `priority`. Specifying a greater value for `priority` +tells the scheduler to release the semaphore to the caller before other callers. `priority` can be +any real number. The default is zero. ### Unscoped release @@ -452,12 +452,9 @@ await semaphore.waitForUnlock(); // ... ``` -`waitForUnlock` accepts an optional argument `weight`. If `weight` is specified the promise -will only resolve once the semaphore's value is greater or equal to `weight`; - -`waitForUnlock` accepts a second optional argument `nice`. Specifying a higher `nice` value will -cause the promise to resolve after tasks with a lower `nice` value and before tasks with a higher -`nice` value. `nice` can be negative and the default is zero. +`waitForUnlock` accepts optional arguments `weight` and `priority`. The promise will resolve as soon +as it is possible to `acquire` the semaphore with the given weight and priority. Scheduled tasks with +the greatest `priority` values execute first. ## Limiting the time waiting for a mutex or semaphore to become available diff --git a/src/Mutex.ts b/src/Mutex.ts index 341a008..c75d127 100644 --- a/src/Mutex.ts +++ b/src/Mutex.ts @@ -6,13 +6,13 @@ class Mutex implements MutexInterface { this._semaphore = new Semaphore(1, cancelError); } - async acquire(nice = 0): Promise { + async acquire(priority = 0): Promise { const [, releaser] = await this._semaphore.acquire(); return releaser; } - runExclusive(callback: MutexInterface.Worker, nice = 0): Promise { + runExclusive(callback: MutexInterface.Worker, priority = 0): Promise { return this._semaphore.runExclusive(() => callback()); } @@ -20,7 +20,7 @@ class Mutex implements MutexInterface { return this._semaphore.isLocked(); } - waitForUnlock(nice = 0): Promise { + waitForUnlock(priority = 0): Promise { return this._semaphore.waitForUnlock(); } diff --git a/src/MutexInterface.ts b/src/MutexInterface.ts index 10346de..c09f22a 100644 --- a/src/MutexInterface.ts +++ b/src/MutexInterface.ts @@ -1,9 +1,9 @@ interface MutexInterface { - acquire(nice?: number): Promise; + acquire(priority?: number): Promise; - runExclusive(callback: MutexInterface.Worker, nice?: number): Promise; + runExclusive(callback: MutexInterface.Worker, priority?: number): Promise; - waitForUnlock(nice?: number): Promise; + waitForUnlock(priority?: number): Promise; isLocked(): boolean; diff --git a/src/Semaphore.ts b/src/Semaphore.ts index ff9384e..527a01e 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -2,8 +2,8 @@ import { E_CANCELED } from './errors'; import SemaphoreInterface from './SemaphoreInterface'; -interface Nice { - nice: number; +interface Priority { + priority: number; } interface QueueEntry { @@ -12,25 +12,25 @@ interface QueueEntry { reject(error: unknown): void; weight: number; - nice: number; + priority: number; } interface Waiter { resolve(): void; - nice: number; + priority: number; } class Semaphore implements SemaphoreInterface { constructor(private _value: number, private _cancelError: Error = E_CANCELED) { } - acquire(weight = 1, nice = 0): Promise<[number, SemaphoreInterface.Releaser]> { + acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); return new Promise((resolve, reject) => { - const task = { resolve, reject, weight, nice }; - const i = this._queue.findIndex((other) => nice < other.nice); + const task = { resolve, reject, weight, priority }; + const i = this._queue.findIndex((other) => priority > other.priority); if (i === 0 && weight <= this._value) { // Needs immediate dispatch, skip the queue this._dispatchItem(task); @@ -43,8 +43,8 @@ class Semaphore implements SemaphoreInterface { }); } - async runExclusive(callback: SemaphoreInterface.Worker, weight = 1, nice = 0): Promise { - const [value, release] = await this.acquire(weight, nice); + async runExclusive(callback: SemaphoreInterface.Worker, weight = 1, priority = 0): Promise { + const [value, release] = await this.acquire(weight, priority); try { return await callback(value); @@ -53,15 +53,15 @@ class Semaphore implements SemaphoreInterface { } } - waitForUnlock(weight = 1, nice = 0): Promise { + waitForUnlock(weight = 1, priority = 0): Promise { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); - if (this._couldLockImmediately(weight, nice)) { + if (this._couldLockImmediately(weight, priority)) { return Promise.resolve(); } else { return new Promise((resolve) => { if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; - insertSorted(this._weightedWaiters[weight - 1], { resolve, nice }); + insertSorted(this._weightedWaiters[weight - 1], { resolve, priority }); this._dispatchQueue(); }); } @@ -126,19 +126,19 @@ class Semaphore implements SemaphoreInterface { this._weightedWaiters[weight - 1] = []; } } else { - const queuedNice = this._queue[0].nice; + const queuedPriority = this._queue[0].priority; for (let weight = this._value; weight > 0; weight--) { const waiters = this._weightedWaiters[weight - 1]; if (!waiters) continue; - const i = waiters.findIndex((waiter) => waiter.nice >= queuedNice); + const i = waiters.findIndex((waiter) => waiter.priority <= queuedPriority); (i === -1 ? waiters : waiters.splice(0, i)) .forEach((waiter => waiter.resolve())); } } } - private _couldLockImmediately(weight: number, nice: number) { - return (this._queue.length === 0 || this._queue[0].nice > nice) && + private _couldLockImmediately(weight: number, priority: number) { + return (this._queue.length === 0 || this._queue[0].priority < priority) && weight <= this._value; } @@ -146,8 +146,8 @@ class Semaphore implements SemaphoreInterface { private _weightedWaiters: Array> = []; } -function insertSorted(a: T[], v: T) { - const i = a.findIndex((other) => v.nice < other.nice); +function insertSorted(a: T[], v: T) { + const i = a.findIndex((other) => v.priority > other.priority); if (i === -1) { a.push(v); } else { diff --git a/src/SemaphoreInterface.ts b/src/SemaphoreInterface.ts index 7c1e2e0..14e40f6 100644 --- a/src/SemaphoreInterface.ts +++ b/src/SemaphoreInterface.ts @@ -1,9 +1,9 @@ interface SemaphoreInterface { - acquire(weight?: number, nice?: number): Promise<[number, SemaphoreInterface.Releaser]>; + acquire(weight?: number, priority?: number): Promise<[number, SemaphoreInterface.Releaser]>; - runExclusive(callback: SemaphoreInterface.Worker, weight?: number, nice?: number): Promise; + runExclusive(callback: SemaphoreInterface.Worker, weight?: number, priority?: number): Promise; - waitForUnlock(weight?: number, nice?: number): Promise; + waitForUnlock(weight?: number, priority?: number): Promise; isLocked(): boolean; diff --git a/src/withTimeout.ts b/src/withTimeout.ts index f8668d0..cf794d7 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -7,7 +7,7 @@ export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface; export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT): any { return { - acquire: (weight?: number, nice?: number): Promise => { + acquire: (weight?: number, priority?: number): Promise => { if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } @@ -21,7 +21,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: }, timeout); try { - const ticket = await sync.acquire(weight, nice); + const ticket = await sync.acquire(weight, priority); if (isTimeout) { const release = Array.isArray(ticket) ? ticket[1] : ticket; @@ -41,11 +41,11 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: }); }, - async runExclusive(callback: (value?: number) => Promise | T, weight?: number, nice?: number): Promise { + async runExclusive(callback: (value?: number) => Promise | T, weight?: number, priority?: number): Promise { let release: () => void = () => undefined; try { - const ticket = await this.acquire(weight, nice); + const ticket = await this.acquire(weight, priority); if (Array.isArray(ticket)) { release = ticket[1]; @@ -69,14 +69,14 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: return sync.cancel(); }, - waitForUnlock: (weight?: number, nice?: number): Promise => { + waitForUnlock: (weight?: number, priority?: number): Promise => { if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } return new Promise((resolve, reject) => { const handle = setTimeout(() => reject(timeoutError), timeout); - sync.waitForUnlock(weight, nice).then(() => { + sync.waitForUnlock(weight, priority).then(() => { clearTimeout(handle); resolve(); }); diff --git a/test/mutex.ts b/test/mutex.ts index c40e102..7b89ae3 100644 --- a/test/mutex.ts +++ b/test/mutex.ts @@ -36,7 +36,7 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); - test('acquire unblocks the nicest waiter last'); + test('acquire unblocks the highest-priority task first'); test('runExclusive passes result (immediate)', async () => { assert.strictEqual(await mutex.runExclusive(() => 10), 10); @@ -83,7 +83,7 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); - test('runExclusive unblocks the nicest waiters last'); + test('runExclusive unblocks the highest-priority task first'); test('exceptions during runExclusive do not leave mutex locked', async () => { let flag = false; @@ -270,7 +270,7 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert.strictEqual(flag, true); }); - test('waitForUnlock unblocks the nicest waiters last'); + test('waitForUnlock unblocks the highest-priority tasks first'); }; suite('Mutex', () => mutexSuite((e) => new Mutex(e))); diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 9dedc60..be084b9 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -50,35 +50,35 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values.sort(), [2, 2]); }); - test('acquire unblocks the nicest waiters last', async () => { + test('acquire unblocks high-priority tasks first', async () => { const values: Array = []; - // nice=0; runs first because nothing else is waiting + // priority=0; runs first because nothing else is waiting semaphore.acquire(2, 0).then(([, release]) => { values.push(0); setTimeout(release, 100); }); - // nice=1; queues first - semaphore.acquire(2, 1).then(([, release]) => { - values.push(1); + // priority=-1; queues first + semaphore.acquire(2, -1).then(([, release]) => { + values.push(-1); setTimeout(release, 100); }); - // nice=-1; jumps ahead of nice=+1 - semaphore.acquire(2, -1).then(([, release]) => { - values.push(-1); + // priority=+1; jumps ahead of priority=-1 + semaphore.acquire(2, +1).then(([, release]) => { + values.push(+1); setTimeout(release, 100); }); await clock.runAllAsync(); - assert.deepStrictEqual(values, [0, -1, 1]); + assert.deepStrictEqual(values, [0, +1, -1]); }); - test('acquire allows high-priority and light tasks to skip the line', async () => { + test('acquire allows light high-priority tasks to skip the line', async () => { let executed = false; - semaphore.acquire(3, 1); - semaphore.acquire(1, 0).then(([, release]) => { + semaphore.acquire(3, 0); + semaphore.acquire(1, 1).then(([, release]) => { executed = true; setTimeout(release, 100); }); @@ -86,7 +86,7 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.strictEqual(executed, true); }); - test('acquire de-prioritizes nice waiters even if they are lighter', async () => { + test('acquire prioritizes high-priority tasks even if they are heavier', async () => { const values: Array = []; // two items with weight 1; runs first because nothing else is waiting @@ -99,20 +99,20 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => setTimeout(release, 100); }); - // nice item with weight 1 - semaphore.acquire(1, 1).then(([, release]) => { - values.push(1); + // low-priority item with weight 1 + semaphore.acquire(1, -1).then(([, release]) => { + values.push(-1); setTimeout(release, 100); }); // high-priority item with weight 2; should run before the others - semaphore.acquire(2, -1).then(([, release]) => { - values.push(-1); + semaphore.acquire(2, +1).then(([, release]) => { + values.push(+1); setTimeout(release, 100); }); await clock.runAllAsync(); - assert.deepStrictEqual(values, [0, 0, -1, 1]); + assert.deepStrictEqual(values, [0, 0, +1, -1]); }); test('acquire blocks when the semaphore has reached zero until it is released again', async () => { @@ -297,13 +297,13 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.strictEqual(semaphore.getValue(), 2); }); - test('runExclusive executes the nicest waiters last', async () => { + test('runExclusive executes high-priority tasks first', async () => { const values: number[] = []; semaphore.runExclusive(() => { values.push(0) }, 2); - semaphore.runExclusive(() => { values.push(1) }, 2, 1); semaphore.runExclusive(() => { values.push(-1) }, 2, -1); + semaphore.runExclusive(() => { values.push(+1) }, 2, +1); await clock.runAllAsync(); - assert.deepStrictEqual(values, [0, -1, 1]); + assert.deepStrictEqual(values, [0, +1, -1]); }); test('new semaphore is unlocked', () => { @@ -518,19 +518,19 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => test('waitForUnlock unblocks only high-priority waiters immediately', async () => { const calledBack: number[] = []; semaphore.acquire(3, 1); // A big heavy waiting task - semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // High priority + semaphore.waitForUnlock(1, 2).then(() => { calledBack.push(2); }); // High priority semaphore.waitForUnlock(1, 1).then(() => { calledBack.push(1); }); // Queued behind the heavy task - semaphore.waitForUnlock(1, 2).then(() => { calledBack.push(2); }); // Low priority + semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // Low priority await clock.runAllAsync(); - assert.deepStrictEqual(calledBack, [0]); + assert.deepStrictEqual(calledBack, [2]); }); - test('waitForUnlock unblocks nice waiters as the queue drains', async () => { + test('waitForUnlock unblocks waiters of descending priority as the queue drains', async () => { let calledBack = false; let release: SemaphoreInterface.Releaser; - semaphore.acquire(2, 0).then(([, r]) => { release = r; }); - semaphore.acquire(2, 2).then(([, r]) => { setTimeout(r, 100); }); + semaphore.acquire(2, 2).then(([, r]) => { release = r; }); + semaphore.acquire(2, 0).then(([, r]) => { setTimeout(r, 100); }); semaphore.waitForUnlock(2, 1).then(() => { calledBack = true; }); From 90d109230222d2b983d8539945f92995d935e0f6 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Tue, 23 Jan 2024 21:21:00 -0800 Subject: [PATCH 11/19] Switch up test init order to exercise a branch --- test/semaphoreSuite.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index be084b9..6943904 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -518,9 +518,9 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => test('waitForUnlock unblocks only high-priority waiters immediately', async () => { const calledBack: number[] = []; semaphore.acquire(3, 1); // A big heavy waiting task + semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // Low priority semaphore.waitForUnlock(1, 2).then(() => { calledBack.push(2); }); // High priority semaphore.waitForUnlock(1, 1).then(() => { calledBack.push(1); }); // Queued behind the heavy task - semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // Low priority await clock.runAllAsync(); assert.deepStrictEqual(calledBack, [2]); }); From aeeae188446fa62465aa228271a4ae1af7482f26 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Tue, 23 Jan 2024 22:28:23 -0800 Subject: [PATCH 12/19] Add priority to Mutex.acquire --- src/Mutex.ts | 2 +- src/withTimeout.ts | 19 ++++++++++++++++--- test/mutex.ts | 29 +++++++++++++++++++++++++++-- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/Mutex.ts b/src/Mutex.ts index c75d127..242257b 100644 --- a/src/Mutex.ts +++ b/src/Mutex.ts @@ -7,7 +7,7 @@ class Mutex implements MutexInterface { } async acquire(priority = 0): Promise { - const [, releaser] = await this._semaphore.acquire(); + const [, releaser] = await this._semaphore.acquire(1, priority); return releaser; } diff --git a/src/withTimeout.ts b/src/withTimeout.ts index cf794d7..8da0f82 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -7,7 +7,14 @@ export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface; export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT): any { return { - acquire: (weight?: number, priority?: number): Promise => { + acquire: (weightOrPriority?: number, priority?: number): Promise => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } @@ -21,8 +28,10 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: }, timeout); try { - const ticket = await sync.acquire(weight, priority); - + const ticket = await (isSemaphore(sync) + ? sync.acquire(weight, priority) + : sync.acquire(priority) + ); if (isTimeout) { const release = Array.isArray(ticket) ? ticket[1] : ticket; @@ -90,3 +99,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: setValue: (value: number) => (sync as SemaphoreInterface).setValue(value), }; } + +function isSemaphore(sync: SemaphoreInterface | MutexInterface): sync is SemaphoreInterface { + return (sync as SemaphoreInterface).getValue !== undefined; +} diff --git a/test/mutex.ts b/test/mutex.ts index 7b89ae3..3078179 100644 --- a/test/mutex.ts +++ b/test/mutex.ts @@ -36,7 +36,32 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); - test('acquire unblocks the highest-priority task first'); + test('acquire unblocks the highest-priority task first', () => + withTimer(clock, async () => { + const values: number[] = []; + + // Scheduled immediately + mutex.acquire(0).then((release) => { + values.push(0); + setTimeout(release, 100) + }); + + // Low priority task + mutex.acquire(-1).then((release) => { + values.push(-1); + setTimeout(release, 100) + }); + + // High priority task; jumps the queue + mutex.acquire(1).then((release) => { + values.push(1); + setTimeout(release, 100) + }); + + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, 1, -1]); + }) + ); test('runExclusive passes result (immediate)', async () => { assert.strictEqual(await mutex.runExclusive(() => 10), 10); @@ -270,7 +295,7 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert.strictEqual(flag, true); }); - test('waitForUnlock unblocks the highest-priority tasks first'); + test('waitForUnlock unblocks the highest-priority task first'); }; suite('Mutex', () => mutexSuite((e) => new Mutex(e))); From 8711a6d52c747c887870eb88e010c6830d75434a Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Tue, 23 Jan 2024 22:37:32 -0800 Subject: [PATCH 13/19] Add priority to Mutex.runExclusive and waitForUnlock --- src/Mutex.ts | 4 ++-- src/withTimeout.ts | 18 ++++++++++++++---- test/mutex.ts | 29 +++++++++++++++++++++++++++-- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/Mutex.ts b/src/Mutex.ts index 242257b..20cffd6 100644 --- a/src/Mutex.ts +++ b/src/Mutex.ts @@ -13,7 +13,7 @@ class Mutex implements MutexInterface { } runExclusive(callback: MutexInterface.Worker, priority = 0): Promise { - return this._semaphore.runExclusive(() => callback()); + return this._semaphore.runExclusive(() => callback(), 1, priority); } isLocked(): boolean { @@ -21,7 +21,7 @@ class Mutex implements MutexInterface { } waitForUnlock(priority = 0): Promise { - return this._semaphore.waitForUnlock(); + return this._semaphore.waitForUnlock(1, priority); } release(): void { diff --git a/src/withTimeout.ts b/src/withTimeout.ts index 8da0f82..77e232f 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -78,16 +78,26 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: return sync.cancel(); }, - waitForUnlock: (weight?: number, priority?: number): Promise => { + waitForUnlock: (weightOrPriority?: number, priority?: number): Promise => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } return new Promise((resolve, reject) => { const handle = setTimeout(() => reject(timeoutError), timeout); - sync.waitForUnlock(weight, priority).then(() => { - clearTimeout(handle); - resolve(); + (isSemaphore(sync) + ? sync.waitForUnlock(weight, priority) + : sync.waitForUnlock(priority) + ).then(() => { + clearTimeout(handle); + resolve(); }); }); }, diff --git a/test/mutex.ts b/test/mutex.ts index 3078179..d945209 100644 --- a/test/mutex.ts +++ b/test/mutex.ts @@ -108,7 +108,14 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); - test('runExclusive unblocks the highest-priority task first'); + test('runExclusive unblocks the highest-priority task first', async () => { + const values: number[] = []; + mutex.runExclusive(() => { values.push(0); }, 0); + mutex.runExclusive(() => { values.push(-1); }, -1); + mutex.runExclusive(() => { values.push(+1); }, +1); + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, +1, -1]); + }); test('exceptions during runExclusive do not leave mutex locked', async () => { let flag = false; @@ -295,7 +302,25 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert.strictEqual(flag, true); }); - test('waitForUnlock unblocks the highest-priority task first'); + test('waitForUnlock unblocks high-priority waiters before low-priority queued tasks', async () => { + mutex.acquire(0); // Immediately scheduled + mutex.acquire(0); // Waiting + let flag = false; + mutex.waitForUnlock(1).then(() => { flag = true; }); + mutex.release(); + await clock.tickAsync(0); + assert.strictEqual(flag, true); + }); + + test('waitForUnlock unblocks low-priority waiters after high-priority queued tasks', async () => { + mutex.acquire(0); // Immediately scheduled + mutex.acquire(0); // Waiting + let flag = false; + mutex.waitForUnlock(-1).then(() => { flag = true; }); + mutex.release(); + await clock.tickAsync(0); + assert.strictEqual(flag, false); + }); }; suite('Mutex', () => mutexSuite((e) => new Mutex(e))); From 7aacdc7a226cd8ca86f0236150e778f0fd5bc226 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Tue, 23 Jan 2024 23:18:22 -0800 Subject: [PATCH 14/19] Polish PR --- src/Semaphore.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 527a01e..2a457c3 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -8,28 +8,24 @@ interface Priority { interface QueueEntry { resolve(result: [number, SemaphoreInterface.Releaser]): void; - reject(error: unknown): void; - weight: number; priority: number; } interface Waiter { resolve(): void; - priority: number; } class Semaphore implements SemaphoreInterface { - constructor(private _value: number, private _cancelError: Error = E_CANCELED) { - } + constructor(private _value: number, private _cancelError: Error = E_CANCELED) {} acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); return new Promise((resolve, reject) => { - const task = { resolve, reject, weight, priority }; + const task: QueueEntry = { resolve, reject, weight, priority }; const i = this._queue.findIndex((other) => priority > other.priority); if (i === 0 && weight <= this._value) { // Needs immediate dispatch, skip the queue From 6a55acb63ef528a8f6bb6bcd904f6fafafd78ba1 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Thu, 1 Feb 2024 20:36:38 -0800 Subject: [PATCH 15/19] Add failing test for crowd-out condition --- test/semaphoreSuite.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 4c9bc6c..11c0461 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -50,6 +50,24 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values.sort(), [2, 2]); }); + test('acquire allows light items to run eventually', async () => { + let done = false; + async function lightLoop() { + while (!done) { + const [,release] = await semaphore.acquire(1); + await new Promise((resolve) => { setTimeout(resolve, 10); }); + release(); + } + } + lightLoop(); + await clock.tickAsync(5); + lightLoop(); + semaphore.acquire(2).then(() => { done = true; }); + await clock.tickAsync(10); + await clock.tickAsync(10); + assert.strictEqual(done, true); + }); + test('acquire blocks when the semaphore has reached zero until it is released again', async () => { const values: Array = []; From 2d13278159e318955ea9bc144366023ffd2e4df8 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Thu, 1 Feb 2024 21:27:37 -0800 Subject: [PATCH 16/19] Reverse findIndex for queuing tasks in acquire() --- src/Semaphore.ts | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 2a457c3..21f756c 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -26,14 +26,14 @@ class Semaphore implements SemaphoreInterface { return new Promise((resolve, reject) => { const task: QueueEntry = { resolve, reject, weight, priority }; - const i = this._queue.findIndex((other) => priority > other.priority); - if (i === 0 && weight <= this._value) { + const i = findIndexFromEnd(this._queue, (other) => priority <= other.priority); + if (i === -1 && weight <= this._value) { // Needs immediate dispatch, skip the queue this._dispatchItem(task); } else if (i === -1) { - this._queue.push(task); + this._queue.splice(0, 0, task); } else { - this._queue.splice(i, 0, task); + this._queue.splice(i + 1, 0, task); } this._dispatchQueue(); }); @@ -151,4 +151,13 @@ function insertSorted(a: T[], v: T) { } } +function findIndexFromEnd(a: T[], predicate: (e: T) => boolean): number { + for (let i = a.length - 1; i >= 0; i--) { + if (predicate(a[i])) { + return i; + } + } + return -1; +} + export default Semaphore; From 981554fb9519dc210c5bae8c40859b55e7388f53 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Thu, 1 Feb 2024 21:28:50 -0800 Subject: [PATCH 17/19] Fix test name --- test/semaphoreSuite.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 17fef43..c75a179 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -115,7 +115,7 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values, [0, 0, +1, -1]); }); - test('acquire allows light items to run eventually', async () => { + test('acquire allows heavy items to run eventually', async () => { let done = false; async function lightLoop() { while (!done) { From 4350d809276d7f85281c13e58706586f34e3f84b Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Thu, 1 Feb 2024 21:35:43 -0800 Subject: [PATCH 18/19] Reverse findIndex for queuing tasks in waitForUnlock() --- src/Semaphore.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 21f756c..aad9ce9 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -143,11 +143,11 @@ class Semaphore implements SemaphoreInterface { } function insertSorted(a: T[], v: T) { - const i = a.findIndex((other) => v.priority > other.priority); + const i = findIndexFromEnd(a, (other) => v.priority <= other.priority); if (i === -1) { - a.push(v); + a.splice(0, 0, v); } else { - a.splice(i, 0, v); + a.splice(i + 1, 0, v); } } From 285229f04303e6bc2e9c3c810c39fcfe215e4990 Mon Sep 17 00:00:00 2001 From: Dolan Murvihill Date: Sat, 9 Mar 2024 16:29:40 -0800 Subject: [PATCH 19/19] Remove redundant instructions --- src/Semaphore.ts | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index aad9ce9..b912f2d 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -30,12 +30,9 @@ class Semaphore implements SemaphoreInterface { if (i === -1 && weight <= this._value) { // Needs immediate dispatch, skip the queue this._dispatchItem(task); - } else if (i === -1) { - this._queue.splice(0, 0, task); } else { this._queue.splice(i + 1, 0, task); } - this._dispatchQueue(); }); } @@ -58,7 +55,6 @@ class Semaphore implements SemaphoreInterface { return new Promise((resolve) => { if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; insertSorted(this._weightedWaiters[weight - 1], { resolve, priority }); - this._dispatchQueue(); }); } } @@ -144,11 +140,7 @@ class Semaphore implements SemaphoreInterface { function insertSorted(a: T[], v: T) { const i = findIndexFromEnd(a, (other) => v.priority <= other.priority); - if (i === -1) { - a.splice(0, 0, v); - } else { - a.splice(i + 1, 0, v); - } + a.splice(i + 1, 0, v); } function findIndexFromEnd(a: T[], predicate: (e: T) => boolean): number {