diff --git a/src/lib/options.ts b/src/lib/options.ts index e1cff6f3..94a140ab 100644 --- a/src/lib/options.ts +++ b/src/lib/options.ts @@ -1,6 +1,21 @@ -export default { +// TODO, keep defaults there too? +import type { ValidationOptions } from "./validateAndCoerceTypes"; +import type { QueueOptions } from "./queue"; + +export interface Options { + queue: QueueOptions; + validation: ValidationOptions; +} + +const options: Options = { + queue: { + concurrency: 4, // Min: 1, Max: Infinity + timeout: 60, + }, validation: { logErrors: true, logOptionsErrors: true, }, }; + +export default options; diff --git a/src/lib/queue.ts b/src/lib/queue.ts new file mode 100644 index 00000000..0686d099 --- /dev/null +++ b/src/lib/queue.ts @@ -0,0 +1,48 @@ +interface Job { + func: Function; + resolve: Function; + reject: Function; +} + +export interface QueueOptions { + _queue?: Queue; + concurrency?: number; + timeout?: number; // TODO +} + +export default class Queue { + concurrency: number = 1; + + _running: number = 0; + _queue: Array = []; + + constructor(opts: QueueOptions = {}) { + if (opts.concurrency) this.concurrency = opts.concurrency; + } + + runNext() { + const job = this._queue.shift(); + if (!job) return; + + this._running++; + job + .func() + .then((result: any) => job.resolve(result)) + .catch((error: any) => job.reject(error)) + .finally(() => { + this._running--; + this.checkQueue(); + }); + } + + checkQueue() { + if (this._running < this.concurrency) this.runNext(); + } + + add(func: Function) { + return new Promise((resolve, reject) => { + this._queue.push({ func, resolve, reject }); + this.checkQueue(); + }); + } +} diff --git a/src/lib/validateAndCoerceTypes.ts b/src/lib/validateAndCoerceTypes.ts index 89cfc8bb..87d9160a 100644 --- a/src/lib/validateAndCoerceTypes.ts +++ b/src/lib/validateAndCoerceTypes.ts @@ -160,7 +160,7 @@ export function resolvePath(obj: any, instancePath: string) { return ref; } -interface ValidationOptions { +export interface ValidationOptions { logErrors: boolean; logOptionsErrors: boolean; } diff --git a/src/lib/yahooFinanceFetch.spec.ts b/src/lib/yahooFinanceFetch.spec.ts index b95f4610..254390cb 100644 --- a/src/lib/yahooFinanceFetch.spec.ts +++ b/src/lib/yahooFinanceFetch.spec.ts @@ -1,9 +1,17 @@ +import * as util from "util"; + +import Queue from "./queue"; import _yahooFinanceFetch from "./yahooFinanceFetch"; import errors from "./errors"; import _env from "../env-node"; import _opts from "./options"; +// https://dev.to/devcrafter91/elegant-way-to-check-if-a-promise-is-pending-577g +function isPending(promise: any) { + return util.inspect(promise).includes("pending"); +} + describe("yahooFinanceFetch", () => { const yahooFinanceFetch = _yahooFinanceFetch.bind({ _env, _opts }); @@ -50,4 +58,114 @@ describe("yahooFinanceFetch", () => { ) ).rejects.toBeInstanceOf(Error); }); + + describe("concurrency", () => { + /* + process.on("unhandledRejection", (up) => { + console.error("Unhandled promise rejection!"); + throw up; + }); + */ + + function immediate() { + return new Promise((resolve) => { + setImmediate(resolve); + }); + } + + function makeFetch() { + function fetch() { + return new Promise((resolve, reject) => { + fetch.fetches.push({ + resolve, + reject, + resolveWith(obj: any) { + resolve({ + ok: true, + async json() { + return obj; + }, + }); + return immediate(); + }, + }); + }); + } + fetch.fetches = [] as any[]; + fetch.reset = function reset() { + // TODO check that all are resolved/rejected + fetch.fetches = []; + }; + return fetch; + } + + let env: any; + let yahooFinanceFetch: any; + let moduleOpts: any; + + beforeEach(() => { + env = { ..._env, fetch: makeFetch() }; + yahooFinanceFetch = _yahooFinanceFetch.bind({ _env: env, _opts }); + moduleOpts = { queue: { _queue: new Queue() } }; + }); + + it("Queue takes options in constructor", () => { + const queue = new Queue({ concurrency: 5 }); + expect(queue.concurrency).toBe(5); + }); + + it("yahooFinanceFetch branch check for alternate queue", () => { + const promises = [ + yahooFinanceFetch("", {}), + yahooFinanceFetch("", {}, {}), + yahooFinanceFetch("", {}, { queue: {} }), + ]; + + env.fetch.fetches[0].resolveWith({ ok: true }); + env.fetch.fetches[1].resolveWith({ ok: true }); + env.fetch.fetches[2].resolveWith({ ok: true }); + + return Promise.all(promises); + }); + + it("assert defualts to {} for empty queue opts", () => { + moduleOpts.queue.concurrency = 1; + const opts = { ..._opts }; + // @ts-ignore: intentional to test runtime failures + delete opts.queue; + const yahooFinanceFetch = _yahooFinanceFetch.bind({ _env: env, _opts }); + + const promise = yahooFinanceFetch("", {}, moduleOpts); + env.fetch.fetches[0].resolveWith({ ok: true }); + return expect(promise).resolves.toMatchObject({ ok: true }); + }); + + it("single item in queue", () => { + moduleOpts.queue.concurrency = 1; + + const promise = yahooFinanceFetch("", {}, moduleOpts); + env.fetch.fetches[0].resolveWith({ ok: true }); + return expect(promise).resolves.toMatchObject({ ok: true }); + }); + + it("waits if exceeding concurrency max", async () => { + moduleOpts.queue.concurrency = 1; + + const promises = [ + yahooFinanceFetch("", {}, moduleOpts), + yahooFinanceFetch("", {}, moduleOpts), + ]; + + // Second func should not be called until 1st reoslves (limit 1) + expect(env.fetch.fetches.length).toBe(1); + + await env.fetch.fetches[0].resolveWith({ ok: true }); + expect(env.fetch.fetches.length).toBe(2); + + await env.fetch.fetches[1].resolveWith({ ok: true }); + await Promise.all(promises); + }); + + // TODO, timeout test + }); }); diff --git a/src/lib/yahooFinanceFetch.ts b/src/lib/yahooFinanceFetch.ts index b9ea6c59..292dbd2e 100644 --- a/src/lib/yahooFinanceFetch.ts +++ b/src/lib/yahooFinanceFetch.ts @@ -1,3 +1,8 @@ +import Queue from "./queue"; + +import type { Options } from "./options"; +import type { QueueOptions } from "./queue"; + import errors from "./errors"; import pkg from "../../package.json"; @@ -13,12 +18,27 @@ interface YahooFinanceFetchThisEnv { interface YahooFinanceFetchThis { [key: string]: any; _env: YahooFinanceFetchThisEnv; - _opts: Object; + _opts: Options; } interface YahooFinanceFetchModuleOptions { devel?: string | boolean; fetchOptions?: Object; + queue?: QueueOptions; +} + +const _queue = new Queue(); + +function assertQueueOptions(queue: any, opts: any) { + opts; //? + if ( + typeof opts.concurrency === "number" && + queue.concurrency !== opts.concurrency + ) + queue.concurrency = opts.concurrency; + + if (typeof opts.timeout === "number" && queue.timeout !== opts.timeout) + queue.timeout = opts.timeout; } async function yahooFinanceFetch( @@ -33,6 +53,9 @@ async function yahooFinanceFetch( "yahooFinanceFetch called without this._env set" ); + const queue = moduleOpts.queue?._queue || _queue; + assertQueueOptions(queue, { ...this._opts.queue, ...moduleOpts.queue }); + const { URLSearchParams, fetch, fetchDevel } = this._env; // @ts-ignore TODO copy interface? @types lib? @@ -52,7 +75,7 @@ async function yahooFinanceFetch( // used in moduleExec.ts if (func === "csv") func = "text"; - const res = await fetchFunc(url, fetchOptions); + const res = (await queue.add(() => fetchFunc(url, fetchOptions))) as any; const result = await res[func](); /*