Skip to content

Commit

Permalink
Implement a windowed batch scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
yornaath committed Nov 10, 2023
1 parent 06a4fb9 commit c1ce344
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 18 deletions.
8 changes: 4 additions & 4 deletions packages/batshit/.turbo/turbo-test.log
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

RUN v0.26.3 /Users/jorntangen/progg/batshit/packages/batshit

✓ tests/index.test.ts (14 tests) 354ms
✓ tests/index.test.ts (22 tests) 380ms

Test Files 1 passed (1)
Tests 14 passed (14)
Start at 16:46:42
Duration 834ms (transform 231ms, setup 0ms, collect 58ms, tests 354ms)
Tests 22 passed (22)
Start at 12:41:52
Duration 818ms (transform 210ms, setup 0ms, collect 54ms, tests 380ms)

57 changes: 45 additions & 12 deletions packages/batshit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ export type BatcherScheduler = {
* @param latest number - time stamp of the latest queued fetch.
 * @returns number - the number of ms to wait from latest queued fetch until executing the batch fetch call.
*/
(start: number, latest: number): number;
(start: number, latest: number, batchSize: number): Schedule;
};

/**
 * Result of a scheduler call: either the number of milliseconds to wait
 * (used as a `setTimeout` delay) before executing the batch fetch, or
 * `"immediate"` to flush the current batch synchronously.
 */
export type Schedule = number | "immediate";

