diff --git a/package-lock.json b/package-lock.json index 3914ba1e..9d187e05 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,9 @@ "version": "0.0.1", "license": "ISC", "dependencies": { - "ably": "^1.2.39", + "@types/lodash": "^4.14.195", + "ably": "^1.2.40", + "lodash": "^4.17.21", "rxjs": "^7.8.1" }, "devDependencies": { @@ -637,6 +639,11 @@ "@types/node": "*" } }, + "node_modules/@types/lodash": { + "version": "4.14.195", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.195.tgz", + "integrity": "sha512-Hwx9EUgdwf2GLarOjQp5ZH8ZmblzcbTBC2wtQWNKARBSxM9ezRIAUpeDTgoQRAFB0+8CNWXVA9+MaSOzOF3nPg==" + }, "node_modules/@types/node": { "version": "18.11.18", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.11.18.tgz", @@ -1037,9 +1044,9 @@ } }, "node_modules/ably": { - "version": "1.2.39", - "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.39.tgz", - "integrity": "sha512-xL57mIHGZsdJlgQv2DqZUFxgvh3QmUZOOZioP4dBbzCu6hgnoTlWHiZT40uwr0D8/OpAyvhidjhoVcqMk3TIAQ==", + "version": "1.2.40", + "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.40.tgz", + "integrity": "sha512-Rmc/IBW9BQeDqaVZdeYD2HMakfzoumHw8Ag3JtqxdC8xcc50k69FDY3Y1t9Mp7nrDQDjDYXvHgiOkVp99jNjHA==", "dependencies": { "@ably/msgpack-js": "^0.4.0", "got": "^11.8.5", @@ -2454,6 +2461,11 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/lodash": { + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" + }, "node_modules/lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", @@ -4086,6 +4098,11 @@ "@types/node": "*" } }, + "@types/lodash": { + "version": "4.14.195", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.195.tgz", + "integrity": "sha512-Hwx9EUgdwf2GLarOjQp5ZH8ZmblzcbTBC2wtQWNKARBSxM9ezRIAUpeDTgoQRAFB0+8CNWXVA9+MaSOzOF3nPg==" + }, "@types/node": { "version": "18.11.18", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.11.18.tgz", @@ -4354,9 +4371,9 @@ } }, "ably": { - "version": "1.2.39", - "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.39.tgz", - "integrity": "sha512-xL57mIHGZsdJlgQv2DqZUFxgvh3QmUZOOZioP4dBbzCu6hgnoTlWHiZT40uwr0D8/OpAyvhidjhoVcqMk3TIAQ==", + "version": "1.2.40", + "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.40.tgz", + "integrity": "sha512-Rmc/IBW9BQeDqaVZdeYD2HMakfzoumHw8Ag3JtqxdC8xcc50k69FDY3Y1t9Mp7nrDQDjDYXvHgiOkVp99jNjHA==", "requires": { "@ably/msgpack-js": "^0.4.0", "got": "^11.8.5", @@ -5411,6 +5428,11 @@ "p-locate": "^5.0.0" } }, + "lodash": { + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" + }, "lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", diff --git a/package.json b/package.json index e0046f9e..7b4b0c60 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,9 @@ "vitest": "^0.29.8" }, "dependencies": { - "ably": "^1.2.39", + "@types/lodash": "^4.14.195", + "ably": "^1.2.40", + "lodash": "^4.17.21", "rxjs": "^7.8.1" } } diff --git a/src/Model.test.ts b/src/Model.test.ts index fcfc41c8..fa051de8 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -3,8 +3,8 @@ import { Realtime, Types } from 'ably/promises'; import { Subject, lastValueFrom } from 'rxjs'; import { take } from 'rxjs/operators'; -import { createMessage } from './utilities/test/messages'; -import Model, { ModelState, Versioned, Streams } from './Model'; +import { createMessage, customMessage } from './utilities/test/messages'; +import Model, { ModelState, Versioned, Streams, Mutation } from './Model'; import Stream from './Stream'; vi.mock('ably/promises'); @@ -25,6 +25,16 @@ type TestData = { }; }; +const simpleTestData: Versioned = { + version: 1, + data: { + foo: 'foobar', + bar: { + baz: 1, + }, + }, +}; + interface ModelTestContext { streams: Streams; } @@ -32,6 +42,16 @@ interface ModelTestContext { const modelStatePromise = (model: Model, state: ModelState) => new Promise((resolve) => model.whenState(state, model.state, resolve)); +const getNthEventPromise = (subject: Subject, n: number) => lastValueFrom(subject.pipe(take(n))); + +const getEventPromises = (subject: Subject, n: number) => { + const promises: Promise[] = []; + for (let i = 0; i < n; i++) { + promises.push(getNthEventPromise(subject, i + 1)); + } + return promises; +}; + describe('Model', () => { beforeEach((context) => { const client = new Realtime({}); @@ -61,22 +81,12 @@ describe('Model', () => { }); it('enters ready state after sync and subscribed to streams', async ({ streams }) => { - const data: Versioned = { - version: 1, - data: { - foo: 'foobar', - bar: { - baz: 1, - }, - }, - }; - // the promise returned by the subscribe method resolves when we have successfully attached to the channel let completeSync; const synchronised = new Promise((resolve) => (completeSync = resolve)); const sync = vi.fn(async () => { await synchronised; - return data; + return simpleTestData; }); streams.s1.subscribe = vi.fn(); @@ -92,7 +102,8 @@ describe('Model', () => { expect(streams.s1.subscribe).toHaveBeenCalledOnce(); expect(streams.s2.subscribe).toHaveBeenCalledOnce(); expect(sync).toHaveBeenCalledOnce(); - expect(model.data).toEqual(data); + expect(model.speculative).toEqual(simpleTestData.data); + expect(model.confirmed).toEqual(simpleTestData.data); }); it('pauses and resumes the model', async ({ streams }) => { @@ -102,7 +113,7 @@ describe('Model', () => { streams.s2.pause = vi.fn(); streams.s1.resume = vi.fn(); streams.s2.resume = vi.fn(); - const sync = vi.fn(); + const sync = vi.fn(async () => simpleTestData); const model = new Model('test', { streams, sync }); @@ -126,7 +137,7 @@ describe('Model', () => { streams.s2.subscribe = vi.fn(); streams.s1.unsubscribe = vi.fn(); streams.s2.unsubscribe = vi.fn(); - const sync = vi.fn(); + const sync = vi.fn(async () => simpleTestData); const model = new Model('test', { streams, sync }); @@ -144,7 +155,7 @@ describe('Model', () => { it('subscribes to updates', async ({ streams }) => { const data: Versioned = { version: 0, - data: 'foobar', + data: 'data_0', }; // event subjects used to invoke the stream subscription callbacks @@ -163,20 +174,11 @@ describe('Model', () => { const sync = vi.fn(async () => data); // defines initial version of model const model = new Model('test', { streams, sync }); - const update1 = vi.fn(async (state, event) => ({ - version: state.version + 1, - data: event.data, - })); + const update1 = vi.fn(async (state, event) => event.data); model.registerUpdate('s1', 'name_1', update1); - const update2 = vi.fn(async (state, event) => ({ - version: state.version + 1, - data: event.data, - })); + const update2 = vi.fn(async (state, event) => event.data); model.registerUpdate('s2', 'name_2', update2); - const update3 = vi.fn(async (state, event) => ({ - version: state.version + 1, - data: event.data, - })); + const update3 = vi.fn(async (state, event) => event.data); model.registerUpdate('s1', 'name_3', update3); model.registerUpdate('s2', 'name_3', update3); @@ -184,13 +186,15 @@ describe('Model', () => { expect(sync).toHaveBeenCalledOnce(); let subscription = new Subject(); - const subscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => subscription.next()); + const subscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + subscription.next(); + }); model.subscribe(subscriptionSpy); - const subscriptionCalled = () => lastValueFrom(subscription.pipe(take(1))); + const subscriptionCalls = getEventPromises(subscription, 4); events.e1.next(createMessage(1)); - await subscriptionCalled(); + await subscriptionCalls[0]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(0); expect(update3).toHaveBeenCalledTimes(0); @@ -198,7 +202,7 @@ describe('Model', () => { expect(subscriptionSpy).toHaveBeenNthCalledWith(1, null, 'data_1'); events.e2.next(createMessage(2)); - await subscriptionCalled(); + await subscriptionCalls[1]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(1); expect(update3).toHaveBeenCalledTimes(0); @@ -206,7 +210,7 @@ describe('Model', () => { expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, 'data_2'); events.e1.next(createMessage(3)); - await subscriptionCalled(); + await subscriptionCalls[2]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(1); expect(update3).toHaveBeenCalledTimes(1); @@ -214,17 +218,15 @@ describe('Model', () => { expect(subscriptionSpy).toHaveBeenNthCalledWith(3, null, 'data_3'); events.e2.next(createMessage(3)); - await subscriptionCalled(); + await subscriptionCalls[3]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(1); expect(update3).toHaveBeenCalledTimes(2); expect(subscriptionSpy).toHaveBeenCalledTimes(4); expect(subscriptionSpy).toHaveBeenNthCalledWith(4, null, 'data_3'); - expect(model.data).toEqual({ - version: 4, - data: 'data_3', - }); + expect(model.speculative).toEqual('data_3'); + expect(model.confirmed).toEqual('data_3'); }); it('executes a registered mutation', async ({ streams }) => { @@ -233,12 +235,14 @@ describe('Model', () => { const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'foobar' }) }); await modelStatePromise(model, ModelState.READY); - const mutation = vi.fn(); + const mutation: Mutation = { + mutate: vi.fn(async () => ({ result: 'test', expectedEvents: [] })), + }; model.registerMutation('foo', mutation); - expect(mutation).toHaveBeenCalledTimes(0); + expect(mutation.mutate).toHaveBeenCalledTimes(0); await model.mutate<[string, number], void>('foo', 'bar', 123); - expect(mutation).toHaveBeenCalledTimes(1); - expect(mutation).toHaveBeenCalledWith('bar', 123); + expect(mutation.mutate).toHaveBeenCalledTimes(1); + expect(mutation.mutate).toHaveBeenCalledWith('bar', 123); }); it('fails to register a duplicate mutation', async ({ streams }) => { @@ -247,13 +251,293 @@ describe('Model', () => { const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'foobar' }) }); await modelStatePromise(model, ModelState.READY); - const mutation = vi.fn(); + const mutation: Mutation = { mutate: vi.fn() }; model.registerMutation('foo', mutation); - expect(mutation).toHaveBeenCalledTimes(0); + expect(mutation.mutate).toHaveBeenCalledTimes(0); expect(() => model.registerMutation('foo', mutation)).toThrowError( `mutation with name 'foo' already registered on model 'test'`, ); }); - // TODO disposes of the model on stream failed + it('fails to execute mutation with unregistered stream', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'foobar' }) }); + await modelStatePromise(model, ModelState.READY); + + const mutation: Mutation = { + mutate: vi.fn(async () => ({ result: 'test', expectedEvents: [{ stream: 'unknown', name: 'foo' }] })), + }; + model.registerMutation('foo', mutation); + expect(mutation.mutate).toHaveBeenCalledTimes(0); + await expect(model.mutate('foo')).rejects.toThrow("stream with name 'unknown' not registered on model 'test'"); + }); + + it('updates model state with optimistic event', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'data_0' }) }); + await modelStatePromise(model, ModelState.READY); + + const update1 = vi.fn(async (state, event) => event.data); + model.registerUpdate('s1', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data: 'data_1' }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + optimisticSubscription.next(), + ); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getNthEventPromise(optimisticSubscription, 1); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + confirmedSubscription.next(), + ); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + + await model.mutate<[string], void>('foo', 's1'); + + await optimisticSubscriptionCall; + expect(model.speculative).toEqual('data_1'); + expect(model.confirmed).toEqual('data_0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledOnce(); + expect(optimisticSubscriptionSpy).toHaveBeenCalledWith(null, 'data_1'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + }); + + it('confirms an optimistic event', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + + const events = { e1: new Subject() }; + streams.s1.subscribe = vi.fn((callback) => { + events.e1.subscribe((message) => callback(null, message)); + }); + + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'data_0' }) }); + await modelStatePromise(model, ModelState.READY); + + const update1 = vi.fn(async (state, event) => event.data); + model.registerUpdate('s1', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data: 'data_1' }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + optimisticSubscription.next(), + ); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getNthEventPromise(optimisticSubscription, 1); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + confirmedSubscription.next(), + ); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + const confirmedSubscriptionCall = getNthEventPromise(confirmedSubscription, 1); + + await model.mutate<[string], void>('foo', 's1'); + + await optimisticSubscriptionCall; + expect(model.speculative).toEqual('data_1'); + expect(model.confirmed).toEqual('data_0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledOnce(); + expect(optimisticSubscriptionSpy).toHaveBeenCalledWith(null, 'data_1'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + + events.e1.next(customMessage('id_1', 'testEvent', 'data_1')); + await confirmedSubscriptionCall; + expect(confirmedSubscriptionSpy).toHaveBeenCalledOnce(); + expect(confirmedSubscriptionSpy).toHaveBeenCalledWith(null, 'data_1'); + expect(model.confirmed).toEqual('data_1'); + }); + + it('confirms optimistic events from multiple streams', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + + const events = { + e1: new Subject(), + e2: new Subject(), + }; + streams.s1.subscribe = vi.fn((callback) => { + events.e1.subscribe((message) => callback(null, message)); + }); + streams.s2.subscribe = vi.fn((callback) => { + events.e2.subscribe((message) => callback(null, message)); + }); + + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: '0' }) }); + await modelStatePromise(model, ModelState.READY); + + // Defines an update function which concatenates strings. + // This is a non-commutative operation which let's us inspect the order in + // in which updates are applied to the speculative vs confirmed states. + const update1 = vi.fn(async (state, event) => state + event.data); + model.registerUpdate('s1', 'testEvent', update1); + model.registerUpdate('s2', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string, data: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + optimisticSubscription.next(); + }); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getNthEventPromise(optimisticSubscription, 3); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + confirmedSubscription.next(); + }); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + const confirmedSubscriptionCalls = getEventPromises(confirmedSubscription, 3); + + await model.mutate<[string, string], void>('foo', 's1', '1'); + await model.mutate<[string, string], void>('foo', 's2', '2'); // will be last to be confirmed + await model.mutate<[string, string], void>('foo', 's1', '3'); + + // optimistic updates are applied in the order the mutations were called + await optimisticSubscriptionCall; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '012'); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0123'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + + // Optimistic updates must be confirmed in-order only in the context of a single stream, + // so here we confirm s2 in a different order to the order the mutation were optimistically applied, + // and assert that the confirmed state is constructed in the correct order (which differs from the + // order in which the speculative state is constructed). + + // confirm the first expected event + events.e1.next(customMessage('id_1', 'testEvent', '1')); + await confirmedSubscriptionCalls[0]; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('01'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); // unchanged + + // confirm the third expected event (second event on the first stream) + events.e1.next(customMessage('id_2', 'testEvent', '3')); + await confirmedSubscriptionCalls[1]; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('013'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(2); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '013'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); // unchanged + + // confirm the second expected event (first event on the second stream) + events.e2.next(customMessage('id_1', 'testEvent', '2')); + await confirmedSubscriptionCalls[2]; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('0132'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0132'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); // unchanged + }); + + it('rebases optimistic events on top of confirmed state', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + + const events = { e1: new Subject() }; + streams.s1.subscribe = vi.fn((callback) => { + events.e1.subscribe((message) => callback(null, message)); + }); + + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: '0' }) }); + await modelStatePromise(model, ModelState.READY); + + const update1 = vi.fn(async (state, event) => state + event.data); + model.registerUpdate('s1', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string, data: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + optimisticSubscription.next(); + }); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getEventPromises(optimisticSubscription, 3); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + confirmedSubscription.next(); + }); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + const confirmedSubscriptionCalls = getEventPromises(confirmedSubscription, 3); + + await model.mutate<[string, string], void>('foo', 's1', '1'); + await model.mutate<[string, string], void>('foo', 's1', '2'); + + // optimistic updates are applied in the order the mutations were called + await optimisticSubscriptionCall[0]; + await optimisticSubscriptionCall[1]; + expect(model.speculative).toEqual('012'); + expect(model.confirmed).toEqual('0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(2); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '012'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + + // confirm the first expected event + events.e1.next(customMessage('id_1', 'testEvent', '1')); + await confirmedSubscriptionCalls[0]; + expect(model.speculative).toEqual('012'); + expect(model.confirmed).toEqual('01'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(2); + + // an event is received which does not have a corresponding expected event, + // and the speculative updates are rebased on top of the incoming event + events.e1.next(customMessage('id_1', 'testEvent', '3')); + await confirmedSubscriptionCalls[1]; + await optimisticSubscriptionCall[2]; + expect(model.speculative).toEqual('0132'); + expect(model.confirmed).toEqual('013'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(2); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '013'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0132'); + + // confirm the second expected event + events.e1.next(customMessage('id_1', 'testEvent', '2')); + await confirmedSubscriptionCalls[2]; + expect(model.speculative).toEqual('0132'); + expect(model.confirmed).toEqual('0132'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0132'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); + }); }); diff --git a/src/Model.ts b/src/Model.ts index eb37cca2..60c1cec4 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -3,6 +3,7 @@ import Stream from './Stream'; import EventEmitter from './utilities/EventEmitter'; import { StandardCallback } from './types/callbacks'; import { Subject, Subscription } from 'rxjs'; +import _ from 'lodash'; export enum ModelState { /** @@ -31,16 +32,31 @@ export enum ModelState { DISPOSED = 'disposed', } +export type Event = { + stream: string; + name: string; + data?: any; +}; + +type Confirmation = { + confirmed: boolean; +}; + export type SyncFunc = () => Promise>; -export type UpdateFunc = (state: T, event: Types.Message) => Promise; +export type UpdateFunc = (state: T, event: Event) => Promise; export type MutationResult = { result: R; - expected: Partial; + expectedEvents: Event[]; }; + export type MutationFunc = (...args: T) => Promise>; +export type Mutation = { + mutate: MutationFunc; +}; + export type Streams = { [name: string]: Stream; }; @@ -67,18 +83,29 @@ export type Versioned = { data: T; }; +function eventsAreEqual(e1: Event, e2: Event): boolean { + return e1.stream === e2.stream && e1.name === e2.name && _.isEqual(e1.data, e2.data); +} + +type SubscriptionOptions = { + optimistic: boolean; +}; + class Model extends EventEmitter> { private currentState: ModelState = ModelState.INITIALIZED; - private currentData: Versioned; + private speculativeData: T; + private confirmedData: T; private sync: SyncFunc; private streams: Streams; - private updateFuncs: UpdateFuncs> = {}; + private updateFuncs: UpdateFuncs = {}; + + private mutations: Record = {}; - private mutations: Record = {}; + private unconfirmed: Event[] = []; - private subscriptions = new Subject(); + private subscriptions = new Subject<{ confirmed: boolean; data: T }>(); private subscriptionMap: Map, Subscription> = new Map(); private streamSubscriptionsMap: Map> = new Map(); @@ -96,8 +123,12 @@ class Model extends EventEmitter> { return this.currentState; } - public get data() { - return this.currentData; + public get speculative() { + return this.speculativeData; + } + + public get confirmed() { + return this.confirmedData; } public async pause() { @@ -121,7 +152,7 @@ class Model extends EventEmitter> { return this.streams[name]; } - public registerUpdate(stream: string, event: string, update: UpdateFunc>) { + public registerUpdate(stream: string, event: string, update: UpdateFunc) { if (!this.streams[stream]) { throw new Error(`stream with name '${stream}' not registered on model '${this.name}'`); } @@ -134,28 +165,39 @@ class Model extends EventEmitter> { this.updateFuncs[stream][event].push(update); } - public registerMutation(name: string, mutation: MutationFunc) { + public registerMutation(name: string, mutation: Mutation) { if (this.mutations[name]) { throw new Error(`mutation with name '${name}' already registered on model '${this.name}'`); } this.mutations[name] = mutation; } - public async mutate(name: string, ...args: TArgs): Promise> { - const mutation = this.mutations[name]; + public async mutate(name: string, ...args: TArgs): Promise { + const mutation: Mutation = this.mutations[name]; if (!mutation) { throw new Error(`mutation with name '${name}' not registered on model '${this.name}'`); } - const result = await mutation(...args); + const { result, expectedEvents } = await mutation.mutate(...args); - // TODO: process the expected result optimistically + for (const event of expectedEvents) { + await this.onStreamEvent(null, { ...event, confirmed: false }); + } return result; } - public subscribe(callback: StandardCallback) { + public subscribe(callback: StandardCallback, options: SubscriptionOptions = { optimistic: true }) { const subscription = this.subscriptions.subscribe({ - next: (message) => callback(null, message), + next: (value) => { + if (options.optimistic && !value.confirmed) { + callback(null, value.data); + return; + } + if (!options.optimistic && value.confirmed) { + callback(null, value.data); + return; + } + }, error: (err) => callback(err), complete: () => this.unsubscribe(callback), }); @@ -194,30 +236,76 @@ class Model extends EventEmitter> { } as ModelStateChange); } - private setData(data: Versioned) { - this.currentData = data; - this.subscriptions.next(data.data); + private setSpeculativeData(data: T) { + this.speculativeData = data; + this.subscriptions.next({ confirmed: false, data }); + } + + private setConfirmedData(data: T) { + this.confirmedData = data; + this.subscriptions.next({ confirmed: true, data }); } - private async onStreamEvent(stream: string, err: Error | null | undefined, event: Types.Message | undefined) { + private async applyUpdates(initialData: T, event: Event): Promise { + let data = initialData; + for (let eventName in this.updateFuncs[event.stream]) { + if (event.name === eventName) { + for (let updator of this.updateFuncs[event.stream][eventName]) { + data = await updator(data, event); + } + } + } + return data; + } + + private async onStreamEvent(err: Error | null | undefined, event?: Event & Confirmation) { if (err) { - this.onError(err); + await this.onError(err); return; } - if (!this.streams[stream]) { - this.onError(new Error(`stream with name '${stream}' not registered on model '${this.name}'`)); + if (!event) { + await this.onError('received empty event'); return; } - if (!this.updateFuncs[stream]) { + if (!this.streams[event.stream]) { + await this.onError(new Error(`stream with name '${event.stream}' not registered on model '${this.name}'`)); return; } - for (let eventName in this.updateFuncs[stream]) { - if (event?.name === eventName) { - for (let updator of this.updateFuncs[stream][eventName]) { - this.setData(await updator(this.currentData, event)); - } + if (!this.updateFuncs[event.stream]) { + return; + } + + // eagerly apply optimistic updates + if (!event.confirmed) { + this.unconfirmed.push(event); + this.setSpeculativeData(await this.applyUpdates(this.speculativeData, event)); + return; + } + + // if the incoming confirmed event confirms the next expected optimistic event for the stream, it is + // discarded without applying it to the speculative state because its effect has already been optimistically applied + let unexpectedEvent = true; + for (let i = 0; i < this.unconfirmed.length; i++) { + let e = this.unconfirmed[i]; + if (eventsAreEqual(e, event)) { + this.unconfirmed.splice(i, 1); + this.setConfirmedData(await this.applyUpdates(this.confirmedData, event)); + unexpectedEvent = false; + break; } } + + // if the incoming confirmed event doesn't match any optimistic event, + // we need to roll back to the last-confirmed state, apply the incoming event, + // and rebase the optimistic updates on top + if (unexpectedEvent) { + let nextData = await this.applyUpdates(this.confirmedData, event); + this.setConfirmedData(nextData); + for (const e of this.unconfirmed) { + nextData = await this.applyUpdates(nextData, e); + } + this.setSpeculativeData(nextData); + } } private async onError(err) { @@ -228,12 +316,19 @@ class Model extends EventEmitter> { this.setState(ModelState.PREPARING); for (let streamName in this.streams) { const stream = this.streams[streamName]; - const callback = (err: Error | null | undefined, event: Types.Message | undefined) => - this.onStreamEvent(streamName, err, event); + const callback: StandardCallback = (err, event) => { + if (err) { + this.onStreamEvent(err); + return; + } + this.onStreamEvent(null, { ...event!, stream: streamName, confirmed: true }); + }; stream.subscribe(callback); this.streamSubscriptionsMap.set(stream, callback); } - this.currentData = await this.sync(); + const { data } = await this.sync(); + this.setSpeculativeData(data); + this.setConfirmedData(data); this.setState(ModelState.READY); } } diff --git a/src/utilities/test/messages.ts b/src/utilities/test/messages.ts index cb9639bf..f7494dab 100644 --- a/src/utilities/test/messages.ts +++ b/src/utilities/test/messages.ts @@ -41,3 +41,7 @@ export function createMessage(i: number): Types.Message { data: `data_${i}`, }; } + +export function customMessage(id: string, name: string, data: string): Types.Message { + return { ...baseMessage, id, name, data }; +}