diff --git a/src/client.ts b/src/client.ts index de49fba..cb304ba 100644 --- a/src/client.ts +++ b/src/client.ts @@ -315,6 +315,218 @@ export class APIClient extends BaseAPIClient { tasksState: QueueTasksState, ): void { const initialUrl = `/api/v1/groups/${groupId}/users?limit=1000`; + this.scheduleRequest(initialUrl, iteratee, limiter, tasksState, (err) => { + if (err.status === 404) { + //ignore it. It's probably a group that got deleted between steps + } else { + err.groupId = groupId; + throw err; + } + }); + } + + /** + * Iterates each Multi-Factor Authentication device assigned to a given user. + * + * @param iteratee receives each resource to produce relationships + */ + public async iterateFactorDevicesForUser( + userId: string, + iteratee: ResourceIteratee, + ): Promise { + try { + // Okta API does not currently allow a limit to be specified on the list + // factors API. + // + // See: https://developer.okta.com/docs/reference/api/factors/#list-enrolled-factors + const response = await this.retryableRequest( + `/api/v1/users/${userId}/factors`, + ); + const factors = await response.json(); + for (const factor of factors) { + await iteratee(factor); + } + } catch (err) { + if (err.status === 404) { + //ignore it. It's probably a user that got deleted between steps + } else { + throw err; + } + } + } + + /** + * Iterates each device resource in the provider. + * + * @param iteratee receives each resource to produce entities/relationships + */ + public async iterateDevices( + iteratee: ResourceIteratee, + ): Promise { + for await (const device of this.iteratePages( + '/api/v1/devices?limit=200&expand=user', + )) { + await iteratee(device); + } + } + + /** + * Iterates each application resource in the provider. + * + * @param iteratee receives each resource to produce entities/relationships + */ + public async iterateApplications( + iteratee: ResourceIteratee, + ): Promise { + for await (const application of this.iteratePages( + '/api/v1/apps?limit=200', + )) { + await iteratee(application); + } + } + + /** + * Iterates each group assigned to a given application. + * + * @param iteratee receives each resource to produce entities/relationships + */ + public async iterateGroupsForApp( + appId: string, + iteratee: ResourceIteratee, + ): Promise { + try { + for await (const group of this.iteratePages( + `/api/v1/apps/${appId}/groups?limit=200`, + )) { + await iteratee(group); + } + } catch (err) { + if (err.status === 404) { + //ignore it. It's probably an app that got deleted between steps + } else { + throw err; + } + } + } + + public async getAppUsersLimit(appId: string): Promise { + const response = await this.retryableRequest( + `/api/v1/apps/${appId}/users?limit=1`, + ); + await response.text(); // Consume body to avoid memory leaks + if (!response.headers.has('x-rate-limit-limit')) { + return; + } + const limitHeader = response.headers.get('x-rate-limit-limit'); + return parseInt(limitHeader as string, 10); + } + + /** + * Iterates each individual user assigned to a given application. + * + * @param iteratee receives each resource to produce entities/relationships + */ + public iterateUsersForApp( + appId: string, + iteratee: ResourceIteratee, + limiter: Bottleneck, + tasksState: QueueTasksState, + ) { + const initialUrl = `/api/v1/apps/${appId}/users?limit=500`; + this.scheduleRequest(initialUrl, iteratee, limiter, tasksState, (err) => { + if (err.status === 404) { + //ignore it. It's probably an app that got deleted between steps + } else { + throw err; + } + }); + } + + /** + * Iterates each rule resource in the provider. + * + * @param iteratee receives each resource to produce entities/relationships + */ + public async iterateRules( + iteratee: ResourceIteratee, + ): Promise { + try { + for await (const rule of this.iteratePages( + '/api/v1/groups/rules?limit=200', + )) { + await iteratee(rule); + } + } catch (err) { + //per https://developer.okta.com/docs/reference/error-codes/ + if (/\/api\/v1\/groups\/rules/.test(err.url) && err.status === 400) { + this.logger.info( + 'Rules not enabled for this account. Skipping processing of Okta Rules.', + ); + } else { + throw err; + } + } + } + + public async getSupportInfo(): Promise { + const response = await this.retryableRequest( + '/api/v1/org/privacy/oktaSupport', + ); + return await response.json(); + } + + public async iterateRolesByUser( + userId: string, + iteratee: ResourceIteratee, + ): Promise { + const response = await this.retryableRequest( + `/api/v1/users/${userId}/roles`, + ); + const roles = await response.json(); + for (const role of roles) { + await iteratee(role); + } + } + + public async iterateRolesByGroup( + groupId: string, + iteratee: ResourceIteratee, + ): Promise { + const response = await this.retryableRequest( + `/api/v1/groups/${groupId}/roles`, + ); + const roles = await response.json(); + for (const role of roles) { + await iteratee(role); + } + } + + public async iterateAppCreatedLogs( + iteratee: ResourceIteratee, + ): Promise { + // Use filter to only find instances of a newly created application. + // We must specify 'since' to a time far in the past, otherwise we + // will only get the last 7 days of data. Okta only saves the last + // 90 days, so this is not us limiting what we're able to get. + const daysAgo = Date.now() - NINETY_DAYS_AGO; + const startDate = new Date(daysAgo); + const filter = + 'eventType eq "application.lifecycle.update" and debugContext.debugData.requestUri ew "_new_"'; + const url = `/api/v1/logs?filter=${encodeURIComponent( + filter, + )}&since=${startDate.toISOString()}&until=${new Date().toISOString()}`; + for await (const logEvent of this.iteratePages(url)) { + await iteratee(logEvent); + } + } + + private scheduleRequest( + initialUrl: string, + iteratee: ResourceIteratee, + limiter: Bottleneck, + tasksState: QueueTasksState, + onError?: (err: any) => void, + ) { const iteratePages = async (url: string) => { if (tasksState.error) { // Stop processing if an error has occurred in previous tasks @@ -328,10 +540,11 @@ export class APIClient extends BaseAPIClient { if (err.code === 'RATE_LIMIT_REACHED') { // Retry this task after the rate limit is reset void limiter.schedule(() => iteratePages(url)); - } else if (err.status === 404) { - //ignore it. It's probably a group that got deleted between steps + return; + } + if (onError) { + onError(err); } else { - err.groupId = groupId; throw err; } } @@ -517,192 +730,6 @@ export class APIClient extends BaseAPIClient { const retryDate = new Date(parseInt(resetTimestamp, 10) * 1000); return retryDate.getTime() - nowDate.getTime() + 1000; } - - /** - * Iterates each Multi-Factor Authentication device assigned to a given user. - * - * @param iteratee receives each resource to produce relationships - */ - public async iterateFactorDevicesForUser( - userId: string, - iteratee: ResourceIteratee, - ): Promise { - try { - // Okta API does not currently allow a limit to be specified on the list - // factors API. - // - // See: https://developer.okta.com/docs/reference/api/factors/#list-enrolled-factors - const response = await this.retryableRequest( - `/api/v1/users/${userId}/factors`, - ); - const factors = await response.json(); - for (const factor of factors) { - await iteratee(factor); - } - } catch (err) { - if (err.status === 404) { - //ignore it. It's probably a user that got deleted between steps - } else { - throw err; - } - } - } - - /** - * Iterates each device resource in the provider. - * - * @param iteratee receives each resource to produce entities/relationships - */ - public async iterateDevices( - iteratee: ResourceIteratee, - ): Promise { - for await (const device of this.iteratePages( - '/api/v1/devices?limit=200&expand=user', - )) { - await iteratee(device); - } - } - - /** - * Iterates each application resource in the provider. - * - * @param iteratee receives each resource to produce entities/relationships - */ - public async iterateApplications( - iteratee: ResourceIteratee, - ): Promise { - for await (const application of this.iteratePages( - '/api/v1/apps?limit=200', - )) { - await iteratee(application); - } - } - - /** - * Iterates each group assigned to a given application. - * - * @param iteratee receives each resource to produce entities/relationships - */ - public async iterateGroupsForApp( - appId: string, - iteratee: ResourceIteratee, - ): Promise { - try { - for await (const group of this.iteratePages( - `/api/v1/apps/${appId}/groups?limit=200`, - )) { - await iteratee(group); - } - } catch (err) { - if (err.status === 404) { - //ignore it. It's probably an app that got deleted between steps - } else { - throw err; - } - } - } - - /** - * Iterates each individual user assigned to a given application. - * - * @param iteratee receives each resource to produce entities/relationships - */ - public async iterateUsersForApp( - appId: string, - iteratee: ResourceIteratee, - ): Promise { - try { - for await (const user of this.iteratePages( - `/api/v1/apps/${appId}/users?limit=500`, - )) { - await iteratee(user); - } - } catch (err) { - if (err.status === 404) { - //ignore it. It's probably an app that got deleted between steps - } else { - throw err; - } - } - } - - /** - * Iterates each rule resource in the provider. - * - * @param iteratee receives each resource to produce entities/relationships - */ - public async iterateRules( - iteratee: ResourceIteratee, - ): Promise { - try { - for await (const rule of this.iteratePages( - '/api/v1/groups/rules?limit=200', - )) { - await iteratee(rule); - } - } catch (err) { - //per https://developer.okta.com/docs/reference/error-codes/ - if (/\/api\/v1\/groups\/rules/.test(err.url) && err.status === 400) { - this.logger.info( - 'Rules not enabled for this account. Skipping processing of Okta Rules.', - ); - } else { - throw err; - } - } - } - - public async getSupportInfo(): Promise { - const response = await this.retryableRequest( - '/api/v1/org/privacy/oktaSupport', - ); - return await response.json(); - } - - public async iterateRolesByUser( - userId: string, - iteratee: ResourceIteratee, - ): Promise { - const response = await this.retryableRequest( - `/api/v1/users/${userId}/roles`, - ); - const roles = await response.json(); - for (const role of roles) { - await iteratee(role); - } - } - - public async iterateRolesByGroup( - groupId: string, - iteratee: ResourceIteratee, - ): Promise { - const response = await this.retryableRequest( - `/api/v1/groups/${groupId}/roles`, - ); - const roles = await response.json(); - for (const role of roles) { - await iteratee(role); - } - } - - public async iterateAppCreatedLogs( - iteratee: ResourceIteratee, - ): Promise { - // Use filter to only find instances of a newly created application. - // We must specify 'since' to a time far in the past, otherwise we - // will only get the last 7 days of data. Okta only saves the last - // 90 days, so this is not us limiting what we're able to get. - const daysAgo = Date.now() - NINETY_DAYS_AGO; - const startDate = new Date(daysAgo); - const filter = - 'eventType eq "application.lifecycle.update" and debugContext.debugData.requestUri ew "_new_"'; - const url = `/api/v1/logs?filter=${encodeURIComponent( - filter, - )}&since=${startDate.toISOString()}&until=${new Date().toISOString()}`; - for await (const logEvent of this.iteratePages(url)) { - await iteratee(logEvent); - } - } } let client: APIClient | undefined; diff --git a/src/config.ts b/src/config.ts index 652bd2d..e0af68f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -59,7 +59,7 @@ export interface IntegrationConfig extends IntegrationInstanceConfig { } export interface ExecutionConfig { - logGroupMetrics?: boolean; + logMetrics?: boolean; } export async function validateInvocation( diff --git a/src/steps/applications.ts b/src/steps/applications.ts index 426d627..d20d5a5 100644 --- a/src/steps/applications.ts +++ b/src/steps/applications.ts @@ -1,4 +1,5 @@ import { + IntegrationError, IntegrationLogger, IntegrationStep, IntegrationStepExecutionContext, @@ -7,7 +8,7 @@ import { } from '@jupiterone/integration-sdk-core'; import { createAPIClient } from '../client'; -import { IntegrationConfig } from '../config'; +import { ExecutionConfig, IntegrationConfig } from '../config'; import { createAccountApplicationRelationship, createApplicationEntity, @@ -24,6 +25,10 @@ import { } from './constants'; import { OktaApplication } from '../okta/types'; import { buildBatchProcessing } from '../util/buildBatchProcessing'; +import { withConcurrentQueue } from '../util/withConcurrentQueue'; +import Bottleneck from 'bottleneck'; +import { QueueTasksState } from '../types/queue'; +import { createStatsLogger } from '../util/createStatsLogger'; export async function fetchApplications({ instance, @@ -143,65 +148,145 @@ export async function buildUserApplicationRelationships({ instance, jobState, logger, -}: IntegrationStepExecutionContext) { + executionConfig, +}: IntegrationStepExecutionContext) { + const BATCH_SIZE = 500; const apiClient = createAPIClient(instance.config, logger); + const { logMetrics } = executionConfig; + + const stats = { + processedApps: 0, + requestedApps: 0, + relationshipsCreated: 0, + }; + const statsLogger = createStatsLogger(stats, logger, logMetrics); - const processApplicationUsers = async ( + const processApplicationUsers = ( appEntity: StandardizedOktaApplication, + limiter: Bottleneck, + tasksState: QueueTasksState, ) => { const app = getRawData(appEntity) as OktaApplication; const appId = app.id as string; //get the individual users that are assigned to this app (ie. not assigned as part of group) - await apiClient.iterateUsersForApp(appId, async (user) => { - if (!user.id) { - return; - } + apiClient.iterateUsersForApp( + appId, + async (user) => { + if (!user.id) { + return; + } - if (jobState.hasKey(user.id)) { - const relationships: Relationship[] = - createApplicationUserRelationships( - appEntity as StandardizedOktaApplication, - user, - createOnInvalidRoleFormatFunction(logger, { - appId, - userId: user.id, - }), - ); - //these relationships include both USER_ASSIGNED_APPLICATION and USER_ASSIGNED_AWS_IAM_ROLE - //USER_ASSIGNED_APPLICATION will be unique to this user and app pair - //however, multiple apps for that user can use AWS and have the same IAM Role assigned - //therefore, the USER_ASSIGNED_AWS_IAM_ROLE relationship may have been specified in a previous app for this user - for (const rel of relationships) { - if (!jobState.hasKey(rel._key)) { - await jobState.addRelationship(rel); + if (jobState.hasKey(user.id)) { + const relationships: Relationship[] = + createApplicationUserRelationships( + appEntity as StandardizedOktaApplication, + user, + createOnInvalidRoleFormatFunction(logger, { + appId, + userId: user.id, + }), + ); + //these relationships include both USER_ASSIGNED_APPLICATION and USER_ASSIGNED_AWS_IAM_ROLE + //USER_ASSIGNED_APPLICATION will be unique to this user and app pair + //however, multiple apps for that user can use AWS and have the same IAM Role assigned + //therefore, the USER_ASSIGNED_AWS_IAM_ROLE relationship may have been specified in a previous app for this user + for (const rel of relationships) { + if (!jobState.hasKey(rel._key)) { + await jobState.addRelationship(rel); + stats.relationshipsCreated++; + statsLogger( + `[${Steps.USER_APP_RELATIONSHIP}] Added relationships`, + ); + } } + } else { + logger.warn( + { appId: app.id, appName: app.name, userId: user.id }, + '[SKIP] User not found in job state, could not build relationship to application', + ); } - } else { - logger.warn( - { appId: app.id, appName: app.name, userId: user.id }, - '[SKIP] User not found in job state, could not build relationship to application', - ); - } - }); + }, + limiter, + tasksState, + ); }; - const { withBatchProcessing, flushBatch } = - buildBatchProcessing({ - processCallback: processApplicationUsers, - batchSize: 200, - concurrency: 4, - }); + let firstAppId: string | undefined; + try { + await jobState.iterateEntities( + { _type: Entities.APPLICATION._type }, + (entity: StandardizedOktaApplication) => { + const rawEntity = getRawData(entity) as OktaApplication; + firstAppId = rawEntity?.id; + if (firstAppId) { + throw new IntegrationError({ + code: 'STOP_ITERATION', + message: 'Stop iteration', + }); + } + }, + ); + } catch (err) { + // Stop iteration + } - await jobState.iterateEntities( - { _type: Entities.APPLICATION._type }, - async (entity: StandardizedOktaApplication) => { - await withBatchProcessing(entity); - }, + if (!firstAppId) { + logger.warn('No applications found to process user relationships'); + return; + } + + const limit = await apiClient.getAppUsersLimit(firstAppId); + const maxConcurrent = limit ? limit * 0.5 : 20; // 50% of the limit. + logger.info( + `[${Steps.USER_APP_RELATIONSHIP}] Calculated concurrent requests: ${maxConcurrent}`, ); - await flushBatch(); + let appBatch: StandardizedOktaApplication[] = []; + const processBatch = async ( + limiter: Bottleneck, + tasksState: QueueTasksState, + waitForCompletion: () => Promise, + ) => { + for (const app of appBatch) { + stats.processedApps++; + processApplicationUsers(app, limiter, tasksState); + } + await waitForCompletion(); + + stats.requestedApps += appBatch.length; + logger.info( + { stats }, + `[${Steps.USER_APP_RELATIONSHIP}] Finished requesting apps batch`, + ); + + appBatch = []; + }; + await withConcurrentQueue( + { + maxConcurrent, + logger, + logPrefix: `[${Steps.USER_APP_RELATIONSHIP}]`, + logQueueState: true, + }, + async (limiter, tasksState, waitForCompletion) => { + await jobState.iterateEntities( + { _type: Entities.APPLICATION._type }, + async (entity: StandardizedOktaApplication) => { + appBatch.push(entity); + if (appBatch.length >= BATCH_SIZE) { + await processBatch(limiter, tasksState, waitForCompletion); + } + }, + ); + + // Process the remaining apps + if (appBatch.length) { + await processBatch(limiter, tasksState, waitForCompletion); + } + }, + ); } function createOnInvalidRoleFormatFunction( diff --git a/src/steps/groups.ts b/src/steps/groups.ts index 3345deb..a02dce5 100644 --- a/src/steps/groups.ts +++ b/src/steps/groups.ts @@ -1,5 +1,4 @@ import { - IntegrationLogger, IntegrationStep, IntegrationStepExecutionContext, IntegrationWarnEventName, @@ -25,9 +24,9 @@ import { USER_GROUP_IDS, } from './constants'; import { Group } from '@okta/okta-sdk-nodejs'; -import Bottleneck from 'bottleneck'; import { chunk } from 'lodash'; -import { QueueTasksState } from '../types/queue'; +import { withConcurrentQueue } from '../util/withConcurrentQueue'; +import { createStatsLogger } from '../util/createStatsLogger'; export async function fetchGroups({ instance, @@ -36,7 +35,7 @@ export async function fetchGroups({ executionConfig, }: IntegrationStepExecutionContext) { const apiClient = createAPIClient(instance.config, logger); - const { logGroupMetrics } = executionConfig; + const { logMetrics } = executionConfig; const accountEntity = (await jobState.getData( DATA_ACCOUNT_ENTITY, @@ -86,7 +85,7 @@ export async function fetchGroups({ logger.info( { groupCollectionEndTime, - ...(logGroupMetrics && { stats }), + ...(logMetrics && { stats }), }, 'Finished processing groups', ); @@ -215,9 +214,8 @@ async function buildGroupEntityToUserRelationships( return; } const BATCH_SIZE = 500; - const ONE_MINUTE_IN_MS = 60_000; const { instance, logger, jobState, executionConfig } = context; - const { logGroupMetrics } = executionConfig; + const { logMetrics } = executionConfig; const apiClient = createAPIClient(instance.config, logger); @@ -227,117 +225,80 @@ async function buildGroupEntityToUserRelationships( relationshipsCreated: 0, }; const skippedGroups: string[] = []; - const statsLogger = createStatsLogger(stats, logger, logGroupMetrics); + const statsLogger = createStatsLogger(stats, logger, logMetrics); const limit = await apiClient.getGroupUsersLimit(groupIds[0]); const maxConcurrent = limit ? limit * 0.5 : 20; // 50% of the limit. logger.info( `[${groupEntityType}] Calculated concurrent requests: ${maxConcurrent}`, ); - const limiter = new Bottleneck({ - maxConcurrent, - minTime: Math.floor(ONE_MINUTE_IN_MS / maxConcurrent), // space requests evenly over 1 minute. - reservoir: maxConcurrent, - reservoirRefreshAmount: maxConcurrent, - reservoirRefreshInterval: ONE_MINUTE_IN_MS, // refresh every minute. - }); - - const tasksState: QueueTasksState = { - error: undefined, - rateLimitReached: false, - }; - limiter.on('failed', (err) => { - if (err.code === 'ECONNRESET') { - const groupId = err.groupId as string; - logger.warn(`ECONNRESET error for group ${err.groupId}. Skipping.`); - if (typeof groupId === 'string') { - skippedGroups.push(err.groupId); - } - } else { - if (!tasksState.error) { - tasksState.error = err; - // After the first error, reset the limiter to allow all remaining tasks to finish immediately. - limiter.updateSettings({ - reservoir: null, - maxConcurrent: null, - minTime: 0, - reservoirRefreshAmount: null, - reservoirRefreshInterval: null, - }); - } - } - }); - - let resolveIdlePromise: () => void | undefined; - limiter.on('idle', () => { - resolveIdlePromise?.(); - }); - const waitForTasksCompletion = () => { - return new Promise((resolve) => { - resolveIdlePromise = resolve; - }); - }; - - // Log queue state every 5 minutes - const debugLimiterIntervalId = setInterval( - () => { - logger.info(`[${groupEntityType}] ${JSON.stringify(limiter.counts())}`); - }, - 5 * 60 * 1_000, - ); - const groupIdBatches = chunk(groupIds, BATCH_SIZE); + try { - for (const groupIdBatch of groupIdBatches) { - for (const groupId of groupIdBatch) { - stats.processedGroups++; - apiClient.iterateUsersForGroup( - groupId, - async (user) => { - if (!user.id) { - return; - } - const userKey = user.id; - if (jobState.hasKey(userKey)) { - const relationship = createGroupUserRelationship( - groupId, - userKey, - ); - if (!jobState.hasKey(relationship._key)) { - await jobState.addRelationship(relationship); - stats.relationshipsCreated++; - statsLogger(`[${groupEntityType}] Added relationships`); - } - } else { - logger.warn( - { groupId, userId: userKey }, - '[SKIP] User not found in job state, could not build relationship to group', - ); + await withConcurrentQueue( + { + maxConcurrent, + logger, + logPrefix: `[${groupEntityType}]`, + onFailed: (err) => { + if (err.code === 'ECONNRESET') { + const groupId = err.groupId as string; + logger.warn(`ECONNRESET error for group ${err.groupId}. Skipping.`); + if (typeof groupId === 'string') { + skippedGroups.push(err.groupId); } - }, - limiter, - tasksState, - ); - } - await waitForTasksCompletion(); - // Check if any of the tasks has failed with an unrecoverable error - // If so, throw the error to stop the execution. - if (tasksState.error) { - throw tasksState.error; - } - - stats.requestedGroups += groupIdBatch.length; - logger.info( - { stats }, - `[${groupEntityType}] Finished requesting groups batch`, - ); - } + return true; // Ignore the error. + } else { + return false; + } + }, + logQueueState: true, + }, + async (limiter, tasksState, waitForTasksCompletion) => { + for (const groupIdBatch of groupIdBatches) { + for (const groupId of groupIdBatch) { + stats.processedGroups++; + apiClient.iterateUsersForGroup( + groupId, + async (user) => { + if (!user.id) { + return; + } + const userKey = user.id; + if (jobState.hasKey(userKey)) { + const relationship = createGroupUserRelationship( + groupId, + userKey, + ); + if (!jobState.hasKey(relationship._key)) { + await jobState.addRelationship(relationship); + stats.relationshipsCreated++; + statsLogger(`[${groupEntityType}] Added relationships`); + } + } else { + logger.warn( + { groupId, userId: userKey }, + '[SKIP] User not found in job state, could not build relationship to group', + ); + } + }, + limiter, + tasksState, + ); + } + await waitForTasksCompletion(); + + stats.requestedGroups += groupIdBatch.length; + logger.info( + { stats }, + `[${groupEntityType}] Finished requesting groups batch`, + ); + } + }, + ); } catch (err) { logger.error({ err }, 'Failed to build group to user relationships'); throw err; - } finally { - clearInterval(debugLimiterIntervalId); - limiter.removeAllListeners(); } if (skippedGroups.length) { @@ -348,25 +309,6 @@ async function buildGroupEntityToUserRelationships( } } -/** - * Create a function that logs group stats every 5 minutes. - */ -function createStatsLogger( - stats: any, - logger: IntegrationLogger, - logGroupMetrics: boolean | undefined, -) { - const FIVE_MINUTES = 5 * 60 * 1000; - let lastLogTime = Date.now(); - return (message: string) => { - const now = Date.now(); - if (Date.now() - lastLogTime >= FIVE_MINUTES && logGroupMetrics) { - logger.info({ stats }, message); - lastLogTime = now; - } - }; -} - export const groupSteps: IntegrationStep[] = [ { id: Steps.GROUPS, diff --git a/src/util/createStatsLogger.ts b/src/util/createStatsLogger.ts new file mode 100644 index 0000000..9b5f892 --- /dev/null +++ b/src/util/createStatsLogger.ts @@ -0,0 +1,20 @@ +import { IntegrationLogger } from '@jupiterone/integration-sdk-core'; + +/** + * Create a function that logs group stats every 5 minutes. + */ +export function createStatsLogger( + stats: any, + logger: IntegrationLogger, + logMetrics: boolean | undefined, +) { + const FIVE_MINUTES = 5 * 60 * 1000; + let lastLogTime = Date.now(); + return (message: string) => { + const now = Date.now(); + if (Date.now() - lastLogTime >= FIVE_MINUTES && logMetrics) { + logger.info({ stats }, message); + lastLogTime = now; + } + }; +} diff --git a/src/util/withConcurrentQueue.ts b/src/util/withConcurrentQueue.ts new file mode 100644 index 0000000..21ced27 --- /dev/null +++ b/src/util/withConcurrentQueue.ts @@ -0,0 +1,129 @@ +import { IntegrationLogger } from '@jupiterone/integration-sdk-core'; +import Bottleneck from 'bottleneck'; +import { QueueTasksState } from '../types/queue'; + +/** + * Executes a function with managed concurrency using a rate-limiting strategy. + * This function sets up a `Bottleneck` limiter to control task execution, allowing + * tasks to be spread evenly over a defined time period while maintaining a max concurrency. + * + * @param {Object} options - Configuration options for concurrency control. + * @param {number} options.maxConcurrent - The maximum number of concurrent tasks. + * @param {IntegrationLogger} [options.logger] - Optional logger for internal state and errors. + * @param {string} [options.logPrefix] - Optional prefix for log messages. + * @param {boolean} [options.logQueueState] - Set to true to enable periodic logging of the queue state. + * @param {Function} [options.onFailed] - A function that takes an error object and returns a boolean indicating + * whether the error should be ignored. Returning true ignores the error. + * @param {Function} fn - The function to execute that handles task execution. + * This function receives three arguments: + * - limiter (Bottleneck): A Bottleneck instance for managing concurrency. + * - tasksState (QueueTasksState): An object to track errors and rate limiting status. + * - waitForTasksCompletion (() => Promise): A function to call to wait for all tasks to complete. + * @returns {Promise} A promise that resolves when all tasks have been completed or rejects if an error occurs. + * + * @example + * withConcurrentQueue({ + * maxConcurrent: 5, + * logger: console, + * logPrefix: 'TaskQueue', + * logQueueState: true, + * onFailed: err => { + * console.error('Failed task detected:', err); + * return false; // Do not ignore the error + * } + * }, async (limiter, tasksState, waitForCompletion) => { + * for (let i = 0; i < 10; i++) { + * limiter.schedule(() => someAsyncTask(i)); + * } + * await waitForCompletion(); + * }).then(() => { + * console.log('All tasks completed successfully.'); + * }).catch(error => { + * console.error('Error in executing tasks:', error); + * }); + */ +export async function withConcurrentQueue( + options: { + maxConcurrent: number; + logger?: IntegrationLogger; + logPrefix?: string; + logQueueState?: boolean; + onFailed?: (err: any) => boolean; + }, + fn: ( + limiter: Bottleneck, + tasksState: QueueTasksState, + waitForTasksCompletion: () => Promise, + ) => Promise, +): Promise { + const ONE_MINUTE_IN_MS = 60_000; + const { maxConcurrent } = options; + const limiter = new Bottleneck({ + maxConcurrent, + minTime: Math.floor(ONE_MINUTE_IN_MS / maxConcurrent), // space requests evenly over 1 minute. + reservoir: maxConcurrent, + reservoirRefreshAmount: maxConcurrent, + reservoirRefreshInterval: ONE_MINUTE_IN_MS, // refresh every minute. + }); + + const tasksState: QueueTasksState = { + error: undefined, + rateLimitReached: false, + }; + + limiter.on('failed', (err) => { + let ignoreError = false; + if (options.onFailed) { + ignoreError = options.onFailed(err); + } + if (ignoreError) { + return; + } + if (!tasksState.error) { + tasksState.error = err; + // After the first error, reset the limiter to allow all remaining tasks to finish immediately. + limiter.updateSettings({ + reservoir: null, + maxConcurrent: null, + minTime: 0, + reservoirRefreshAmount: null, + reservoirRefreshInterval: null, + }); + } + }); + + let resolveIdlePromise: () => void | undefined; + limiter.on('idle', () => { + resolveIdlePromise?.(); + }); + const waitForTasksCompletion = () => { + return new Promise((resolve) => { + resolveIdlePromise = resolve; + }); + }; + + let debugLimiterIntervalId: NodeJS.Timeout | undefined; + if (options.logQueueState) { + debugLimiterIntervalId = setInterval(() => { + options.logger?.info( + `${options.logPrefix ? `${options.logPrefix} ` : ''}${JSON.stringify(limiter.counts())}`, + ); + }, 30_000); + } + + try { + await fn(limiter, tasksState, async () => { + await waitForTasksCompletion(); + // Check if any of the tasks has failed with an unrecoverable error + // If so, throw the error to stop the execution. + if (tasksState.error) { + throw tasksState.error; + } + }); + } finally { + if (debugLimiterIntervalId) { + clearInterval(debugLimiterIntervalId); + } + limiter.removeAllListeners(); + } +}