export type BatcherMemory<T, Q> = {
seq: number;
batch: Set<Q>;
Expand Down Expand Up @@ -108,14 +110,22 @@ export const create = <T, Q, R = T>(

devtools?.create({ seq: mem.seq });

const nextBatch = () => {
mem.batch = new Set();
mem.currentRequest = deferred<T>();
mem.timer = undefined;
mem.start = null;
mem.latest = null;
};

const fetch = (query: Q): Promise<R> => {
if (!mem.start) mem.start = Date.now();
mem.latest = Date.now();

mem.batch.add(query);
clearTimeout(mem.timer);

const scheduled = scheduler(mem.start, mem.latest);
const scheduled = scheduler(mem.start, mem.latest, mem.batch.size);

devtools?.queue({
seq: mem.seq,
Expand All @@ -126,18 +136,14 @@ export const create = <T, Q, R = T>(
start: mem.start,
});

mem.timer = setTimeout(() => {
const fetchBatch = () => {
const currentSeq = mem.seq;
const req = config.fetcher([...mem.batch]);
const currentRequest = mem.currentRequest;

devtools?.fetch({ seq: currentSeq, batch: [...mem.batch] });

mem.batch = new Set();
mem.currentRequest = deferred<T>();
mem.timer = undefined;
mem.start = null;
mem.latest = null;
nextBatch();

req
.then((data) => {
Expand All @@ -150,11 +156,20 @@ export const create = <T, Q, R = T>(
});

mem.seq++;
}, scheduled);

return mem.currentRequest.value.then((items) =>
config.resolver(items, query)
);
return req;
};

if (scheduled === "immediate") {
const req = mem.currentRequest;
fetchBatch();
return req.value.then((items) => config.resolver(items, query));
} else {
mem.timer = setTimeout(fetchBatch, scheduled);
return mem.currentRequest.value.then((items) =>
config.resolver(items, query)
);
}
};

return { fetch };
Expand Down Expand Up @@ -207,3 +222,21 @@ export const windowScheduler: (ms: number) => BatcherScheduler =
/**
 * Scheduler that always waits a fixed number of ms after every queued fetch,
 * regardless of when the batch started or how large it is.
 *
 * @param ms number - fixed delay in milliseconds
 * @returns BatcherScheduler
 */
export const bufferScheduler: (ms: number) => BatcherScheduler = (ms) => () =>
  ms;

/**
 * Gives a window in ms where all queued fetches made within the window are
 * batched into one single batch fetch call — unless the batch reaches
 * `maxBatchSize` first, in which case the batch is flushed immediately.
 *
 * @param config.windowMs number - length of the batching window in ms, measured from the first queued fetch
 * @param config.maxBatchSize number - batch size at which the scheduler flushes without waiting
 * @returns BatcherScheduler
 */
export const windowedBatchScheduler: (config: {
  windowMs: number;
  maxBatchSize: number;
}) => BatcherScheduler =
  ({ windowMs: ms, maxBatchSize }) =>
  (start, latest, batchSize) => {
    // Flush right away once the batch is full.
    if (batchSize >= maxBatchSize) return "immediate";
    // Remaining time in the window, clamped so a window that has already
    // elapsed never yields a negative delay (setTimeout would clamp it
    // anyway, but devtools report this value verbatim).
    const spent = latest - start;
    return Math.max(0, ms - spent);
  };
98 changes: 98 additions & 0 deletions packages/batshit/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
windowScheduler,
keyResolver,
indexedResolver,
windowedBatchScheduler,
} from "../src/index";
import * as mock from "./mock";

Expand Down Expand Up @@ -229,6 +230,103 @@ const tests = () => {

expect(all).toEqual(mock.users);
});

test("handle undefined responses", async () => {
const batcher = create({
fetcher: async (ids: number[]) => {
return mock.usersByIds(ids);
},
resolver: (items, id) => items.find((item) => item.id === id) ?? null,
});

const all = await Promise.all([batcher.fetch(2), batcher.fetch(100)]);

expect(all).toEqual([{ id: 2, name: "Alice" }, null]);
});

test("immediate scheduled batchers should work", async () => {
let fetchCounter = 0;
const batcher = create({
fetcher: async (ids: number[]) => {
fetchCounter++;
return mock.usersByIds(ids);
},
scheduler: () => "immediate",
resolver: keyResolver("id"),
});

const one = batcher.fetch(1);
const two = batcher.fetch(2);

expect(await one).toEqual({ id: 1, name: "Bob" });
expect(await two).toEqual({ id: 2, name: "Alice" });

expect(fetchCounter).toEqual(2);
});

describe("windowedBatchScheduler", () => {
test("should batch within a window, but fetch immediatly if batch size 2 is reached", async () => {
let fetchCounter = 0;
const batcher = create({
fetcher: async (ids: number[]) => {
fetchCounter++;
return mock.usersByIds(ids);
},
scheduler: windowedBatchScheduler({
windowMs: 10,
maxBatchSize: 2,
}),
resolver: keyResolver("id"),
});

const one = batcher.fetch(1);
const two = batcher.fetch(2);
const three = batcher.fetch(3);
const four = batcher.fetch(4);

const all = await Promise.all([one, two, three, four]);

expect(fetchCounter).toBe(2);

expect(all).toEqual([
{ id: 1, name: "Bob" },
{ id: 2, name: "Alice" },
{ id: 3, name: "Sally" },
{ id: 4, name: "John" },
]);
});

test("should batch within a window, but fetch immediatly if batch size 1 is reached", async () => {
let fetchCounter = 0;
const batcher = create({
fetcher: async (ids: number[]) => {
fetchCounter++;
return mock.usersByIds(ids);
},
scheduler: windowedBatchScheduler({
windowMs: 10,
maxBatchSize: 1,
}),
resolver: keyResolver("id"),
});

const one = batcher.fetch(1);
const two = batcher.fetch(2);
const three = batcher.fetch(3);
const four = batcher.fetch(4);

const all = await Promise.all([one, two, three, four]);

expect(fetchCounter).toBe(4);

expect(all).toEqual([
{ id: 1, name: "Bob" },
{ id: 2, name: "Alice" },
{ id: 3, name: "Sally" },
{ id: 4, name: "John" },
]);
});
});
};

describe("batcher", tests);
Expand Down
3 changes: 2 additions & 1 deletion packages/devtools/src/devtools.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { Schedule } from "@yornaath/batshit";
import {
BatshitEvent,
CreateEvent,
Expand Down Expand Up @@ -35,7 +36,7 @@ export type DevtoolsListener<T, Q> = {
batch: Q[];
start: number;
latest: number;
scheduled: number;
scheduled: Schedule;
}) => void;
fetch: (_event: { seq: number; batch: Q[] }) => void;
data: (_event: { seq: number; data: T }) => void;
Expand Down
4 changes: 3 additions & 1 deletion packages/devtools/src/events.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Schedule } from "@yornaath/batshit";

export type BatshitEvent<Q, T> =
| CreateEvent<T, Q>
| QueueEvent<T, Q>
Expand All @@ -18,7 +20,7 @@ export type QueueEvent<T, Q> = {
batch: Q[];
start: number;
latest: number;
scheduled: number;
scheduled: Schedule;
};

export type FetchEvent<T, Q> = {
Expand Down

0 comments on commit c1ce344

Please sign in to comment.