Skip to content

Commit

Permalink
Merge pull request #75 from dmurvihill/priority
Browse files Browse the repository at this point in the history
Add priority queueing for Mutex and Semaphore
  • Loading branch information
DirtyHairy committed Mar 11, 2024
2 parents 6af938b + 285229f commit 43e8858
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 65 deletions.
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,14 @@ the semaphore is released. `runExclusive` returns a promise that adopts the stat
The semaphore is released and the result rejected if an exception occurs during execution
of the callback.

`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the
`runExclusive` accepts a first optional argument `weight`. Specifying a `weight` will decrement the
semaphore by the specified value, and the callback will only be invoked once the semaphore's
value greater or equal to `weight`.

`runExclusive` accepts a second optional argument `priority`. Specifying a greater value for `priority`
tells the scheduler to run this task before other tasks. `priority` can be any real number. The default
is zero.

### Manual locking / releasing

Promise style:
Expand Down Expand Up @@ -328,10 +332,14 @@ has completed. The `release` callback is idempotent.
likely deadlock the application. Make sure to call `release` under all circumstances
and handle exceptions accordingly.

`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the
semaphore by the specified value, and the semaphore will only be acquired once the its
`acquire` accepts a first optional argument `weight`. Specifying a `weight` will decrement the
semaphore by the specified value, and the semaphore will only be acquired once its
value is greater or equal to `weight`.

`acquire` accepts a second optional argument `priority`. Specifying a greater value for `priority`
tells the scheduler to release the semaphore to the caller before other callers. `priority` can be
any real number. The default is zero.

### Unscoped release

