diff --git a/src/client.ts b/src/client.ts index ac087b7..341a8a3 100644 --- a/src/client.ts +++ b/src/client.ts @@ -57,9 +57,6 @@ export class APIClient extends BaseAPIClient { super({ baseUrl: config.oktaOrgUrl, logger, - retryOptions: { - timeout: 0, - }, rateLimitThrottling: { threshold: rateLimitThreshold, resetMode: 'datetime_epoch_s', diff --git a/src/steps/roles.ts b/src/steps/roles.ts index d96ddb1..40ec807 100644 --- a/src/steps/roles.ts +++ b/src/steps/roles.ts @@ -3,6 +3,7 @@ import { createIntegrationEntity, IntegrationStep, IntegrationStepExecutionContext, + IntegrationWarnEventName, parseTimePropertyValue, RelationshipClass, } from '@jupiterone/integration-sdk-core'; @@ -125,8 +126,14 @@ export async function fetchRoles({ ); await flushBatch(); } catch (err) { - logger.error({ err }, 'Failed to fetch user roles'); - throw err; + if (err.status === 403) { + logger.publishWarnEvent({ + name: IntegrationWarnEventName.MissingPermission, + description: 'The API key does not have access to fetch user roles.', + }); + } else { + throw err; + } } try { @@ -146,8 +153,14 @@ export async function fetchRoles({ ); await flushBatch(); } catch (err) { - logger.error({ err }, 'Failed to fetch group roles'); - throw err; + if (err.status === 403) { + logger.publishWarnEvent({ + name: IntegrationWarnEventName.MissingPermission, + description: 'The API key does not have access to fetch group roles.', + }); + } else { + throw err; + } } } diff --git a/src/util/withConcurrentQueue.ts b/src/util/withConcurrentQueue.ts index dbee4f2..0e47805 100644 --- a/src/util/withConcurrentQueue.ts +++ b/src/util/withConcurrentQueue.ts @@ -1,7 +1,12 @@ -import { IntegrationLogger } from '@jupiterone/integration-sdk-core'; +import { + IntegrationError, + IntegrationLogger, +} from '@jupiterone/integration-sdk-core'; import Bottleneck from 'bottleneck'; import { QueueTasksState } from '../types/queue'; +const FIVE_MINUTES = 5 * 60 * 1_000; + /** * Executes a function with managed concurrency using a rate-limiting strategy. * This function sets up a `Bottleneck` limiter to control task execution, allowing @@ -66,6 +71,16 @@ export async function withConcurrentQueue( reservoirRefreshInterval: ONE_MINUTE_IN_MS, // refresh every minute. }); + const resetLimiter = () => { + limiter.updateSettings({ + reservoir: null, + maxConcurrent: null, + minTime: 0, + reservoirRefreshAmount: null, + reservoirRefreshInterval: null, + }); + }; + const tasksState: QueueTasksState = { error: undefined, rateLimitReached: false, @@ -82,13 +97,7 @@ export async function withConcurrentQueue( if (!tasksState.error) { tasksState.error = err; // After the first error, reset the limiter to allow all remaining tasks to finish immediately. - limiter.updateSettings({ - reservoir: null, - maxConcurrent: null, - minTime: 0, - reservoirRefreshAmount: null, - reservoirRefreshInterval: null, - }); + resetLimiter(); } }); @@ -102,17 +111,35 @@ export async function withConcurrentQueue( }); }; - let debugLimiterIntervalId: NodeJS.Timeout | undefined; - if (options.logQueueState) { - debugLimiterIntervalId = setInterval( - () => { - options.logger?.info( - `${options.logPrefix ? `${options.logPrefix} ` : ''}${JSON.stringify(limiter.counts())}`, - ); - }, - 5 * 60 * 1_000, // Log every 5 minutes - ); + let lastStateChangeTime = Date.now(); + const states = ['received', 'queued', 'scheduled', 'executing', 'done']; + for (const state of states) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + limiter.on(state, () => { + lastStateChangeTime = Date.now(); + }); } + const limiterIntervalId = setInterval(() => { + const queueState = JSON.stringify(limiter.counts()); + if (options.logQueueState) { + options.logger?.info( + `${options.logPrefix ? `${options.logPrefix} ` : ''}${queueState}`, + ); + } + if (Date.now() - lastStateChangeTime >= FIVE_MINUTES) { + options.logger?.error( + { queueState }, + 'Queue has been in the same state for more than 5 minutes.', + ); + tasksState.error = new IntegrationError({ + code: 'QUEUE_STATE_CHANGE_TIMEOUT', + message: `Queue has been in the same state for more than 5 minutes.`, + }); + resetLimiter(); + resolveIdlePromise?.(); + } + }, FIVE_MINUTES); try { await fn(limiter, tasksState, async () => { @@ -124,9 +151,7 @@ export async function withConcurrentQueue( } }); } finally { - if (debugLimiterIntervalId) { - clearInterval(debugLimiterIntervalId); - } + clearInterval(limiterIntervalId); limiter.removeAllListeners(); } }