From c96cf2a5feaa7ee125cb6251b9796dd0579fbaee Mon Sep 17 00:00:00 2001 From: Matt Kaschula Date: Mon, 18 Mar 2024 08:09:21 +0000 Subject: [PATCH] add error handling when sequenceId is undefined --- src/Model.test.ts | 129 +++++++++++++++++++++++++++++++++++++++++++++- src/Model.ts | 37 ++++++++++++- 2 files changed, 163 insertions(+), 3 deletions(-) diff --git a/src/Model.test.ts b/src/Model.test.ts index b57af97..ded2036 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -3,6 +3,7 @@ import pino from 'pino'; import { Subject } from 'rxjs'; import { it, describe, expect, afterEach, vi, beforeEach } from 'vitest'; +import { StreamDiscontinuityError } from './Errors.js'; import Model from './Model.js'; import { defaultSyncOptions, defaultEventBufferOptions, defaultOptimisticEventOptions } from './Options.js'; import { IStream } from './stream/Stream.js'; @@ -126,6 +127,69 @@ describe('Model', () => { expect([undefined, { current: 'ready', previous: 'syncing', reason: undefined }]).toContain(syncResult); }); + it('it fails sync when sequenceId is undefined and does not retry', async ({ + channelName, + ably, + logger, + }) => { + const configNumRetries = 5; + const sync = vi.fn(async () => ({ data: simpleTestData, sequenceId: undefined })); + const merge = vi.fn(); + const erroredListener = vi.fn(); + + const model = new Model( + 'test', + { sync, merge }, + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: fixedRetryStrategy(10, configNumRetries) }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + model.on('errored', erroredListener); + + await model + .sync() + .catch((err) => expect(err.message).toEqual('The sync function returned an undefined sequenceId')); + expect(sync).toHaveBeenCalledOnce(); // sync is not retried + expect(merge).not.toHaveBeenCalled(); + expect(erroredListener).toHaveBeenCalledOnce(); + }); + + it('it fails sync when sequenceId is undefined with no retryable', async ({ + channelName, + ably, + logger, + }) => { + const sync = vi.fn(async () => ({ data: simpleTestData, sequenceId: undefined })); + const merge = vi.fn(); + const erroredListener = vi.fn(); + + const model = new Model( + 'test', + { sync, merge }, + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: () => -1 }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + model.on('errored', erroredListener); + + await model + .sync() + .catch((err) => expect(err.message).toEqual('The sync function returned an undefined sequenceId')); + expect(sync).toHaveBeenCalledOnce(); + expect(merge).not.toHaveBeenCalled(); + expect(erroredListener).toHaveBeenCalledOnce(); + }); + it('allows sync to be called manually, with params', async ({ channelName, ably, logger }) => { let completeSync: (...args: any[]) => void = () => { throw new Error('completeSync not defined'); @@ -1543,6 +1607,68 @@ describe('Model', () => { expect(lis).toHaveBeenCalledTimes(0); }); + it('stream message error can handle the a the StreamDiscontinuityError', async ({ + channelName, + ably, + logger, + streams, + }) => { + const events = { channelEvents: new Subject() }; + + const streamDiscontinuityError = new StreamDiscontinuityError('stream error'); + streams.newStream({ channelName }).subscribe = vi.fn(async (callback) => { + events.channelEvents.subscribe(() => callback(streamDiscontinuityError)); + }); + + let syncCalled = 0; + const sync = vi.fn(async () => { + if (syncCalled > 0) { + syncCalled++; + throw new Error('Syncing Error'); + } + + // first call successful + syncCalled++; + return { data: 'data_0', sequenceId: '0' }; + }); + + const merge = vi.fn((_, event) => event.data); + const model = new Model( + 'test', + { sync, merge }, + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: fixedRetryStrategy(1, 1) }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + let subscription = new Subject(); + const subscriptionCall = getEventPromises(subscription, 1)[0]; + + const subscriptionListener = vi.fn(() => subscription.next()); + const erroredListener = vi.fn(); + + // initialize and sync + model.on('errored', erroredListener); + await model.subscribe(subscriptionListener); + expect(sync).toHaveBeenCalledOnce(); + + await subscriptionCall; // wait for first event to propagate + expect(subscriptionListener).toHaveBeenCalledTimes(1); + events.channelEvents.next(createMessage(1)); + + await statePromise(model, 'paused'); // pause before attempting to recover from error + await statePromise(model, 'syncing'); // resume() -> resync() to recover from error + await statePromise(model, 'errored'); // resync failed + + expect(merge).toHaveBeenCalledTimes(0); // message fails to process, merge not called + expect(sync).toHaveBeenCalledTimes(2); + expect(erroredListener).toHaveBeenCalledTimes(1); + }); + it('if merge function fails on confirmed event, and subsequent replay fails, set model to errored', async ({ channelName, ably, @@ -1550,7 +1676,6 @@ describe('Model', () => { streams, }) => { const s1 = streams.newStream({ channelName: channelName }); - s1.subscribe = vi.fn(); const events = new Subject(); s1.subscribe = vi.fn(async (callback) => { @@ -1588,7 +1713,7 @@ describe('Model', () => { eventBufferOptions: defaultEventBufferOptions, }, ); - model.sync(); + await model.sync(); expect(syncFn).toHaveReturnedTimes(1); const lis: EventListener = vi.fn(); diff --git a/src/Model.ts b/src/Model.ts index 20f56bd..a53b35d 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -196,6 +196,7 @@ export default class Model extends EventEmitter} params - The parameters to pass to the sync function. * @returns A promise that resolves when the model has successfully re-synchronised its state and is ready to start emitting updates. + * @throws {Error} If there is an error during the resync operation. */ public async sync(...params: Parameters) { this.logger.trace({ ...this.baseLogContext, action: 'sync()', params }); @@ -299,6 +300,8 @@ export default class Model extends EventEmitter} callback - The callback to invoke when the model data changes. * @param {SubscriptionOptions} options - Optional subscription options that can be used to specify whether to subscribe to * optimistic or only confirmed updates. Defaults to optimistic. + * @return A promise that resolves when the subscription has been set up and if in an initialized state successfully synchronised + * @throws {Error} If there is an error during the sync operation. */ public async subscribe(callback: SubscriptionCallback, options: SubscriptionOptions = { optimistic: true }) { if (typeof callback !== 'function') { @@ -367,6 +370,7 @@ export default class Model extends EventEmitter} callback - The callback to unsubscribe. + * @throws {Error} If callback is not a function */ public unsubscribe(callback: SubscriptionCallback) { if (typeof callback !== 'function') { @@ -405,6 +409,10 @@ export default class Model extends EventEmitter extends EventEmitter 0) { try { + if (this.state === 'errored') { + return; + } + await fn(); return; } catch (err) { + if (this.state === 'errored') { + throw err; + } + delay = retries(++i); if (delay < 0) { throw err; @@ -458,7 +478,16 @@ export default class Model extends EventEmitter))); + if (!sequenceId) { + const err = Error('The sync function returned an undefined sequenceId'); + // we set the state to errored here to ensure that this function is not retried by the Model.retryable() + // this avoids a sync function that returns the wrong response structure from being retried. + this.setState('errored', err); + throw err; + } + this.setConfirmedData(data); await this.computeState(this.confirmedData, this.optimisticData, this.optimisticEvents); await this.addStream(sequenceId); @@ -470,6 +499,7 @@ export default class Model extends EventEmitter extends EventEmitter) {