Skip to content

Commit

Permalink
feat: queue to spawn workers (#255)
Browse files Browse the repository at this point in the history
* feat: queue to spawn workers

* fix: using optional chaining
  • Loading branch information
MARCROCK22 committed Aug 25, 2024
1 parent 498c66e commit 633b19c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 19 deletions.
24 changes: 18 additions & 6 deletions src/client/workerclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
WorkerSendResultPayload,
WorkerSendShardInfo,
WorkerShardInfo,
WorkerShardsConnected,
WorkerStart,
} from '../websocket/discord/worker';
import type { ManagerMessages } from '../websocket/discord/workermanager';
Expand Down Expand Up @@ -225,6 +226,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
const onPacket = this.onPacket.bind(this);
const handlePayload = this.options?.handlePayload?.bind(this);
const self = this;
const { sendPayloadToParent } = this.options;
for (const id of workerData.shards) {
let shard = this.shards.get(id);
if (!shard) {
Expand All @@ -241,12 +243,13 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
async handlePayload(shardId, payload) {
await handlePayload?.(shardId, payload);
await onPacket?.(payload, shardId);
self.postMessage({
workerId: workerData.workerId,
shardId,
type: 'RECEIVE_PAYLOAD',
payload,
} satisfies WorkerReceivePayload);
if (sendPayloadToParent)
self.postMessage({
workerId: workerData.workerId,
shardId,
type: 'RECEIVE_PAYLOAD',
payload,
} satisfies WorkerReceivePayload);
},
});
this.shards.set(id, shard);
Expand Down Expand Up @@ -418,6 +421,13 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
this.applicationId = packet.d.application.id;
this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never;
await this.events?.execute(packet.t as never, packet, this, shardId);
if ([...this.shards.values()].every(shard => shard.data.session_id)) {
this.postMessage({
type: 'WORKER_SHARDS_CONNECTED',
workerId: this.workerId,
} as WorkerShardsConnected);
await this.events?.runEvent('WORKER_SHARDS_CONNECTED', this, this.me, -1);
}
if (
!(
this.__handleGuilds?.size &&
Expand Down Expand Up @@ -460,4 +470,6 @@ interface WorkerClientOptions extends BaseClientOptions {
handlePayload?: ShardManagerOptions['handlePayload'];
gateway?: ClientOptions['gateway'];
postMessage?: (body: unknown) => unknown;
/** can have perfomance issues in big bots if the client sends every event, specially in startup (false by default) */
sendPayloadToParent?: boolean;
}
4 changes: 4 additions & 0 deletions src/events/hooks/custom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ export const BOT_READY = (_self: UsingClient, me: ClientUserStructure) => {
export const WORKER_READY = (_self: UsingClient, me: ClientUserStructure) => {
return me;
};

export const WORKER_SHARDS_CONNECTED = (_self: UsingClient, me: ClientUserStructure) => {
return me;
};
2 changes: 2 additions & 0 deletions src/websocket/discord/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export type WorkerSendCacheRequest = CreateWorkerMessage<
export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>;
export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>;
export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>;
export type WorkerShardsConnected = CreateWorkerMessage<'WORKER_SHARDS_CONNECTED'>;
export type WorkerStart = CreateWorkerMessage<'WORKER_START'>;
export type WorkerSendApiRequest = CreateWorkerMessage<
'WORKER_API_REQUEST',
Expand Down Expand Up @@ -87,6 +88,7 @@ export type WorkerMessage =
| WorkerSendShardInfo
| WorkerSendInfo
| WorkerReady
| WorkerShardsConnected
| WorkerSendApiRequest
| WorkerSendEvalResponse
| WorkerSendEval
Expand Down
42 changes: 29 additions & 13 deletions src/websocket/discord/workermanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class WorkerManager extends Map<
options!: MakePartial<Required<WorkerManagerOptions>, 'adapter'>;
debugger?: Logger;
connectQueue!: ConnectQueue;
workerQueue: (() => void)[] = [];
cacheAdapter: Adapter;
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
rest!: ApiHandler;
Expand Down Expand Up @@ -135,20 +136,22 @@ export class WorkerManager extends Map<
if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.');

for (let i = 0; i < shards.length; i++) {
let worker = this.get(i);
if (!worker) {
worker = this.createWorker({
path: this.options.path,
debug: this.options.debug,
token: this.options.token,
shards: shards[i],
intents: this.options.intents,
workerId: i,
workerProxy: this.options.workerProxy,
totalShards: this.totalShards,
mode: this.options.mode,
const workerExists = this.has(i);
if (!workerExists) {
this.workerQueue.push(() => {
const worker = this.createWorker({
path: this.options.path,
debug: this.options.debug,
token: this.options.token,
shards: shards[i],
intents: this.options.intents,
workerId: i,
workerProxy: this.options.workerProxy,
totalShards: this.totalShards,
mode: this.options.mode,
});
this.set(i, worker);
});
this.set(i, worker);
}
}
}
Expand Down Expand Up @@ -288,6 +291,17 @@ export class WorkerManager extends Map<
}
}
break;
case 'WORKER_SHARDS_CONNECTED':
{
const nextWorker = this.workerQueue.shift();
if (nextWorker) {
this.debugger?.info('Spawning next worker');
nextWorker();
} else {
this.debugger?.info('No more workers to spawn left');
}
}
break;
case 'WORKER_API_REQUEST':
{
const response = await this.rest.request(message.method, message.url, message.requestOptions);
Expand Down Expand Up @@ -431,6 +445,8 @@ export class WorkerManager extends Map<

const spaces = this.prepareSpaces();
await this.prepareWorkers(spaces);
// Start workers queue
return this.workerQueue.shift()?.();
}
}

Expand Down

0 comments on commit 633b19c

Please sign in to comment.