As an alternative to calling the `release` callback returned by `acquire`, the semaphore
Expand Down Expand Up @@ -444,8 +452,10 @@ await semaphore.waitForUnlock();
// ...
```

`waitForUnlock` accepts an optional argument `weight`. If `weight` is specified the promise
will only resolve once the semaphore's value is greater or equal to `weight`;
`waitForUnlock` accepts optional arguments `weight` and `priority`. The promise will resolve as soon
as it is possible to `acquire` the semaphore with the given weight and priority. Scheduled tasks with
the greatest `priority` values execute first.


## Limiting the time waiting for a mutex or semaphore to become available

Expand Down
12 changes: 6 additions & 6 deletions src/Mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ class Mutex implements MutexInterface {
this._semaphore = new Semaphore(1, cancelError);
}

async acquire(): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire();
async acquire(priority = 0): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire(1, priority);

return releaser;
}

runExclusive<T>(callback: MutexInterface.Worker<T>): Promise<T> {
return this._semaphore.runExclusive(() => callback());
runExclusive<T>(callback: MutexInterface.Worker<T>, priority = 0): Promise<T> {
return this._semaphore.runExclusive(() => callback(), 1, priority);
}

isLocked(): boolean {
return this._semaphore.isLocked();
}

waitForUnlock(): Promise<void> {
return this._semaphore.waitForUnlock();
waitForUnlock(priority = 0): Promise<void> {
return this._semaphore.waitForUnlock(1, priority);
}

release(): void {
Expand Down
6 changes: 3 additions & 3 deletions src/MutexInterface.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
interface MutexInterface {
acquire(): Promise<MutexInterface.Releaser>;
acquire(priority?: number): Promise<MutexInterface.Releaser>;

runExclusive<T>(callback: MutexInterface.Worker<T>): Promise<T>;
runExclusive<T>(callback: MutexInterface.Worker<T>, priority?: number): Promise<T>;

waitForUnlock(): Promise<void>;
waitForUnlock(priority?: number): Promise<void>;

isLocked(): boolean;

Expand Down
121 changes: 83 additions & 38 deletions src/Semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
import { E_CANCELED } from './errors';
import SemaphoreInterface from './SemaphoreInterface';


interface Priority {
priority: number;
}

interface QueueEntry {
resolve(result: [number, SemaphoreInterface.Releaser]): void;
reject(error: unknown): void;
weight: number;
priority: number;
}

interface Waiter {
resolve(): void;
priority: number;
}

class Semaphore implements SemaphoreInterface {
constructor(private _value: number, private _cancelError: Error = E_CANCELED) {}

acquire(weight = 1): Promise<[number, SemaphoreInterface.Releaser]> {
acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

return new Promise((resolve, reject) => {
if (!this._weightedQueues[weight - 1]) this._weightedQueues[weight - 1] = [];
this._weightedQueues[weight - 1].push({ resolve, reject });

this._dispatch();
const task: QueueEntry = { resolve, reject, weight, priority };
const i = findIndexFromEnd(this._queue, (other) => priority <= other.priority);
if (i === -1 && weight <= this._value) {
// Needs immediate dispatch, skip the queue
this._dispatchItem(task);
} else {
this._queue.splice(i + 1, 0, task);
}
});
}

async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight = 1): Promise<T> {
const [value, release] = await this.acquire(weight);
async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight = 1, priority = 0): Promise<T> {
const [value, release] = await this.acquire(weight, priority);

try {
return await callback(value);
Expand All @@ -30,15 +46,17 @@ class Semaphore implements SemaphoreInterface {
}
}

waitForUnlock(weight = 1): Promise<void> {
waitForUnlock(weight = 1, priority = 0): Promise<void> {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

return new Promise((resolve) => {
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = [];
this._weightedWaiters[weight - 1].push(resolve);

this._dispatch();
});
if (this._couldLockImmediately(weight, priority)) {
return Promise.resolve();
} else {
return new Promise((resolve) => {
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = [];
insertSorted(this._weightedWaiters[weight - 1], { resolve, priority });
});
}
}

isLocked(): boolean {
Expand All @@ -51,36 +69,33 @@ class Semaphore implements SemaphoreInterface {

setValue(value: number): void {
this._value = value;
this._dispatch();
this._dispatchQueue();
}

release(weight = 1): void {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

this._value += weight;
this._dispatch();
this._dispatchQueue();
}

cancel(): void {
this._weightedQueues.forEach((queue) => queue.forEach((entry) => entry.reject(this._cancelError)));
this._weightedQueues = [];
this._queue.forEach((entry) => entry.reject(this._cancelError));
this._queue = [];
}

private _dispatch(): void {
for (let weight = this._value; weight > 0; weight--) {
const queueEntry = this._weightedQueues[weight - 1]?.shift();
if (!queueEntry) continue;

const previousValue = this._value;
const previousWeight = weight;

this._value -= weight;
weight = this._value + 1;

queueEntry.resolve([previousValue, this._newReleaser(previousWeight)]);
private _dispatchQueue(): void {
this._drainUnlockWaiters();
while (this._queue.length > 0 && this._queue[0].weight <= this._value) {
this._dispatchItem(this._queue.shift()!);
this._drainUnlockWaiters();
}
}

this._drainUnlockWaiters();
private _dispatchItem(item: QueueEntry): void {
const previousValue = this._value;
this._value -= item.weight;
item.resolve([previousValue, this._newReleaser(item.weight)]);
}

private _newReleaser(weight: number): () => void {
Expand All @@ -95,16 +110,46 @@ class Semaphore implements SemaphoreInterface {
}

private _drainUnlockWaiters(): void {
for (let weight = this._value; weight > 0; weight--) {
if (!this._weightedWaiters[weight - 1]) continue;

this._weightedWaiters[weight - 1].forEach((waiter) => waiter());
this._weightedWaiters[weight - 1] = [];
if (this._queue.length === 0) {
for (let weight = this._value; weight > 0; weight--) {
const waiters = this._weightedWaiters[weight - 1];
if (!waiters) continue;
waiters.forEach((waiter) => waiter.resolve());
this._weightedWaiters[weight - 1] = [];
}
} else {
const queuedPriority = this._queue[0].priority;
for (let weight = this._value; weight > 0; weight--) {
const waiters = this._weightedWaiters[weight - 1];
if (!waiters) continue;
const i = waiters.findIndex((waiter) => waiter.priority <= queuedPriority);
(i === -1 ? waiters : waiters.splice(0, i))
.forEach((waiter => waiter.resolve()));
}
}
}

private _weightedQueues: Array<Array<QueueEntry>> = [];
private _weightedWaiters: Array<Array<() => void>> = [];
private _couldLockImmediately(weight: number, priority: number) {
return (this._queue.length === 0 || this._queue[0].priority < priority) &&
weight <= this._value;
}

private _queue: Array<QueueEntry> = [];
private _weightedWaiters: Array<Array<Waiter>> = [];
}

function insertSorted<T extends Priority>(a: T[], v: T) {
const i = findIndexFromEnd(a, (other) => v.priority <= other.priority);
a.splice(i + 1, 0, v);
}

function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number {
for (let i = a.length - 1; i >= 0; i--) {
if (predicate(a[i])) {
return i;
}
}
return -1;
}

export default Semaphore;
6 changes: 3 additions & 3 deletions src/SemaphoreInterface.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
interface SemaphoreInterface {
acquire(weight?: number): Promise<[number, SemaphoreInterface.Releaser]>;
acquire(weight?: number, priority?: number): Promise<[number, SemaphoreInterface.Releaser]>;

runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight?: number): Promise<T>;
runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight?: number, priority?: number): Promise<T>;

waitForUnlock(weight?: number): Promise<void>;
waitForUnlock(weight?: number, priority?: number): Promise<void>;

isLocked(): boolean;

Expand Down
41 changes: 32 additions & 9 deletions src/withTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError
export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface;
export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT): any {
return {
acquire: (weight?: number): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> => {
acquire: (weightOrPriority?: number, priority?: number): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> => {
let weight: number | undefined;
if (isSemaphore(sync)) {
weight = weightOrPriority;
} else {
weight = undefined;
priority = weightOrPriority;
}
if (weight !== undefined && weight <= 0) {
throw new Error(`invalid weight ${weight}: must be positive`);
}
Expand All @@ -21,8 +28,10 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
}, timeout);

try {
const ticket = await sync.acquire(weight);

const ticket = await (isSemaphore(sync)
? sync.acquire(weight, priority)
: sync.acquire(priority)
);
if (isTimeout) {
const release = Array.isArray(ticket) ? ticket[1] : ticket;

Expand All @@ -41,11 +50,11 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
});
},

async runExclusive<T>(callback: (value?: number) => Promise<T> | T, weight?: number): Promise<T> {
async runExclusive<T>(callback: (value?: number) => Promise<T> | T, weight?: number, priority?: number): Promise<T> {
let release: () => void = () => undefined;

try {
const ticket = await this.acquire(weight);
const ticket = await this.acquire(weight, priority);

if (Array.isArray(ticket)) {
release = ticket[1];
Expand All @@ -69,16 +78,26 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
return sync.cancel();
},

waitForUnlock: (weight?: number): Promise<void> => {
waitForUnlock: (weightOrPriority?: number, priority?: number): Promise<void> => {
let weight: number | undefined;
if (isSemaphore(sync)) {
weight = weightOrPriority;
} else {
weight = undefined;
priority = weightOrPriority;
}
if (weight !== undefined && weight <= 0) {
throw new Error(`invalid weight ${weight}: must be positive`);
}

return new Promise((resolve, reject) => {
const handle = setTimeout(() => reject(timeoutError), timeout);
sync.waitForUnlock(weight).then(() => {
clearTimeout(handle);
resolve();
(isSemaphore(sync)
? sync.waitForUnlock(weight, priority)
: sync.waitForUnlock(priority)
).then(() => {
clearTimeout(handle);
resolve();
});
});
},
Expand All @@ -90,3 +109,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
setValue: (value: number) => (sync as SemaphoreInterface).setValue(value),
};
}

function isSemaphore(sync: SemaphoreInterface | MutexInterface): sync is SemaphoreInterface {
return (sync as SemaphoreInterface).getValue !== undefined;
}
Loading

0 comments on commit 43e8858

Please sign in to comment.