From a90688f44d614be49f728137013ff95bba28bd0a Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Wed, 31 May 2023 15:24:25 +0100 Subject: [PATCH 1/2] models: add Model and EventStream classes --- src/EventStream.test.ts | 37 +++++++++++++++++++++ src/EventStream.ts | 31 ++++++++++++++++++ src/Model.test.ts | 50 +++++++++++++++++++++++++++++ src/Model.ts | 34 ++++++++++++++++---- src/Models.test.ts | 50 ++++++++++++++++++++++++++--- src/Models.ts | 36 +++++++++++++++------ src/options/EventStreamOptions.d.ts | 5 +++ src/options/ModelOptions.d.ts | 6 +++- 8 files changed, 226 insertions(+), 23 deletions(-) create mode 100644 src/EventStream.test.ts create mode 100644 src/EventStream.ts create mode 100644 src/Model.test.ts create mode 100644 src/options/EventStreamOptions.d.ts diff --git a/src/EventStream.test.ts b/src/EventStream.test.ts new file mode 100644 index 00000000..4828a995 --- /dev/null +++ b/src/EventStream.test.ts @@ -0,0 +1,37 @@ +import { it, describe, expect, expectTypeOf, beforeEach, afterEach } from 'vitest'; +import { Realtime, Types } from 'ably/promises'; +import { WebSocket } from 'mock-socket'; + +import EventStream from './EventStream'; + +import Server from './utilities/test/mock-server.js'; +import defaultClientConfig from './utilities/test/default-client-config.js'; + +interface EventStreamTestContext { + client: Types.RealtimePromise; + server: Server; +} + +describe('EventStream', () => { + beforeEach((context) => { + (Realtime as any).Platform.Config.WebSocket = WebSocket; + context.server = new Server('wss://realtime.ably.io/'); + context.client = new Realtime(defaultClientConfig); + }); + + afterEach((context) => { + context.server.stop(); + }); + + it('connects successfully with the Ably Client', async ({ client, server }) => { + server.start(); + const connectSuccess = await client.connection.whenState('connected'); + expect(connectSuccess.current).toBe('connected'); + }); + + it('expects the injected client to be of the type RealtimePromise', ({ client }) => { + const eventStream = new EventStream('test', client, { channel: 'foobar' }); + expect(eventStream.name).toEqual('test'); + expectTypeOf(eventStream.client).toMatchTypeOf(); + }); +}); diff --git a/src/EventStream.ts b/src/EventStream.ts new file mode 100644 index 00000000..1e864d8e --- /dev/null +++ b/src/EventStream.ts @@ -0,0 +1,31 @@ +import { Types } from 'ably'; +import EventStreamOptions from './options/EventStreamOptions'; +import EventEmitter from './utilities/EventEmitter'; + +const EVENT_STREAM_OPTIONS_DEFAULTS = {}; + +enum EventStreamState { + /** + * The event stream has been initialized but no attach has yet been attempted. + */ + INITIALIZED = 'initialized', + /** + * This state is entered if the event stream encounters a failure condition that it cannot recover from. + */ + FAILED = 'failed', +} + +class EventStream extends EventEmitter { + private options: EventStreamOptions; + private connectionId?: string; + private state: EventStreamState = EventStreamState.INITIALIZED; + private data: T; + + constructor(readonly name: string, readonly client: Types.RealtimePromise, options: EventStreamOptions) { + super(); + this.options = { ...EVENT_STREAM_OPTIONS_DEFAULTS, ...options }; + this.connectionId = this.client.connection.id; + } +} + +export default EventStream; diff --git a/src/Model.test.ts b/src/Model.test.ts new file mode 100644 index 00000000..7e43fdf1 --- /dev/null +++ b/src/Model.test.ts @@ -0,0 +1,50 @@ +import { it, describe, expect, expectTypeOf, beforeEach, afterEach } from 'vitest'; +import { Realtime, Types } from 'ably/promises'; +import { WebSocket } from 'mock-socket'; + +import Model from './Model'; +import EventStream from './EventStream'; + +import Server from './utilities/test/mock-server.js'; +import defaultClientConfig from './utilities/test/default-client-config.js'; + +interface ModelTestContext { + client: Types.RealtimePromise; + server: Server; +} + +describe('Model', () => { + beforeEach((context) => { + (Realtime as any).Platform.Config.WebSocket = WebSocket; + context.server = new Server('wss://realtime.ably.io/'); + context.client = new Realtime(defaultClientConfig); + }); + + afterEach((context) => { + context.server.stop(); + }); + + it('connects successfully with the Ably Client', async ({ client, server }) => { + server.start(); + const connectSuccess = await client.connection.whenState('connected'); + expect(connectSuccess.current).toBe('connected'); + }); + + it('expects the injected client to be of the type RealtimePromise', ({ client }) => { + const model = new Model('test', client); + expect(model.name).toEqual('test'); + expectTypeOf(model.client).toMatchTypeOf(); + }); + + it('expects model to be instantiated with the provided event streams', ({ client }) => { + const model = new Model('test', client, { + streams: [new EventStream('s1', client, { channel: 's1' }), new EventStream('s2', client, { channel: 's2' })], + }); + expect(model.name).toEqual('test'); + expect(model.stream('s1')).toBeTruthy(); + expect(model.stream('s1').name).toEqual('s1'); + expect(model.stream('s2')).toBeTruthy(); + expect(model.stream('s2').name).toEqual('s2'); + expect(() => model.stream('s3')).toThrowError("stream with name 's3' not registered on model 'test'"); + }); +}); diff --git a/src/Model.ts b/src/Model.ts index 5d46212c..8ff00676 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -1,18 +1,38 @@ import { Types } from 'ably'; - +import EventStream from './EventStream'; import ModelOptions from './options/ModelOptions'; import EventEmitter from './utilities/EventEmitter'; -const MODEL_OPTIONS_DEFAULTS = {}; +enum ModelState { + /** + * The model has been initialized but has not yet been synchronised. + */ + INITIALIZED = 'initialized', + /** + * An indefinite failure condition. This state is entered if a channel error has been received from the Ably service, such as an attempt to attach without the necessary access rights. + */ + FAILED = 'failed', +} -class Model extends EventEmitter { - private options: ModelOptions; - private connectionId?: string; +class Model extends EventEmitter { + private state: ModelState = ModelState.INITIALIZED; + private streams: Record> = {}; + private data: T; constructor(readonly name: string, readonly client: Types.RealtimePromise, options?: ModelOptions) { super(); - this.options = { ...MODEL_OPTIONS_DEFAULTS, ...options }; - this.connectionId = this.client.connection.id; + if (options) { + for (let stream of options.streams) { + this.streams[stream.name] = stream; + } + } + } + + stream(name: string): EventStream { + if (!this.streams[name]) { + throw new Error(`stream with name '${name}' not registered on model '${this.name}'`); + } + return this.streams[name]; } } diff --git a/src/Models.test.ts b/src/Models.test.ts index 44109ab2..406f87af 100644 --- a/src/Models.test.ts +++ b/src/Models.test.ts @@ -23,17 +23,17 @@ describe('Models', () => { context.server.stop(); }); - it('expects the injected client to be of the type RealtimePromise', ({ client }) => { - const models = new Models(client); - expectTypeOf(models.ably).toMatchTypeOf(); - }); - it('connects successfully with the Ably Client', async ({ client, server }) => { server.start(); const connectSuccess = await client.connection.whenState('connected'); expect(connectSuccess.current).toBe('connected'); }); + it('expects the injected client to be of the type RealtimePromise', ({ client }) => { + const models = new Models(client); + expectTypeOf(models.ably).toMatchTypeOf(); + }); + it('creates a client with default options when a key is passed in', () => { const models = new Models(defaultClientConfig.key); expect(models.ably['options'].key).toEqual(defaultClientConfig.key); @@ -65,4 +65,44 @@ describe('Models', () => { 'model-default-client', ]); }); + + it('creates a model that inherits the root class ably client', () => { + const models = new Models({ ...defaultClientConfig }); + const model = models.Model('test'); + expect(model.name).toEqual('test'); + expect(model.client['options']).toContain(defaultClientConfig); + }); + + it('getting a model with the same name returns the same instance', () => { + const models = new Models({ ...defaultClientConfig }); + const model1 = models.Model('test'); + expect(model1.name).toEqual('test'); + const model2 = models.Model('test'); + expect(model2.name).toEqual('test'); + expect(model1).toEqual(model2); + }); + + it('creates an event stream that inherits the root class ably client', () => { + const models = new Models({ ...defaultClientConfig }); + const eventStream = models.EventStream('test', { channel: 'foobar' }); + expect(eventStream.name).toEqual('test'); + expect(eventStream.client['options']).toContain(defaultClientConfig); + }); + + it('getting an event stream without options throws', () => { + const models = new Models({ ...defaultClientConfig }); + expect(() => models.EventStream('test')).toThrow('EventStream cannot be instantiated without options'); + }); + + it('getting an event stream with the same name returns the same instance', () => { + const models = new Models({ ...defaultClientConfig }); + const eventStream1 = models.EventStream('test', { channel: 'foobar' }); // first call requires options to instantiate + expect(eventStream1.name).toEqual('test'); + const eventStream2 = models.EventStream('test'); // subsequent calls do not require options + expect(eventStream2.name).toEqual('test'); + expect(eventStream1).toEqual(eventStream2); + const eventStream3 = models.EventStream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored + expect(eventStream3.name).toEqual('test'); + expect(eventStream1).toEqual(eventStream3); + }); }); diff --git a/src/Models.ts b/src/Models.ts index e73d85b0..ba90dbe6 100644 --- a/src/Models.ts +++ b/src/Models.ts @@ -1,16 +1,19 @@ import { Types, Realtime } from 'ably'; import ModelOptions from './options/ModelOptions'; import Model from './Model'; +import EventStreamOptions from './options/EventStreamOptions'; +import EventStream from './EventStream'; class Models { - private models: Record; - private channel: Types.RealtimeChannelPromise; + private models: Record>; + private eventStreams: Record>; ably: Types.RealtimePromise; readonly version = '0.0.1'; constructor(optionsOrAbly: Types.RealtimePromise | Types.ClientOptions | string) { this.models = {}; + this.eventStreams = {}; if (optionsOrAbly['options']) { this.ably = optionsOrAbly as Types.RealtimePromise; this.addAgent(this.ably['options'], false); @@ -32,22 +35,35 @@ class Models { } } - async get(name: string, options?: ModelOptions): Promise { + Model = (name: string, options?: ModelOptions) => { if (typeof name !== 'string' || name.length === 0) { - throw new Error('Models must have a non-empty name'); + throw new Error('Model must have a non-empty name'); } if (this.models[name]) return this.models[name]; - if (this.ably.connection.state !== 'connected') { - await this.ably.connection.once('connected'); - } - - const model = new Model(name, this.ably, options); + const model = new Model(name, this.ably, options); this.models[name] = model; return model; - } + }; + + EventStream = (name: string, options?: EventStreamOptions) => { + if (typeof name !== 'string' || name.length === 0) { + throw new Error('EventStream must have a non-empty name'); + } + + if (this.eventStreams[name]) return this.eventStreams[name]; + + if (!options) { + throw new Error('EventStream cannot be instantiated without options'); + } + + const eventStream = new EventStream(name, this.ably, options); + this.eventStreams[name] = eventStream; + + return eventStream; + }; } export default Models; diff --git a/src/options/EventStreamOptions.d.ts b/src/options/EventStreamOptions.d.ts new file mode 100644 index 00000000..6e4596b2 --- /dev/null +++ b/src/options/EventStreamOptions.d.ts @@ -0,0 +1,5 @@ +type EventStreamOptions = { + channel: string; +}; + +export default EventStreamOptions; diff --git a/src/options/ModelOptions.d.ts b/src/options/ModelOptions.d.ts index 7a18628f..15b851f6 100644 --- a/src/options/ModelOptions.d.ts +++ b/src/options/ModelOptions.d.ts @@ -1,3 +1,7 @@ -type ModelOptions = {}; +import EventStream from '../EventStream'; + +type ModelOptions = { + streams: Array>; +}; export default ModelOptions; From 4b97a65c1f1082d5634e18d7619220ff8d568918 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Mon, 5 Jun 2023 11:03:50 +0100 Subject: [PATCH 2/2] stream: add simple stream implementation This initial Stream implementation adds a very simple wrapper around an Ably channel. Messages received on the channel are surfaced to the caller via a subscription callback. The Stream is an EventEmitter and exposes its state via events. Additionally the stream can be pauses, resumed and disposed. This will allow the Models SDK to manipulate the stream/channel according to the model's lifecycle. While not yet particularly useful, the idea is that the Stream will handle: - message de-duplication - message re-ordering - detect discontinuities (due to connection state recovery as well as due to failed connector publishes) --- __mocks__/ably/promises/index.ts | 78 ++---- package-lock.json | 69 +++-- package.json | 3 +- src/EventStream.test.ts | 37 --- src/EventStream.ts | 31 --- src/Model.test.ts | 4 +- src/Model.ts | 6 +- src/Models.test.ts | 28 +- src/Models.ts | 22 +- src/Stream.test.ts | 248 ++++++++++++++++++ src/Stream.ts | 143 ++++++++++ src/options/EventStreamOptions.d.ts | 5 - src/options/ModelOptions.d.ts | 4 +- src/options/StreamOptions.d.ts | 5 + src/types/callbacks.d.ts | 2 + src/utilities/test/messages.ts | 34 +++ .../test/mock-server-action-responses.ts | 53 ++-- src/utilities/test/protocol.ts | 43 +++ 18 files changed, 618 insertions(+), 197 deletions(-) delete mode 100644 src/EventStream.test.ts delete mode 100644 src/EventStream.ts create mode 100644 src/Stream.test.ts create mode 100644 src/Stream.ts delete mode 100644 src/options/EventStreamOptions.d.ts create mode 100644 src/options/StreamOptions.d.ts create mode 100644 src/types/callbacks.d.ts create mode 100644 src/utilities/test/messages.ts create mode 100644 src/utilities/test/protocol.ts diff --git a/__mocks__/ably/promises/index.ts b/__mocks__/ably/promises/index.ts index cfaa5706..a14b12e9 100644 --- a/__mocks__/ably/promises/index.ts +++ b/__mocks__/ably/promises/index.ts @@ -1,69 +1,33 @@ import { Types } from 'ably/promises'; -const MOCK_CLIENT_ID = 'MOCK_CLIENT_ID'; +const mockPromiseErrorNotImplemented = (name: string): Promise => new Promise((_, reject) => reject(new Error(`mock '${name}' not implemented`))); +const mockNotImplemented = (name: string): T => { throw new Error(`mock ${name} not implemented`) }; -const mockPromisify = (expectedReturnValue): Promise => new Promise((resolve) => resolve(expectedReturnValue)); -const methodReturningVoidPromise = () => mockPromisify((() => {})()); +type MockChannel = Partial; -const mockPresence = { - get: () => mockPromisify([]), - update: () => mockPromisify(undefined), - enter: methodReturningVoidPromise, - leave: methodReturningVoidPromise, - subscriptions: { - once: async (_, fn) => { - return await fn(); - }, - }, - subscribe: () => {}, -}; +const mockChannel: MockChannel = { + on: () => mockNotImplemented('on'), + attach: () => mockPromiseErrorNotImplemented('attach'), + detach: () => mockPromiseErrorNotImplemented('detach'), + subscribe: () => mockPromiseErrorNotImplemented('subscribe'), +} -const mockHistory = { - items: [], - first: () => mockPromisify(mockHistory), - next: () => mockPromisify(mockHistory), - current: () => mockPromisify(mockHistory), - hasNext: () => false, - isLast: () => true, -}; +type MockChannels = Partial>; -const mockEmitter = { - any: [], - events: {}, - anyOnce: [], - eventsOnce: {}, -}; +const mockChannels: MockChannels = { + get: () => mockChannel, + release: () => mockNotImplemented('release'), +} -const mockChannel = { - presence: mockPresence, - history: () => mockHistory, - subscribe: () => {}, - publish: () => {}, - subscriptions: mockEmitter, -}; +type MockConnection = Partial; -class MockRealtime { - public channels: { - get: () => typeof mockChannel; - }; - public auth: { - clientId: string; - }; - public connection: { - id?: string; - }; +const mockConnection: MockConnection = { + whenState: () => mockPromiseErrorNotImplemented('whenState') +} - constructor() { - this.channels = { - get: () => mockChannel, - }; - this.auth = { - clientId: MOCK_CLIENT_ID, - }; - this.connection = { - id: '1', - }; - } +class MockRealtime { + public channels = mockChannels; + public connection = mockConnection; } export { MockRealtime as Realtime }; diff --git a/package-lock.json b/package-lock.json index 5499071d..aa795599 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,8 @@ "version": "0.0.1", "license": "ISC", "dependencies": { - "ably": "^1.2.39" + "ably": "^1.2.39", + "rxjs": "^7.8.1" }, "devDependencies": { "@typescript-eslint/eslint-plugin": "^5.51.0", @@ -2112,6 +2113,18 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/globals/node_modules/type-fest": { + "version": "0.20.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", + "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", + "dev": true, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/globby": { "version": "11.1.0", "resolved": "https://registry.npmjs.org/globby/-/globby-11.1.0.tgz", @@ -3093,6 +3106,19 @@ "queue-microtask": "^1.2.2" } }, + "node_modules/rxjs": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", + "integrity": "sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==", + "dependencies": { + "tslib": "^2.1.0" + } + }, + "node_modules/rxjs/node_modules/tslib": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.5.2.tgz", + "integrity": "sha512-5svOrSA2w3iGFDs1HibEVBGbDrAY82bFQ3HZ3ixB+88nsbsWQoKqDRb5UBYAUPEzbBn6dAp5gRNXglySbx1MlA==" + }, "node_modules/safe-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/safe-regex/-/safe-regex-2.1.1.tgz", @@ -3438,18 +3464,6 @@ "node": ">=4" } }, - "node_modules/type-fest": { - "version": "0.20.2", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", - "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", - "dev": true, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/typescript": { "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", @@ -5204,6 +5218,14 @@ "dev": true, "requires": { "type-fest": "^0.20.2" + }, + "dependencies": { + "type-fest": { + "version": "0.20.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", + "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", + "dev": true + } } }, "globby": { @@ -5889,6 +5911,21 @@ "queue-microtask": "^1.2.2" } }, + "rxjs": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", + "integrity": "sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==", + "requires": { + "tslib": "^2.1.0" + }, + "dependencies": { + "tslib": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.5.2.tgz", + "integrity": "sha512-5svOrSA2w3iGFDs1HibEVBGbDrAY82bFQ3HZ3ixB+88nsbsWQoKqDRb5UBYAUPEzbBn6dAp5gRNXglySbx1MlA==" + } + } + }, "safe-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/safe-regex/-/safe-regex-2.1.1.tgz", @@ -6149,12 +6186,6 @@ "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", "dev": true }, - "type-fest": { - "version": "0.20.2", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", - "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", - "dev": true - }, "typescript": { "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", diff --git a/package.json b/package.json index 3eab4c33..e0046f9e 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "vitest": "^0.29.8" }, "dependencies": { - "ably": "^1.2.39" + "ably": "^1.2.39", + "rxjs": "^7.8.1" } } diff --git a/src/EventStream.test.ts b/src/EventStream.test.ts deleted file mode 100644 index 4828a995..00000000 --- a/src/EventStream.test.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { it, describe, expect, expectTypeOf, beforeEach, afterEach } from 'vitest'; -import { Realtime, Types } from 'ably/promises'; -import { WebSocket } from 'mock-socket'; - -import EventStream from './EventStream'; - -import Server from './utilities/test/mock-server.js'; -import defaultClientConfig from './utilities/test/default-client-config.js'; - -interface EventStreamTestContext { - client: Types.RealtimePromise; - server: Server; -} - -describe('EventStream', () => { - beforeEach((context) => { - (Realtime as any).Platform.Config.WebSocket = WebSocket; - context.server = new Server('wss://realtime.ably.io/'); - context.client = new Realtime(defaultClientConfig); - }); - - afterEach((context) => { - context.server.stop(); - }); - - it('connects successfully with the Ably Client', async ({ client, server }) => { - server.start(); - const connectSuccess = await client.connection.whenState('connected'); - expect(connectSuccess.current).toBe('connected'); - }); - - it('expects the injected client to be of the type RealtimePromise', ({ client }) => { - const eventStream = new EventStream('test', client, { channel: 'foobar' }); - expect(eventStream.name).toEqual('test'); - expectTypeOf(eventStream.client).toMatchTypeOf(); - }); -}); diff --git a/src/EventStream.ts b/src/EventStream.ts deleted file mode 100644 index 1e864d8e..00000000 --- a/src/EventStream.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { Types } from 'ably'; -import EventStreamOptions from './options/EventStreamOptions'; -import EventEmitter from './utilities/EventEmitter'; - -const EVENT_STREAM_OPTIONS_DEFAULTS = {}; - -enum EventStreamState { - /** - * The event stream has been initialized but no attach has yet been attempted. - */ - INITIALIZED = 'initialized', - /** - * This state is entered if the event stream encounters a failure condition that it cannot recover from. - */ - FAILED = 'failed', -} - -class EventStream extends EventEmitter { - private options: EventStreamOptions; - private connectionId?: string; - private state: EventStreamState = EventStreamState.INITIALIZED; - private data: T; - - constructor(readonly name: string, readonly client: Types.RealtimePromise, options: EventStreamOptions) { - super(); - this.options = { ...EVENT_STREAM_OPTIONS_DEFAULTS, ...options }; - this.connectionId = this.client.connection.id; - } -} - -export default EventStream; diff --git a/src/Model.test.ts b/src/Model.test.ts index 7e43fdf1..0c9e67ca 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -3,7 +3,7 @@ import { Realtime, Types } from 'ably/promises'; import { WebSocket } from 'mock-socket'; import Model from './Model'; -import EventStream from './EventStream'; +import Stream from './Stream'; import Server from './utilities/test/mock-server.js'; import defaultClientConfig from './utilities/test/default-client-config.js'; @@ -38,7 +38,7 @@ describe('Model', () => { it('expects model to be instantiated with the provided event streams', ({ client }) => { const model = new Model('test', client, { - streams: [new EventStream('s1', client, { channel: 's1' }), new EventStream('s2', client, { channel: 's2' })], + streams: [new Stream('s1', client, { channel: 's1' }), new Stream('s2', client, { channel: 's2' })], }); expect(model.name).toEqual('test'); expect(model.stream('s1')).toBeTruthy(); diff --git a/src/Model.ts b/src/Model.ts index 8ff00676..0af866a8 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -1,5 +1,5 @@ import { Types } from 'ably'; -import EventStream from './EventStream'; +import Stream from './Stream'; import ModelOptions from './options/ModelOptions'; import EventEmitter from './utilities/EventEmitter'; @@ -16,7 +16,7 @@ enum ModelState { class Model extends EventEmitter { private state: ModelState = ModelState.INITIALIZED; - private streams: Record> = {}; + private streams: Record> = {}; private data: T; constructor(readonly name: string, readonly client: Types.RealtimePromise, options?: ModelOptions) { @@ -28,7 +28,7 @@ class Model extends EventEmitter { } } - stream(name: string): EventStream { + stream(name: string): Stream { if (!this.streams[name]) { throw new Error(`stream with name '${name}' not registered on model '${this.name}'`); } diff --git a/src/Models.test.ts b/src/Models.test.ts index 406f87af..e91af243 100644 --- a/src/Models.test.ts +++ b/src/Models.test.ts @@ -82,27 +82,27 @@ describe('Models', () => { expect(model1).toEqual(model2); }); - it('creates an event stream that inherits the root class ably client', () => { + it('creates a stream that inherits the root class ably client', () => { const models = new Models({ ...defaultClientConfig }); - const eventStream = models.EventStream('test', { channel: 'foobar' }); - expect(eventStream.name).toEqual('test'); - expect(eventStream.client['options']).toContain(defaultClientConfig); + const stream = models.Stream('test', { channel: 'foobar' }); + expect(stream.name).toEqual('test'); + expect(stream.ably['options']).toContain(defaultClientConfig); }); - it('getting an event stream without options throws', () => { + it('getting a stream without options throws', () => { const models = new Models({ ...defaultClientConfig }); - expect(() => models.EventStream('test')).toThrow('EventStream cannot be instantiated without options'); + expect(() => models.Stream('test')).toThrow('Stream cannot be instantiated without options'); }); it('getting an event stream with the same name returns the same instance', () => { const models = new Models({ ...defaultClientConfig }); - const eventStream1 = models.EventStream('test', { channel: 'foobar' }); // first call requires options to instantiate - expect(eventStream1.name).toEqual('test'); - const eventStream2 = models.EventStream('test'); // subsequent calls do not require options - expect(eventStream2.name).toEqual('test'); - expect(eventStream1).toEqual(eventStream2); - const eventStream3 = models.EventStream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored - expect(eventStream3.name).toEqual('test'); - expect(eventStream1).toEqual(eventStream3); + const stream1 = models.Stream('test', { channel: 'foobar' }); // first call requires options to instantiate + expect(stream1.name).toEqual('test'); + const stream2 = models.Stream('test'); // subsequent calls do not require options + expect(stream2.name).toEqual('test'); + expect(stream1).toEqual(stream2); + const stream3 = models.Stream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored + expect(stream3.name).toEqual('test'); + expect(stream1).toEqual(stream3); }); }); diff --git a/src/Models.ts b/src/Models.ts index ba90dbe6..864923c8 100644 --- a/src/Models.ts +++ b/src/Models.ts @@ -1,19 +1,19 @@ import { Types, Realtime } from 'ably'; import ModelOptions from './options/ModelOptions'; import Model from './Model'; -import EventStreamOptions from './options/EventStreamOptions'; -import EventStream from './EventStream'; +import StreamOptions from './options/StreamOptions'; +import Stream from './Stream'; class Models { private models: Record>; - private eventStreams: Record>; + private streams: Record>; ably: Types.RealtimePromise; readonly version = '0.0.1'; constructor(optionsOrAbly: Types.RealtimePromise | Types.ClientOptions | string) { this.models = {}; - this.eventStreams = {}; + this.streams = {}; if (optionsOrAbly['options']) { this.ably = optionsOrAbly as Types.RealtimePromise; this.addAgent(this.ably['options'], false); @@ -48,21 +48,21 @@ class Models { return model; }; - EventStream = (name: string, options?: EventStreamOptions) => { + Stream = (name: string, options?: StreamOptions) => { if (typeof name !== 'string' || name.length === 0) { - throw new Error('EventStream must have a non-empty name'); + throw new Error('Stream must have a non-empty name'); } - if (this.eventStreams[name]) return this.eventStreams[name]; + if (this.streams[name]) return this.streams[name]; if (!options) { - throw new Error('EventStream cannot be instantiated without options'); + throw new Error('Stream cannot be instantiated without options'); } - const eventStream = new EventStream(name, this.ably, options); - this.eventStreams[name] = eventStream; + const stream = new Stream(name, this.ably, options); + this.streams[name] = stream; - return eventStream; + return stream; }; } diff --git a/src/Stream.test.ts b/src/Stream.test.ts new file mode 100644 index 00000000..ccab8899 --- /dev/null +++ b/src/Stream.test.ts @@ -0,0 +1,248 @@ +import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; +import { Realtime, Types } from 'ably/promises'; +import { Subject } from 'rxjs'; + +import { baseMessage } from './utilities/test/messages'; +import Stream, { StreamState } from './Stream'; + +vi.mock('ably/promises'); + +interface StreamTestContext { + client: Types.RealtimePromise; + channel: Types.RealtimeChannelPromise; +} + +interface TestEvent {} + +const streamStatePromise = (stream: Stream, state: StreamState) => + new Promise((resolve) => stream.whenState(state, stream.state, resolve)); + +function createMessage(i: number): Types.Message { + return { + ...baseMessage, + id: `id_${i}`, + name: `name_${i}`, + data: `data_${i}`, + }; +} + +describe('Stream', () => { + beforeEach((context) => { + const client = new Realtime({}); + client.connection.whenState = vi.fn<[Types.ConnectionState], Promise>(async () => { + return { + current: 'connected', + previous: 'initialized', + }; + }); + + const channel = client.channels.get('foobar'); + channel.on = vi.fn(); // all tests call `channel.on('fail')` + + context.client = client; + context.channel = channel; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('enters ready state when successfully attached to the channel', async ({ client, channel }) => { + // the promise returned by the subscribe method resolves when we have successfully attached to the channel + let attach; + const attachment = new Promise((resolve) => (attach = resolve)); + channel.subscribe = vi.fn().mockImplementation(async () => { + await attachment; + }); + + const stream = new Stream('test', client, { channel: 'foobar' }); + + await streamStatePromise(stream, StreamState.PREPARING); + attach(); + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it('pauses the stream', async ({ client, channel }) => { + channel.subscribe = vi.fn(); + channel.detach = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.pause(); + await streamStatePromise(stream, StreamState.PAUSED); + expect(channel.detach).toHaveBeenCalledOnce(); + }); + + it('resumes the stream', async ({ client, channel }) => { + channel.subscribe = vi.fn(); + channel.detach = vi.fn(); + channel.attach = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.pause(); + await streamStatePromise(stream, StreamState.PAUSED); + expect(channel.detach).toHaveBeenCalledOnce(); + + stream.resume(); + await streamStatePromise(stream, StreamState.READY); + expect(channel.attach).toHaveBeenCalledOnce(); + }); + + it('disposes of the stream', async ({ client, channel }) => { + client.channels.release = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.dispose(); + await streamStatePromise(stream, StreamState.DISPOSED); + expect(client.channels.release).toHaveBeenCalledOnce(); + }); + + it('disposes of the stream on channel failed', async ({ client, channel }) => { + let fail; + channel.on = vi.fn(async (name: string, callback) => { + if (name === 'failed') { + fail = callback; + } + }); + + client.channels.release = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + fail({ reason: 'test' }); + await streamStatePromise(stream, StreamState.DISPOSED); + expect(client.channels.release).toHaveBeenCalledOnce(); + }); + + it('subscribes to messages', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it('subscribes with multiple listeners', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(10); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it('unsubscribes to messages', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(5); + for (let i = 0; i < 5; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it('unsubscribes one of two listeners', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy1); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(5); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + if (i < 5) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + // TODO discontinuity + // TODO reauth https://ably.com/docs/realtime/channels?lang=nodejs#fatal-errors +}); diff --git a/src/Stream.ts b/src/Stream.ts new file mode 100644 index 00000000..c46af330 --- /dev/null +++ b/src/Stream.ts @@ -0,0 +1,143 @@ +import { Types } from 'ably'; +import StreamOptions from './options/StreamOptions'; +import EventEmitter, { EventListener } from './utilities/EventEmitter'; +import { StandardCallback } from './types/callbacks'; + +const STREAM_OPTIONS_DEFAULTS = {}; + +export enum StreamState { + /** + * The stream has been initialized but no attach has yet been attempted. + */ + INITIALIZED = 'initialized', + /** + * The stream is attempting to establish a realtime connection and attach to the channel. + * The preparing state is entered as soon as the library has completed initialization, + * and is reentered each time connection is re-attempted following detachment or disconnection. + */ + PREPARING = 'preparing', + /** + * The stream has a realtime connection, is attached to the channel and is delivering messages. + */ + READY = 'ready', + /** + * The user has paused the stream. + */ + PAUSED = 'paused', + /** + * The stream has been disposed, either by the user disposing it or an unrecoverable error, + * and its resources are available for garbage collection. + */ + DISPOSED = 'disposed', +} + +type StreamStateChange = { + current: StreamState; + previous: StreamState; + reason?: Types.ErrorInfo | string; +}; + +type SubscriptionMessageEvent = { + message: Types.Message; + error?: never; +}; + +type SubscriptionErrorEvent = { + message?: never; + error: Types.ErrorInfo; +}; + +type SubscriptionEvent = SubscriptionMessageEvent | SubscriptionErrorEvent; + +type ListenerPair = { + message: EventListener; + error: EventListener; +}; + +class Stream extends EventEmitter> { + private options: StreamOptions; + private currentState: StreamState = StreamState.INITIALIZED; + private ablyChannel: Types.RealtimeChannelPromise; + private subscriptions = new EventEmitter(); + private subscriptionMap: Map, ListenerPair> = new Map(); + + constructor(readonly name: string, readonly ably: Types.RealtimePromise, options: StreamOptions) { + super(); + this.options = { ...STREAM_OPTIONS_DEFAULTS, ...options }; + this.ablyChannel = this.ably.channels.get(this.options.channel); + this.ablyChannel.on('failed', (change) => this.dispose(change.reason)); + this.init(); + } + + get state() { + return this.currentState; + } + + get channel() { + return this.options.channel; + } + + setState(state: StreamState, reason?: Types.ErrorInfo | string) { + const previous = this.currentState; + this.currentState = state; + this.emit(state, { + current: this.currentState, + previous, + reason, + } as StreamStateChange); + } + + async init() { + this.setState(StreamState.PREPARING); + await this.ably.connection.whenState('connected'); + await this.ablyChannel.subscribe(this.onMessage.bind(this)); + this.setState(StreamState.READY); + } + + async pause() { + this.setState(StreamState.PAUSED); + await this.ablyChannel.detach(); + } + + async resume() { + await this.ably.connection.whenState('connected'); + await this.ablyChannel.attach(); + this.setState(StreamState.READY); + } + + subscribe(callback: StandardCallback) { + if (this.currentState !== StreamState.READY) { + callback(new Error(`stream is not in ready state (state = ${this.currentState})`)); + return; + } + const listenerPair: ListenerPair = { + message: (message) => callback(null, message as T), + error: callback, + }; + this.subscriptions.on('message', listenerPair.message); + this.subscriptions.on('error', listenerPair.error); + this.subscriptionMap.set(callback, listenerPair); + } + + unsubscribe(callback: StandardCallback) { + const listeners = this.subscriptionMap.get(callback); + if (listeners) { + this.subscriptions.off('message', listeners.message); + this.subscriptions.off('error', listeners.error); + this.subscriptionMap.delete(callback); + } + } + + dispose(reason?: Types.ErrorInfo | string) { + this.setState(StreamState.DISPOSED, reason); + this.subscriptions.off('message'); + this.subscriptions.off('error'); + this.ably.channels.release(this.ablyChannel.name); + } + + onMessage(message: Types.Message) { + this.subscriptions.emit('message', message); + } +} + +export default Stream; diff --git a/src/options/EventStreamOptions.d.ts b/src/options/EventStreamOptions.d.ts deleted file mode 100644 index 6e4596b2..00000000 --- a/src/options/EventStreamOptions.d.ts +++ /dev/null @@ -1,5 +0,0 @@ -type EventStreamOptions = { - channel: string; -}; - -export default EventStreamOptions; diff --git a/src/options/ModelOptions.d.ts b/src/options/ModelOptions.d.ts index 15b851f6..6cf49315 100644 --- a/src/options/ModelOptions.d.ts +++ b/src/options/ModelOptions.d.ts @@ -1,7 +1,7 @@ -import EventStream from '../EventStream'; +import Stream from '../Stream'; type ModelOptions = { - streams: Array>; + streams: Array>; }; export default ModelOptions; diff --git a/src/options/StreamOptions.d.ts b/src/options/StreamOptions.d.ts new file mode 100644 index 00000000..35a1b9b0 --- /dev/null +++ b/src/options/StreamOptions.d.ts @@ -0,0 +1,5 @@ +type StreamOptions = { + channel: string; +}; + +export default StreamOptions; diff --git a/src/types/callbacks.d.ts b/src/types/callbacks.d.ts new file mode 100644 index 00000000..d62711ff --- /dev/null +++ b/src/types/callbacks.d.ts @@ -0,0 +1,2 @@ +export type StandardCallback = (err?: Error | null, result?: T) => void; +export type ErrCallback = (err?: Error | null) => void; diff --git a/src/utilities/test/messages.ts b/src/utilities/test/messages.ts new file mode 100644 index 00000000..b4424e1d --- /dev/null +++ b/src/utilities/test/messages.ts @@ -0,0 +1,34 @@ +import { Types } from 'ably/promises'; +import { Flags } from './protocol'; + +export const baseProtocolMessage = { + flags: Flags.PRESENCE | Flags.PUBLISH | Flags.SUBSCRIBE | Flags.PRESENCE_SUBSCRIBE, + id: 'PROTOCOL_MESSAGE_ID', + timestamp: 1, + count: 1, + connectionId: 'CONNECTION_ID', + channel: 'foobar', + channelSerial: 'CHANNEL_SERIAL', + msgSerial: 1, + connectionDetails: { + clientId: '', + connectionKey: 'randomKey', + maxMessageSize: 131000, + maxInboundRate: 1000, + maxOutboundRate: 1000, + maxFrameSize: 262144, + connectionStateTtl: 120000, + maxIdleInterval: 15000, + }, +}; + +export const baseMessage: Types.Message = { + id: '1', + data: null, + name: 'foo', + clientId: 'RND-CLIENTID', + connectionId: 'CONNECTION_ID', + encoding: 'utf-8', + extras: {}, + timestamp: 1, +}; diff --git a/src/utilities/test/mock-server-action-responses.ts b/src/utilities/test/mock-server-action-responses.ts index 704f06b3..bd92b035 100644 --- a/src/utilities/test/mock-server-action-responses.ts +++ b/src/utilities/test/mock-server-action-responses.ts @@ -1,21 +1,44 @@ +import { Actions } from './protocol'; +import { baseProtocolMessage } from './messages'; + const authAction = (override) => { return { - action: 4, - connectionId: 'CONNDESC', - connectionKey: 'CONNECTIONKEY', - connectionSerial: -1, - connectionDetails: { - clientId: 'RND-CLIENTID', - connectionKey: 'randomKey', - maxMessageSize: 131000, - maxInboundRate: 1000, - maxOutboundRate: 1000, - maxFrameSize: 262144, - connectionStateTtl: 120000, - maxIdleInterval: 15000, - }, + ...baseProtocolMessage, + action: Actions.CONNECTED, + ...override, + }; +}; + +const attachedAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.ATTACHED, + ...override, + }; +}; + +const detachedAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.DETACHED, + ...override, + }; +}; + +const ackAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.ACK, + ...override, + }; +}; + +const messageAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.MESSAGE, ...override, }; }; -export { authAction }; +export { authAction, attachedAction, detachedAction, ackAction, messageAction }; diff --git a/src/utilities/test/protocol.ts b/src/utilities/test/protocol.ts new file mode 100644 index 00000000..9b387d6f --- /dev/null +++ b/src/utilities/test/protocol.ts @@ -0,0 +1,43 @@ +/** + * AblyJS ProtocolMessage actions + * {@link https://github.com/ably/ably-js/blob/main/src/common/lib/types/protocolmessage.ts#L7} + */ +export enum Actions { + HEARTBEAT = 0, + ACK, + NACK, + CONNECT, + CONNECTED, + DISCONNECT, + DISCONNECTED, + CLOSE, + CLOSED, + ERROR, + ATTACH, + ATTACHED, + DETACH, + DETACHED, + PRESENCE, + MESSAGE, + SYNC, + AUTH, + ACTIVATE, +} + +/** + * AblyJS ProtocolMessage flags + * {@link https://github.com/ably/ably-js/blob/main/src/common/lib/types/protocolmessage.ts#L34} + */ +export enum Flags { + /* Channel attach state flags */ + HAS_PRESENCE = 1 << 0, + HAS_BACKLOG = 1 << 1, + RESUMED = 1 << 2, + TRANSIENT = 1 << 4, + ATTACH_RESUME = 1 << 5, + /* Channel mode flags */ + PRESENCE = 1 << 16, + PUBLISH = 1 << 17, + SUBSCRIBE = 1 << 18, + PRESENCE_SUBSCRIBE = 1 << 19, +}