Skip to content

Commit

Permalink
add error handling when sequenceId is undefined
Browse files Browse the repository at this point in the history
  • Loading branch information
kaschula committed Mar 18, 2024
1 parent f122396 commit c96cf2a
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 3 deletions.
129 changes: 127 additions & 2 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import pino from 'pino';
import { Subject } from 'rxjs';
import { it, describe, expect, afterEach, vi, beforeEach } from 'vitest';

import { StreamDiscontinuityError } from './Errors.js';
import Model from './Model.js';
import { defaultSyncOptions, defaultEventBufferOptions, defaultOptimisticEventOptions } from './Options.js';
import { IStream } from './stream/Stream.js';
Expand Down Expand Up @@ -126,6 +127,69 @@ describe('Model', () => {
expect([undefined, { current: 'ready', previous: 'syncing', reason: undefined }]).toContain(syncResult);
});

it<ModelTestContext>('it fails sync when sequenceId is undefined and does not retry', async ({
channelName,
ably,
logger,
}) => {
const configNumRetries = 5;
const sync = vi.fn(async () => ({ data: simpleTestData, sequenceId: undefined }));
const merge = vi.fn();
const erroredListener = vi.fn();

const model = new Model(
'test',
{ sync, merge },
{
ably,
channelName,
logger,
syncOptions: { ...defaultSyncOptions, retryStrategy: fixedRetryStrategy(10, configNumRetries) },
optimisticEventOptions: defaultOptimisticEventOptions,
eventBufferOptions: defaultEventBufferOptions,
},
);
model.on('errored', erroredListener);

await model
.sync()
.catch((err) => expect(err.message).toEqual('The sync function returned an undefined sequenceId'));
expect(sync).toHaveBeenCalledOnce(); // sync is not retried
expect(merge).not.toHaveBeenCalled();
expect(erroredListener).toHaveBeenCalledOnce();
});

it<ModelTestContext>('it fails sync when sequenceId is undefined with no retryable', async ({
channelName,
ably,
logger,
}) => {
const sync = vi.fn(async () => ({ data: simpleTestData, sequenceId: undefined }));
const merge = vi.fn();
const erroredListener = vi.fn();

const model = new Model(
'test',
{ sync, merge },
{
ably,
channelName,
logger,
syncOptions: { ...defaultSyncOptions, retryStrategy: () => -1 },
optimisticEventOptions: defaultOptimisticEventOptions,
eventBufferOptions: defaultEventBufferOptions,
},
);
model.on('errored', erroredListener);

await model
.sync()
.catch((err) => expect(err.message).toEqual('The sync function returned an undefined sequenceId'));
expect(sync).toHaveBeenCalledOnce();
expect(merge).not.toHaveBeenCalled();
expect(erroredListener).toHaveBeenCalledOnce();
});

