From a4ad1ffbff59da39574f1cbe3a4e3cd0d549c0d6 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Thu, 22 Jun 2023 14:22:46 +0100 Subject: [PATCH] model: maintain speculative and confirmed state Speculative state is computed eagerly by optimistically applying the expected events returned from mutations when they are executed. Confirmed state is computed from change events consumed from the backend via the model's input streams. Implements logic to rebase speculative updates on top of confirmed state as events arrive. --- package-lock.json | 36 +++- package.json | 4 +- src/Model.test.ts | 376 +++++++++++++++++++++++++++++---- src/Model.ts | 159 +++++++++++--- src/utilities/test/messages.ts | 4 + 5 files changed, 493 insertions(+), 86 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3914ba1e..9d187e05 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,9 @@ "version": "0.0.1", "license": "ISC", "dependencies": { - "ably": "^1.2.39", + "@types/lodash": "^4.14.195", + "ably": "^1.2.40", + "lodash": "^4.17.21", "rxjs": "^7.8.1" }, "devDependencies": { @@ -637,6 +639,11 @@ "@types/node": "*" } }, + "node_modules/@types/lodash": { + "version": "4.14.195", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.195.tgz", + "integrity": "sha512-Hwx9EUgdwf2GLarOjQp5ZH8ZmblzcbTBC2wtQWNKARBSxM9ezRIAUpeDTgoQRAFB0+8CNWXVA9+MaSOzOF3nPg==" + }, "node_modules/@types/node": { "version": "18.11.18", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.11.18.tgz", @@ -1037,9 +1044,9 @@ } }, "node_modules/ably": { - "version": "1.2.39", - "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.39.tgz", - "integrity": "sha512-xL57mIHGZsdJlgQv2DqZUFxgvh3QmUZOOZioP4dBbzCu6hgnoTlWHiZT40uwr0D8/OpAyvhidjhoVcqMk3TIAQ==", + "version": "1.2.40", + "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.40.tgz", + "integrity": "sha512-Rmc/IBW9BQeDqaVZdeYD2HMakfzoumHw8Ag3JtqxdC8xcc50k69FDY3Y1t9Mp7nrDQDjDYXvHgiOkVp99jNjHA==", "dependencies": { "@ably/msgpack-js": "^0.4.0", "got": "^11.8.5", @@ -2454,6 +2461,11 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/lodash": { + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" + }, "node_modules/lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", @@ -4086,6 +4098,11 @@ "@types/node": "*" } }, + "@types/lodash": { + "version": "4.14.195", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.195.tgz", + "integrity": "sha512-Hwx9EUgdwf2GLarOjQp5ZH8ZmblzcbTBC2wtQWNKARBSxM9ezRIAUpeDTgoQRAFB0+8CNWXVA9+MaSOzOF3nPg==" + }, "@types/node": { "version": "18.11.18", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.11.18.tgz", @@ -4354,9 +4371,9 @@ } }, "ably": { - "version": "1.2.39", - "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.39.tgz", - "integrity": "sha512-xL57mIHGZsdJlgQv2DqZUFxgvh3QmUZOOZioP4dBbzCu6hgnoTlWHiZT40uwr0D8/OpAyvhidjhoVcqMk3TIAQ==", + "version": "1.2.40", + "resolved": "https://registry.npmjs.org/ably/-/ably-1.2.40.tgz", + "integrity": "sha512-Rmc/IBW9BQeDqaVZdeYD2HMakfzoumHw8Ag3JtqxdC8xcc50k69FDY3Y1t9Mp7nrDQDjDYXvHgiOkVp99jNjHA==", "requires": { "@ably/msgpack-js": "^0.4.0", "got": "^11.8.5", @@ -5411,6 +5428,11 @@ "p-locate": "^5.0.0" } }, + "lodash": { + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" + }, "lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", diff --git a/package.json b/package.json index e0046f9e..7b4b0c60 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,9 @@ "vitest": "^0.29.8" }, "dependencies": { - "ably": "^1.2.39", + "@types/lodash": "^4.14.195", + "ably": "^1.2.40", + "lodash": "^4.17.21", "rxjs": "^7.8.1" } } diff --git a/src/Model.test.ts b/src/Model.test.ts index fcfc41c8..fa051de8 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -3,8 +3,8 @@ import { Realtime, Types } from 'ably/promises'; import { Subject, lastValueFrom } from 'rxjs'; import { take } from 'rxjs/operators'; -import { createMessage } from './utilities/test/messages'; -import Model, { ModelState, Versioned, Streams } from './Model'; +import { createMessage, customMessage } from './utilities/test/messages'; +import Model, { ModelState, Versioned, Streams, Mutation } from './Model'; import Stream from './Stream'; vi.mock('ably/promises'); @@ -25,6 +25,16 @@ type TestData = { }; }; +const simpleTestData: Versioned = { + version: 1, + data: { + foo: 'foobar', + bar: { + baz: 1, + }, + }, +}; + interface ModelTestContext { streams: Streams; } @@ -32,6 +42,16 @@ interface ModelTestContext { const modelStatePromise = (model: Model, state: ModelState) => new Promise((resolve) => model.whenState(state, model.state, resolve)); +const getNthEventPromise = (subject: Subject, n: number) => lastValueFrom(subject.pipe(take(n))); + +const getEventPromises = (subject: Subject, n: number) => { + const promises: Promise[] = []; + for (let i = 0; i < n; i++) { + promises.push(getNthEventPromise(subject, i + 1)); + } + return promises; +}; + describe('Model', () => { beforeEach((context) => { const client = new Realtime({}); @@ -61,22 +81,12 @@ describe('Model', () => { }); it('enters ready state after sync and subscribed to streams', async ({ streams }) => { - const data: Versioned = { - version: 1, - data: { - foo: 'foobar', - bar: { - baz: 1, - }, - }, - }; - // the promise returned by the subscribe method resolves when we have successfully attached to the channel let completeSync; const synchronised = new Promise((resolve) => (completeSync = resolve)); const sync = vi.fn(async () => { await synchronised; - return data; + return simpleTestData; }); streams.s1.subscribe = vi.fn(); @@ -92,7 +102,8 @@ describe('Model', () => { expect(streams.s1.subscribe).toHaveBeenCalledOnce(); expect(streams.s2.subscribe).toHaveBeenCalledOnce(); expect(sync).toHaveBeenCalledOnce(); - expect(model.data).toEqual(data); + expect(model.speculative).toEqual(simpleTestData.data); + expect(model.confirmed).toEqual(simpleTestData.data); }); it('pauses and resumes the model', async ({ streams }) => { @@ -102,7 +113,7 @@ describe('Model', () => { streams.s2.pause = vi.fn(); streams.s1.resume = vi.fn(); streams.s2.resume = vi.fn(); - const sync = vi.fn(); + const sync = vi.fn(async () => simpleTestData); const model = new Model('test', { streams, sync }); @@ -126,7 +137,7 @@ describe('Model', () => { streams.s2.subscribe = vi.fn(); streams.s1.unsubscribe = vi.fn(); streams.s2.unsubscribe = vi.fn(); - const sync = vi.fn(); + const sync = vi.fn(async () => simpleTestData); const model = new Model('test', { streams, sync }); @@ -144,7 +155,7 @@ describe('Model', () => { it('subscribes to updates', async ({ streams }) => { const data: Versioned = { version: 0, - data: 'foobar', + data: 'data_0', }; // event subjects used to invoke the stream subscription callbacks @@ -163,20 +174,11 @@ describe('Model', () => { const sync = vi.fn(async () => data); // defines initial version of model const model = new Model('test', { streams, sync }); - const update1 = vi.fn(async (state, event) => ({ - version: state.version + 1, - data: event.data, - })); + const update1 = vi.fn(async (state, event) => event.data); model.registerUpdate('s1', 'name_1', update1); - const update2 = vi.fn(async (state, event) => ({ - version: state.version + 1, - data: event.data, - })); + const update2 = vi.fn(async (state, event) => event.data); model.registerUpdate('s2', 'name_2', update2); - const update3 = vi.fn(async (state, event) => ({ - version: state.version + 1, - data: event.data, - })); + const update3 = vi.fn(async (state, event) => event.data); model.registerUpdate('s1', 'name_3', update3); model.registerUpdate('s2', 'name_3', update3); @@ -184,13 +186,15 @@ describe('Model', () => { expect(sync).toHaveBeenCalledOnce(); let subscription = new Subject(); - const subscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => subscription.next()); + const subscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + subscription.next(); + }); model.subscribe(subscriptionSpy); - const subscriptionCalled = () => lastValueFrom(subscription.pipe(take(1))); + const subscriptionCalls = getEventPromises(subscription, 4); events.e1.next(createMessage(1)); - await subscriptionCalled(); + await subscriptionCalls[0]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(0); expect(update3).toHaveBeenCalledTimes(0); @@ -198,7 +202,7 @@ describe('Model', () => { expect(subscriptionSpy).toHaveBeenNthCalledWith(1, null, 'data_1'); events.e2.next(createMessage(2)); - await subscriptionCalled(); + await subscriptionCalls[1]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(1); expect(update3).toHaveBeenCalledTimes(0); @@ -206,7 +210,7 @@ describe('Model', () => { expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, 'data_2'); events.e1.next(createMessage(3)); - await subscriptionCalled(); + await subscriptionCalls[2]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(1); expect(update3).toHaveBeenCalledTimes(1); @@ -214,17 +218,15 @@ describe('Model', () => { expect(subscriptionSpy).toHaveBeenNthCalledWith(3, null, 'data_3'); events.e2.next(createMessage(3)); - await subscriptionCalled(); + await subscriptionCalls[3]; expect(update1).toHaveBeenCalledTimes(1); expect(update2).toHaveBeenCalledTimes(1); expect(update3).toHaveBeenCalledTimes(2); expect(subscriptionSpy).toHaveBeenCalledTimes(4); expect(subscriptionSpy).toHaveBeenNthCalledWith(4, null, 'data_3'); - expect(model.data).toEqual({ - version: 4, - data: 'data_3', - }); + expect(model.speculative).toEqual('data_3'); + expect(model.confirmed).toEqual('data_3'); }); it('executes a registered mutation', async ({ streams }) => { @@ -233,12 +235,14 @@ describe('Model', () => { const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'foobar' }) }); await modelStatePromise(model, ModelState.READY); - const mutation = vi.fn(); + const mutation: Mutation = { + mutate: vi.fn(async () => ({ result: 'test', expectedEvents: [] })), + }; model.registerMutation('foo', mutation); - expect(mutation).toHaveBeenCalledTimes(0); + expect(mutation.mutate).toHaveBeenCalledTimes(0); await model.mutate<[string, number], void>('foo', 'bar', 123); - expect(mutation).toHaveBeenCalledTimes(1); - expect(mutation).toHaveBeenCalledWith('bar', 123); + expect(mutation.mutate).toHaveBeenCalledTimes(1); + expect(mutation.mutate).toHaveBeenCalledWith('bar', 123); }); it('fails to register a duplicate mutation', async ({ streams }) => { @@ -247,13 +251,293 @@ describe('Model', () => { const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'foobar' }) }); await modelStatePromise(model, ModelState.READY); - const mutation = vi.fn(); + const mutation: Mutation = { mutate: vi.fn() }; model.registerMutation('foo', mutation); - expect(mutation).toHaveBeenCalledTimes(0); + expect(mutation.mutate).toHaveBeenCalledTimes(0); expect(() => model.registerMutation('foo', mutation)).toThrowError( `mutation with name 'foo' already registered on model 'test'`, ); }); - // TODO disposes of the model on stream failed + it('fails to execute mutation with unregistered stream', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'foobar' }) }); + await modelStatePromise(model, ModelState.READY); + + const mutation: Mutation = { + mutate: vi.fn(async () => ({ result: 'test', expectedEvents: [{ stream: 'unknown', name: 'foo' }] })), + }; + model.registerMutation('foo', mutation); + expect(mutation.mutate).toHaveBeenCalledTimes(0); + await expect(model.mutate('foo')).rejects.toThrow("stream with name 'unknown' not registered on model 'test'"); + }); + + it('updates model state with optimistic event', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'data_0' }) }); + await modelStatePromise(model, ModelState.READY); + + const update1 = vi.fn(async (state, event) => event.data); + model.registerUpdate('s1', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data: 'data_1' }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + optimisticSubscription.next(), + ); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getNthEventPromise(optimisticSubscription, 1); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + confirmedSubscription.next(), + ); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + + await model.mutate<[string], void>('foo', 's1'); + + await optimisticSubscriptionCall; + expect(model.speculative).toEqual('data_1'); + expect(model.confirmed).toEqual('data_0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledOnce(); + expect(optimisticSubscriptionSpy).toHaveBeenCalledWith(null, 'data_1'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + }); + + it('confirms an optimistic event', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + + const events = { e1: new Subject() }; + streams.s1.subscribe = vi.fn((callback) => { + events.e1.subscribe((message) => callback(null, message)); + }); + + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: 'data_0' }) }); + await modelStatePromise(model, ModelState.READY); + + const update1 = vi.fn(async (state, event) => event.data); + model.registerUpdate('s1', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data: 'data_1' }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + optimisticSubscription.next(), + ); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getNthEventPromise(optimisticSubscription, 1); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => + confirmedSubscription.next(), + ); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + const confirmedSubscriptionCall = getNthEventPromise(confirmedSubscription, 1); + + await model.mutate<[string], void>('foo', 's1'); + + await optimisticSubscriptionCall; + expect(model.speculative).toEqual('data_1'); + expect(model.confirmed).toEqual('data_0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledOnce(); + expect(optimisticSubscriptionSpy).toHaveBeenCalledWith(null, 'data_1'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + + events.e1.next(customMessage('id_1', 'testEvent', 'data_1')); + await confirmedSubscriptionCall; + expect(confirmedSubscriptionSpy).toHaveBeenCalledOnce(); + expect(confirmedSubscriptionSpy).toHaveBeenCalledWith(null, 'data_1'); + expect(model.confirmed).toEqual('data_1'); + }); + + it('confirms optimistic events from multiple streams', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + + const events = { + e1: new Subject(), + e2: new Subject(), + }; + streams.s1.subscribe = vi.fn((callback) => { + events.e1.subscribe((message) => callback(null, message)); + }); + streams.s2.subscribe = vi.fn((callback) => { + events.e2.subscribe((message) => callback(null, message)); + }); + + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: '0' }) }); + await modelStatePromise(model, ModelState.READY); + + // Defines an update function which concatenates strings. + // This is a non-commutative operation which let's us inspect the order in + // in which updates are applied to the speculative vs confirmed states. + const update1 = vi.fn(async (state, event) => state + event.data); + model.registerUpdate('s1', 'testEvent', update1); + model.registerUpdate('s2', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string, data: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + optimisticSubscription.next(); + }); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getNthEventPromise(optimisticSubscription, 3); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + confirmedSubscription.next(); + }); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + const confirmedSubscriptionCalls = getEventPromises(confirmedSubscription, 3); + + await model.mutate<[string, string], void>('foo', 's1', '1'); + await model.mutate<[string, string], void>('foo', 's2', '2'); // will be last to be confirmed + await model.mutate<[string, string], void>('foo', 's1', '3'); + + // optimistic updates are applied in the order the mutations were called + await optimisticSubscriptionCall; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '012'); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0123'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + + // Optimistic updates must be confirmed in-order only in the context of a single stream, + // so here we confirm s2 in a different order to the order the mutation were optimistically applied, + // and assert that the confirmed state is constructed in the correct order (which differs from the + // order in which the speculative state is constructed). + + // confirm the first expected event + events.e1.next(customMessage('id_1', 'testEvent', '1')); + await confirmedSubscriptionCalls[0]; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('01'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); // unchanged + + // confirm the third expected event (second event on the first stream) + events.e1.next(customMessage('id_2', 'testEvent', '3')); + await confirmedSubscriptionCalls[1]; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('013'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(2); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '013'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); // unchanged + + // confirm the second expected event (first event on the second stream) + events.e2.next(customMessage('id_1', 'testEvent', '2')); + await confirmedSubscriptionCalls[2]; + expect(model.speculative).toEqual('0123'); + expect(model.confirmed).toEqual('0132'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0132'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); // unchanged + }); + + it('rebases optimistic events on top of confirmed state', async ({ streams }) => { + streams.s1.subscribe = vi.fn(); + streams.s2.subscribe = vi.fn(); + + const events = { e1: new Subject() }; + streams.s1.subscribe = vi.fn((callback) => { + events.e1.subscribe((message) => callback(null, message)); + }); + + const model = new Model('test', { streams, sync: async () => ({ version: 1, data: '0' }) }); + await modelStatePromise(model, ModelState.READY); + + const update1 = vi.fn(async (state, event) => state + event.data); + model.registerUpdate('s1', 'testEvent', update1); + + const mutation: Mutation = { + mutate: vi.fn(async (stream: string, data: string) => ({ + result: 'test', + expectedEvents: [{ stream, name: 'testEvent', data }], + })), + }; + model.registerMutation('foo', mutation); + + let optimisticSubscription = new Subject(); + const optimisticSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + optimisticSubscription.next(); + }); + model.subscribe(optimisticSubscriptionSpy); + const optimisticSubscriptionCall = getEventPromises(optimisticSubscription, 3); + + let confirmedSubscription = new Subject(); + const confirmedSubscriptionSpy = vi.fn<[Error | null | undefined, string | undefined]>(() => { + confirmedSubscription.next(); + }); + model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); + const confirmedSubscriptionCalls = getEventPromises(confirmedSubscription, 3); + + await model.mutate<[string, string], void>('foo', 's1', '1'); + await model.mutate<[string, string], void>('foo', 's1', '2'); + + // optimistic updates are applied in the order the mutations were called + await optimisticSubscriptionCall[0]; + await optimisticSubscriptionCall[1]; + expect(model.speculative).toEqual('012'); + expect(model.confirmed).toEqual('0'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(2); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '012'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(0); + + // confirm the first expected event + events.e1.next(customMessage('id_1', 'testEvent', '1')); + await confirmedSubscriptionCalls[0]; + expect(model.speculative).toEqual('012'); + expect(model.confirmed).toEqual('01'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '01'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(2); + + // an event is received which does not have a corresponding expected event, + // and the speculative updates are rebased on top of the incoming event + events.e1.next(customMessage('id_1', 'testEvent', '3')); + await confirmedSubscriptionCalls[1]; + await optimisticSubscriptionCall[2]; + expect(model.speculative).toEqual('0132'); + expect(model.confirmed).toEqual('013'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(2); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '013'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0132'); + + // confirm the second expected event + events.e1.next(customMessage('id_1', 'testEvent', '2')); + await confirmedSubscriptionCalls[2]; + expect(model.speculative).toEqual('0132'); + expect(model.confirmed).toEqual('0132'); + expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(3); + expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '0132'); + expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); + }); }); diff --git a/src/Model.ts b/src/Model.ts index eb37cca2..60c1cec4 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -3,6 +3,7 @@ import Stream from './Stream'; import EventEmitter from './utilities/EventEmitter'; import { StandardCallback } from './types/callbacks'; import { Subject, Subscription } from 'rxjs'; +import _ from 'lodash'; export enum ModelState { /** @@ -31,16 +32,31 @@ export enum ModelState { DISPOSED = 'disposed', } +export type Event = { + stream: string; + name: string; + data?: any; +}; + +type Confirmation = { + confirmed: boolean; +}; + export type SyncFunc = () => Promise>; -export type UpdateFunc = (state: T, event: Types.Message) => Promise; +export type UpdateFunc = (state: T, event: Event) => Promise; export type MutationResult = { result: R; - expected: Partial; + expectedEvents: Event[]; }; + export type MutationFunc = (...args: T) => Promise>; +export type Mutation = { + mutate: MutationFunc; +}; + export type Streams = { [name: string]: Stream; }; @@ -67,18 +83,29 @@ export type Versioned = { data: T; }; +function eventsAreEqual(e1: Event, e2: Event): boolean { + return e1.stream === e2.stream && e1.name === e2.name && _.isEqual(e1.data, e2.data); +} + +type SubscriptionOptions = { + optimistic: boolean; +}; + class Model extends EventEmitter> { private currentState: ModelState = ModelState.INITIALIZED; - private currentData: Versioned; + private speculativeData: T; + private confirmedData: T; private sync: SyncFunc; private streams: Streams; - private updateFuncs: UpdateFuncs> = {}; + private updateFuncs: UpdateFuncs = {}; + + private mutations: Record = {}; - private mutations: Record = {}; + private unconfirmed: Event[] = []; - private subscriptions = new Subject(); + private subscriptions = new Subject<{ confirmed: boolean; data: T }>(); private subscriptionMap: Map, Subscription> = new Map(); private streamSubscriptionsMap: Map> = new Map(); @@ -96,8 +123,12 @@ class Model extends EventEmitter> { return this.currentState; } - public get data() { - return this.currentData; + public get speculative() { + return this.speculativeData; + } + + public get confirmed() { + return this.confirmedData; } public async pause() { @@ -121,7 +152,7 @@ class Model extends EventEmitter> { return this.streams[name]; } - public registerUpdate(stream: string, event: string, update: UpdateFunc>) { + public registerUpdate(stream: string, event: string, update: UpdateFunc) { if (!this.streams[stream]) { throw new Error(`stream with name '${stream}' not registered on model '${this.name}'`); } @@ -134,28 +165,39 @@ class Model extends EventEmitter> { this.updateFuncs[stream][event].push(update); } - public registerMutation(name: string, mutation: MutationFunc) { + public registerMutation(name: string, mutation: Mutation) { if (this.mutations[name]) { throw new Error(`mutation with name '${name}' already registered on model '${this.name}'`); } this.mutations[name] = mutation; } - public async mutate(name: string, ...args: TArgs): Promise> { - const mutation = this.mutations[name]; + public async mutate(name: string, ...args: TArgs): Promise { + const mutation: Mutation = this.mutations[name]; if (!mutation) { throw new Error(`mutation with name '${name}' not registered on model '${this.name}'`); } - const result = await mutation(...args); + const { result, expectedEvents } = await mutation.mutate(...args); - // TODO: process the expected result optimistically + for (const event of expectedEvents) { + await this.onStreamEvent(null, { ...event, confirmed: false }); + } return result; } - public subscribe(callback: StandardCallback) { + public subscribe(callback: StandardCallback, options: SubscriptionOptions = { optimistic: true }) { const subscription = this.subscriptions.subscribe({ - next: (message) => callback(null, message), + next: (value) => { + if (options.optimistic && !value.confirmed) { + callback(null, value.data); + return; + } + if (!options.optimistic && value.confirmed) { + callback(null, value.data); + return; + } + }, error: (err) => callback(err), complete: () => this.unsubscribe(callback), }); @@ -194,30 +236,76 @@ class Model extends EventEmitter> { } as ModelStateChange); } - private setData(data: Versioned) { - this.currentData = data; - this.subscriptions.next(data.data); + private setSpeculativeData(data: T) { + this.speculativeData = data; + this.subscriptions.next({ confirmed: false, data }); + } + + private setConfirmedData(data: T) { + this.confirmedData = data; + this.subscriptions.next({ confirmed: true, data }); } - private async onStreamEvent(stream: string, err: Error | null | undefined, event: Types.Message | undefined) { + private async applyUpdates(initialData: T, event: Event): Promise { + let data = initialData; + for (let eventName in this.updateFuncs[event.stream]) { + if (event.name === eventName) { + for (let updator of this.updateFuncs[event.stream][eventName]) { + data = await updator(data, event); + } + } + } + return data; + } + + private async onStreamEvent(err: Error | null | undefined, event?: Event & Confirmation) { if (err) { - this.onError(err); + await this.onError(err); return; } - if (!this.streams[stream]) { - this.onError(new Error(`stream with name '${stream}' not registered on model '${this.name}'`)); + if (!event) { + await this.onError('received empty event'); return; } - if (!this.updateFuncs[stream]) { + if (!this.streams[event.stream]) { + await this.onError(new Error(`stream with name '${event.stream}' not registered on model '${this.name}'`)); return; } - for (let eventName in this.updateFuncs[stream]) { - if (event?.name === eventName) { - for (let updator of this.updateFuncs[stream][eventName]) { - this.setData(await updator(this.currentData, event)); - } + if (!this.updateFuncs[event.stream]) { + return; + } + + // eagerly apply optimistic updates + if (!event.confirmed) { + this.unconfirmed.push(event); + this.setSpeculativeData(await this.applyUpdates(this.speculativeData, event)); + return; + } + + // if the incoming confirmed event confirms the next expected optimistic event for the stream, it is + // discarded without applying it to the speculative state because its effect has already been optimistically applied + let unexpectedEvent = true; + for (let i = 0; i < this.unconfirmed.length; i++) { + let e = this.unconfirmed[i]; + if (eventsAreEqual(e, event)) { + this.unconfirmed.splice(i, 1); + this.setConfirmedData(await this.applyUpdates(this.confirmedData, event)); + unexpectedEvent = false; + break; } } + + // if the incoming confirmed event doesn't match any optimistic event, + // we need to roll back to the last-confirmed state, apply the incoming event, + // and rebase the optimistic updates on top + if (unexpectedEvent) { + let nextData = await this.applyUpdates(this.confirmedData, event); + this.setConfirmedData(nextData); + for (const e of this.unconfirmed) { + nextData = await this.applyUpdates(nextData, e); + } + this.setSpeculativeData(nextData); + } } private async onError(err) { @@ -228,12 +316,19 @@ class Model extends EventEmitter> { this.setState(ModelState.PREPARING); for (let streamName in this.streams) { const stream = this.streams[streamName]; - const callback = (err: Error | null | undefined, event: Types.Message | undefined) => - this.onStreamEvent(streamName, err, event); + const callback: StandardCallback = (err, event) => { + if (err) { + this.onStreamEvent(err); + return; + } + this.onStreamEvent(null, { ...event!, stream: streamName, confirmed: true }); + }; stream.subscribe(callback); this.streamSubscriptionsMap.set(stream, callback); } - this.currentData = await this.sync(); + const { data } = await this.sync(); + this.setSpeculativeData(data); + this.setConfirmedData(data); this.setState(ModelState.READY); } } diff --git a/src/utilities/test/messages.ts b/src/utilities/test/messages.ts index cb9639bf..f7494dab 100644 --- a/src/utilities/test/messages.ts +++ b/src/utilities/test/messages.ts @@ -41,3 +41,7 @@ export function createMessage(i: number): Types.Message { data: `data_${i}`, }; } + +export function customMessage(id: string, name: string, data: string): Types.Message { + return { ...baseMessage, id, name, data }; +}