it<ModelTestContext>('allows sync to be called manually, with params', async ({ channelName, ably, logger }) => {
let completeSync: (...args: any[]) => void = () => {
throw new Error('completeSync not defined');
Expand Down Expand Up @@ -1543,14 +1607,75 @@ describe('Model', () => {
expect(lis).toHaveBeenCalledTimes(0);
});

it<ModelTestContext>('stream message error can handle the a the StreamDiscontinuityError', async ({
channelName,
ably,
logger,
streams,
}) => {
const events = { channelEvents: new Subject<Types.Message>() };

const streamDiscontinuityError = new StreamDiscontinuityError('stream error');
streams.newStream({ channelName }).subscribe = vi.fn(async (callback) => {
events.channelEvents.subscribe(() => callback(streamDiscontinuityError));
});

let syncCalled = 0;
const sync = vi.fn(async () => {
if (syncCalled > 0) {
syncCalled++;
throw new Error('Syncing Error');
}

// first call successful
syncCalled++;
return { data: 'data_0', sequenceId: '0' };
});

const merge = vi.fn((_, event) => event.data);
const model = new Model(
'test',
{ sync, merge },
{
ably,
channelName,
logger,
syncOptions: { ...defaultSyncOptions, retryStrategy: fixedRetryStrategy(1, 1) },
optimisticEventOptions: defaultOptimisticEventOptions,
eventBufferOptions: defaultEventBufferOptions,
},
);
let subscription = new Subject<void>();
const subscriptionCall = getEventPromises(subscription, 1)[0];

const subscriptionListener = vi.fn(() => subscription.next());
const erroredListener = vi.fn();

// initialize and sync
model.on('errored', erroredListener);
await model.subscribe(subscriptionListener);
expect(sync).toHaveBeenCalledOnce();

await subscriptionCall; // wait for first event to propagate
expect(subscriptionListener).toHaveBeenCalledTimes(1);
events.channelEvents.next(createMessage(1));

await statePromise(model, 'paused'); // pause before attempting to recover from error
await statePromise(model, 'syncing'); // resume() -> resync() to recover from error
await statePromise(model, 'errored'); // resync failed

expect(merge).toHaveBeenCalledTimes(0); // message fails to process, merge not called
expect(sync).toHaveBeenCalledTimes(2);
expect(erroredListener).toHaveBeenCalledTimes(1);
});

it<ModelTestContext>('if merge function fails on confirmed event, and subsequent replay fails, set model to errored', async ({
channelName,
ably,
logger,
streams,
}) => {
const s1 = streams.newStream({ channelName: channelName });
s1.subscribe = vi.fn();

const events = new Subject<Types.Message>();
s1.subscribe = vi.fn(async (callback) => {
Expand Down Expand Up @@ -1588,7 +1713,7 @@ describe('Model', () => {
eventBufferOptions: defaultEventBufferOptions,
},
);
model.sync();
await model.sync();
expect(syncFn).toHaveReturnedTimes(1);

const lis: EventListener<ModelStateChange> = vi.fn<any, any>();
Expand Down
37 changes: 36 additions & 1 deletion src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
* @template S - The type of the sync function that is used to synchronise the model state with your backend.
* @param {Parameters<S>} params - The parameters to pass to the sync function.
* @returns A promise that resolves when the model has successfully re-synchronised its state and is ready to start emitting updates.
* @throws {Error} If there is an error during the resync operation.
*/
public async sync(...params: Parameters<S>) {
this.logger.trace({ ...this.baseLogContext, action: 'sync()', params });
Expand Down Expand Up @@ -299,6 +300,8 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
* @param {SubscriptionCallback<S>} callback - The callback to invoke when the model data changes.
* @param {SubscriptionOptions} options - Optional subscription options that can be used to specify whether to subscribe to
* optimistic or only confirmed updates. Defaults to optimistic.
* @return A promise that resolves when the subscription has been set up and if in an initialized state successfully synchronised
* @throws {Error} If there is an error during the sync operation.
*/
public async subscribe(callback: SubscriptionCallback<S>, options: SubscriptionOptions = { optimistic: true }) {
if (typeof callback !== 'function') {
Expand Down Expand Up @@ -367,6 +370,7 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
* Unsubscribes the given callback to changes to the data.
* @template S - The type of the sync function that is used to synchronise the model state with your backend.
* @param {SubscriptionCallback<S>} callback - The callback to unsubscribe.
* @throws {Error} If callback is not a function
*/
public unsubscribe(callback: SubscriptionCallback<S>) {
if (typeof callback !== 'function') {
Expand Down Expand Up @@ -405,6 +409,10 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
}

protected setState(state: ModelState, reason?: Error) {
if (state === this.currentState) {
return;
}

this.logger.trace({ ...this.baseLogContext, action: 'setState()', state, reason });
const previous = this.currentState;
this.currentState = state;
Expand All @@ -420,15 +428,27 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
let delay = retries(1);

if (delay < 0) {
if (this.state === 'errored') {
return;
}

await fn();
return;
}

while (delay > 0) {
try {
if (this.state === 'errored') {
return;
}

await fn();
return;
} catch (err) {
if (this.state === 'errored') {
throw err;
}

delay = retries(++i);
if (delay < 0) {
throw err;
Expand Down Expand Up @@ -458,7 +478,16 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
lastSyncParams: this.lastSyncParams,
});
this.removeStream();

const { data, sequenceId } = await this.syncFunc(...(this.lastSyncParams || ([] as unknown as Parameters<S>)));
if (!sequenceId) {
const err = Error('The sync function returned an undefined sequenceId');
// we set the state to errored here to ensure that this function is not retried by the Model.retryable()
// this avoids a sync function that returns the wrong response structure from being retried.
this.setState('errored', err);
throw err;
}

this.setConfirmedData(data);
await this.computeState(this.confirmedData, this.optimisticData, this.optimisticEvents);
await this.addStream(sequenceId);
Expand All @@ -470,6 +499,7 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
} catch (err) {
this.logger.error('retries exhausted', { ...this.baseLogContext, action: 'resync()', err });
this.setState('errored', toError(err));

throw err;
}
}
Expand Down Expand Up @@ -573,7 +603,12 @@ export default class Model<S extends SyncFuncConstraint> extends EventEmitter<Re
throw err;
}
};
await this.retryable(fixedRetryStrategy(delay), fn);

try {
await this.retryable(fixedRetryStrategy(delay), fn);
} catch (err) {
this.logger.warn('failed to resume after error', { ...this.baseLogContext, action: 'handleErrorResume', err });
}
}

private setOptimisticData(data: ExtractData<S>) {
Expand Down

0 comments on commit c96cf2a

Please sign in to comment.