From fc940a4f09591f06ba024649a7f068221226ccf8 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Mon, 5 Jun 2023 11:03:50 +0100 Subject: [PATCH] stream: add simple stream implementation This initial Stream implementation adds a very simple wrapper around an Ably channel. Messages received on the channel are surfaced to the caller via a subscription callback. The Stream is an EventEmitter and exposes its state via events. Additionally the stream can be pauses, resumed and disposed. This will allow the Models SDK to manipulate the stream/channel according to the model's lifecycle. While not yet particularly useful, the idea is that the Stream will handle: - message de-duplication - message re-ordering - detect discontinuities (due to connection state recovery as well as due to failed connector publishes) --- __mocks__/ably/promises/index.ts | 78 ++---- package-lock.json | 69 +++-- package.json | 3 +- src/EventStream.test.ts | 37 --- src/EventStream.ts | 31 --- src/Model.test.ts | 4 +- src/Model.ts | 6 +- src/Models.test.ts | 24 +- src/Models.ts | 22 +- src/Stream.test.ts | 251 ++++++++++++++++++ src/Stream.ts | 143 ++++++++++ src/options/EventStreamOptions.d.ts | 5 - src/options/ModelOptions.d.ts | 4 +- src/options/StreamOptions.d.ts | 5 + src/types/callbacks.d.ts | 2 + src/utilities/test/messages.ts | 34 +++ .../test/mock-server-action-responses.ts | 53 ++-- src/utilities/test/protocol.ts | 43 +++ 18 files changed, 619 insertions(+), 195 deletions(-) delete mode 100644 src/EventStream.test.ts delete mode 100644 src/EventStream.ts create mode 100644 src/Stream.test.ts create mode 100644 src/Stream.ts delete mode 100644 src/options/EventStreamOptions.d.ts create mode 100644 src/options/StreamOptions.d.ts create mode 100644 src/types/callbacks.d.ts create mode 100644 src/utilities/test/messages.ts create mode 100644 src/utilities/test/protocol.ts diff --git a/__mocks__/ably/promises/index.ts b/__mocks__/ably/promises/index.ts index cfaa5706..de47175d 100644 --- a/__mocks__/ably/promises/index.ts +++ b/__mocks__/ably/promises/index.ts @@ -1,69 +1,33 @@ import { Types } from 'ably/promises'; -const MOCK_CLIENT_ID = 'MOCK_CLIENT_ID'; +const mockPromiseErrorNotImplemented = (name: string): Promise => new Promise((_, reject) => reject(new Error(`mock '${name}' not implemented`))); +const mockNotImplemented = () => { throw new Error('not implemented') }; -const mockPromisify = (expectedReturnValue): Promise => new Promise((resolve) => resolve(expectedReturnValue)); -const methodReturningVoidPromise = () => mockPromisify((() => {})()); +type MockChannel = Partial; -const mockPresence = { - get: () => mockPromisify([]), - update: () => mockPromisify(undefined), - enter: methodReturningVoidPromise, - leave: methodReturningVoidPromise, - subscriptions: { - once: async (_, fn) => { - return await fn(); - }, - }, - subscribe: () => {}, -}; +const mockChannel: MockChannel = { + on: mockNotImplemented, + attach: () => mockPromiseErrorNotImplemented('attach'), + detach: () => mockPromiseErrorNotImplemented('detach'), + subscribe: () => mockPromiseErrorNotImplemented('subscribe'), +} -const mockHistory = { - items: [], - first: () => mockPromisify(mockHistory), - next: () => mockPromisify(mockHistory), - current: () => mockPromisify(mockHistory), - hasNext: () => false, - isLast: () => true, -}; +type MockChannels = Partial>; -const mockEmitter = { - any: [], - events: {}, - anyOnce: [], - eventsOnce: {}, -}; +const mockChannels: MockChannels = { + get: () => mockChannel, + release: () => {}, +} -const mockChannel = { - presence: mockPresence, - history: () => mockHistory, - subscribe: () => {}, - publish: () => {}, - subscriptions: mockEmitter, -}; +type MockConnection = Partial; -class MockRealtime { - public channels: { - get: () => typeof mockChannel; - }; - public auth: { - clientId: string; - }; - public connection: { - id?: string; - }; +const mockConnection: MockConnection = { + whenState: () => mockPromiseErrorNotImplemented('whenState') +} - constructor() { - this.channels = { - get: () => mockChannel, - }; - this.auth = { - clientId: MOCK_CLIENT_ID, - }; - this.connection = { - id: '1', - }; - } +class MockRealtime { + public channels = mockChannels; + public connection = mockConnection; } export { MockRealtime as Realtime }; diff --git a/package-lock.json b/package-lock.json index 5499071d..aa795599 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,8 @@ "version": "0.0.1", "license": "ISC", "dependencies": { - "ably": "^1.2.39" + "ably": "^1.2.39", + "rxjs": "^7.8.1" }, "devDependencies": { "@typescript-eslint/eslint-plugin": "^5.51.0", @@ -2112,6 +2113,18 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/globals/node_modules/type-fest": { + "version": "0.20.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", + "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", + "dev": true, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/globby": { "version": "11.1.0", "resolved": "https://registry.npmjs.org/globby/-/globby-11.1.0.tgz", @@ -3093,6 +3106,19 @@ "queue-microtask": "^1.2.2" } }, + "node_modules/rxjs": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", + "integrity": "sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==", + "dependencies": { + "tslib": "^2.1.0" + } + }, + "node_modules/rxjs/node_modules/tslib": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.5.2.tgz", + "integrity": "sha512-5svOrSA2w3iGFDs1HibEVBGbDrAY82bFQ3HZ3ixB+88nsbsWQoKqDRb5UBYAUPEzbBn6dAp5gRNXglySbx1MlA==" + }, "node_modules/safe-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/safe-regex/-/safe-regex-2.1.1.tgz", @@ -3438,18 +3464,6 @@ "node": ">=4" } }, - "node_modules/type-fest": { - "version": "0.20.2", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", - "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", - "dev": true, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/typescript": { "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", @@ -5204,6 +5218,14 @@ "dev": true, "requires": { "type-fest": "^0.20.2" + }, + "dependencies": { + "type-fest": { + "version": "0.20.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", + "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", + "dev": true + } } }, "globby": { @@ -5889,6 +5911,21 @@ "queue-microtask": "^1.2.2" } }, + "rxjs": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", + "integrity": "sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==", + "requires": { + "tslib": "^2.1.0" + }, + "dependencies": { + "tslib": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.5.2.tgz", + "integrity": "sha512-5svOrSA2w3iGFDs1HibEVBGbDrAY82bFQ3HZ3ixB+88nsbsWQoKqDRb5UBYAUPEzbBn6dAp5gRNXglySbx1MlA==" + } + } + }, "safe-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/safe-regex/-/safe-regex-2.1.1.tgz", @@ -6149,12 +6186,6 @@ "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", "dev": true }, - "type-fest": { - "version": "0.20.2", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", - "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", - "dev": true - }, "typescript": { "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", diff --git a/package.json b/package.json index 3eab4c33..e0046f9e 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "vitest": "^0.29.8" }, "dependencies": { - "ably": "^1.2.39" + "ably": "^1.2.39", + "rxjs": "^7.8.1" } } diff --git a/src/EventStream.test.ts b/src/EventStream.test.ts deleted file mode 100644 index 4828a995..00000000 --- a/src/EventStream.test.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { it, describe, expect, expectTypeOf, beforeEach, afterEach } from 'vitest'; -import { Realtime, Types } from 'ably/promises'; -import { WebSocket } from 'mock-socket'; - -import EventStream from './EventStream'; - -import Server from './utilities/test/mock-server.js'; -import defaultClientConfig from './utilities/test/default-client-config.js'; - -interface EventStreamTestContext { - client: Types.RealtimePromise; - server: Server; -} - -describe('EventStream', () => { - beforeEach((context) => { - (Realtime as any).Platform.Config.WebSocket = WebSocket; - context.server = new Server('wss://realtime.ably.io/'); - context.client = new Realtime(defaultClientConfig); - }); - - afterEach((context) => { - context.server.stop(); - }); - - it('connects successfully with the Ably Client', async ({ client, server }) => { - server.start(); - const connectSuccess = await client.connection.whenState('connected'); - expect(connectSuccess.current).toBe('connected'); - }); - - it('expects the injected client to be of the type RealtimePromise', ({ client }) => { - const eventStream = new EventStream('test', client, { channel: 'foobar' }); - expect(eventStream.name).toEqual('test'); - expectTypeOf(eventStream.client).toMatchTypeOf(); - }); -}); diff --git a/src/EventStream.ts b/src/EventStream.ts deleted file mode 100644 index 1e864d8e..00000000 --- a/src/EventStream.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { Types } from 'ably'; -import EventStreamOptions from './options/EventStreamOptions'; -import EventEmitter from './utilities/EventEmitter'; - -const EVENT_STREAM_OPTIONS_DEFAULTS = {}; - -enum EventStreamState { - /** - * The event stream has been initialized but no attach has yet been attempted. - */ - INITIALIZED = 'initialized', - /** - * This state is entered if the event stream encounters a failure condition that it cannot recover from. - */ - FAILED = 'failed', -} - -class EventStream extends EventEmitter { - private options: EventStreamOptions; - private connectionId?: string; - private state: EventStreamState = EventStreamState.INITIALIZED; - private data: T; - - constructor(readonly name: string, readonly client: Types.RealtimePromise, options: EventStreamOptions) { - super(); - this.options = { ...EVENT_STREAM_OPTIONS_DEFAULTS, ...options }; - this.connectionId = this.client.connection.id; - } -} - -export default EventStream; diff --git a/src/Model.test.ts b/src/Model.test.ts index 7e43fdf1..0c9e67ca 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -3,7 +3,7 @@ import { Realtime, Types } from 'ably/promises'; import { WebSocket } from 'mock-socket'; import Model from './Model'; -import EventStream from './EventStream'; +import Stream from './Stream'; import Server from './utilities/test/mock-server.js'; import defaultClientConfig from './utilities/test/default-client-config.js'; @@ -38,7 +38,7 @@ describe('Model', () => { it('expects model to be instantiated with the provided event streams', ({ client }) => { const model = new Model('test', client, { - streams: [new EventStream('s1', client, { channel: 's1' }), new EventStream('s2', client, { channel: 's2' })], + streams: [new Stream('s1', client, { channel: 's1' }), new Stream('s2', client, { channel: 's2' })], }); expect(model.name).toEqual('test'); expect(model.stream('s1')).toBeTruthy(); diff --git a/src/Model.ts b/src/Model.ts index 8ff00676..0af866a8 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -1,5 +1,5 @@ import { Types } from 'ably'; -import EventStream from './EventStream'; +import Stream from './Stream'; import ModelOptions from './options/ModelOptions'; import EventEmitter from './utilities/EventEmitter'; @@ -16,7 +16,7 @@ enum ModelState { class Model extends EventEmitter { private state: ModelState = ModelState.INITIALIZED; - private streams: Record> = {}; + private streams: Record> = {}; private data: T; constructor(readonly name: string, readonly client: Types.RealtimePromise, options?: ModelOptions) { @@ -28,7 +28,7 @@ class Model extends EventEmitter { } } - stream(name: string): EventStream { + stream(name: string): Stream { if (!this.streams[name]) { throw new Error(`stream with name '${name}' not registered on model '${this.name}'`); } diff --git a/src/Models.test.ts b/src/Models.test.ts index 406f87af..b36d830f 100644 --- a/src/Models.test.ts +++ b/src/Models.test.ts @@ -84,25 +84,25 @@ describe('Models', () => { it('creates an event stream that inherits the root class ably client', () => { const models = new Models({ ...defaultClientConfig }); - const eventStream = models.EventStream('test', { channel: 'foobar' }); - expect(eventStream.name).toEqual('test'); - expect(eventStream.client['options']).toContain(defaultClientConfig); + const stream = models.Stream('test', { channel: 'foobar' }); + expect(stream.name).toEqual('test'); + expect(stream.ably['options']).toContain(defaultClientConfig); }); it('getting an event stream without options throws', () => { const models = new Models({ ...defaultClientConfig }); - expect(() => models.EventStream('test')).toThrow('EventStream cannot be instantiated without options'); + expect(() => models.Stream('test')).toThrow('Stream cannot be instantiated without options'); }); it('getting an event stream with the same name returns the same instance', () => { const models = new Models({ ...defaultClientConfig }); - const eventStream1 = models.EventStream('test', { channel: 'foobar' }); // first call requires options to instantiate - expect(eventStream1.name).toEqual('test'); - const eventStream2 = models.EventStream('test'); // subsequent calls do not require options - expect(eventStream2.name).toEqual('test'); - expect(eventStream1).toEqual(eventStream2); - const eventStream3 = models.EventStream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored - expect(eventStream3.name).toEqual('test'); - expect(eventStream1).toEqual(eventStream3); + const stream1 = models.Stream('test', { channel: 'foobar' }); // first call requires options to instantiate + expect(stream1.name).toEqual('test'); + const stream2 = models.Stream('test'); // subsequent calls do not require options + expect(stream2.name).toEqual('test'); + expect(stream1).toEqual(stream2); + const stream3 = models.Stream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored + expect(stream3.name).toEqual('test'); + expect(stream1).toEqual(stream3); }); }); diff --git a/src/Models.ts b/src/Models.ts index ba90dbe6..864923c8 100644 --- a/src/Models.ts +++ b/src/Models.ts @@ -1,19 +1,19 @@ import { Types, Realtime } from 'ably'; import ModelOptions from './options/ModelOptions'; import Model from './Model'; -import EventStreamOptions from './options/EventStreamOptions'; -import EventStream from './EventStream'; +import StreamOptions from './options/StreamOptions'; +import Stream from './Stream'; class Models { private models: Record>; - private eventStreams: Record>; + private streams: Record>; ably: Types.RealtimePromise; readonly version = '0.0.1'; constructor(optionsOrAbly: Types.RealtimePromise | Types.ClientOptions | string) { this.models = {}; - this.eventStreams = {}; + this.streams = {}; if (optionsOrAbly['options']) { this.ably = optionsOrAbly as Types.RealtimePromise; this.addAgent(this.ably['options'], false); @@ -48,21 +48,21 @@ class Models { return model; }; - EventStream = (name: string, options?: EventStreamOptions) => { + Stream = (name: string, options?: StreamOptions) => { if (typeof name !== 'string' || name.length === 0) { - throw new Error('EventStream must have a non-empty name'); + throw new Error('Stream must have a non-empty name'); } - if (this.eventStreams[name]) return this.eventStreams[name]; + if (this.streams[name]) return this.streams[name]; if (!options) { - throw new Error('EventStream cannot be instantiated without options'); + throw new Error('Stream cannot be instantiated without options'); } - const eventStream = new EventStream(name, this.ably, options); - this.eventStreams[name] = eventStream; + const stream = new Stream(name, this.ably, options); + this.streams[name] = stream; - return eventStream; + return stream; }; } diff --git a/src/Stream.test.ts b/src/Stream.test.ts new file mode 100644 index 00000000..3f17826f --- /dev/null +++ b/src/Stream.test.ts @@ -0,0 +1,251 @@ +import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; +import { Realtime, Types } from 'ably/promises'; +import { Subject } from 'rxjs'; + +import { baseMessage } from './utilities/test/messages'; +import Stream, { StreamState } from './Stream'; + +vi.mock('ably/promises'); + +interface StreamTestContext { + client: Types.RealtimePromise; + channel: Types.RealtimeChannelPromise; +} + +interface TestEvent {} + +const streamStatePromise = (stream: Stream, state: StreamState) => + new Promise((resolve) => stream.whenState(state, stream.state, resolve)); + +function createMessage(i: number): Types.Message { + return { + ...baseMessage, + id: `id_${i}`, + name: `name_${i}`, + data: `data_${i}`, + }; +} + +describe('Stream', () => { + beforeEach((context) => { + const client = new Realtime({}); + client.connection.whenState = vi.fn<[Types.ConnectionState], Promise>(async () => { + return { + current: 'connected', + previous: 'initialized', + }; + }); + + const channel = client.channels.get('foobar'); + channel.on = vi.fn(); // all tests call `channel.on('fail')` + + context.client = client; + context.channel = channel; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it.only('enters ready state when successfully attached to the channel', async ({ + client, + channel, + }) => { + // the promise returned by the subscribe method resolves when we have successfully attached to the channel + let attach; + const attachment = new Promise((resolve) => (attach = resolve)); + channel.subscribe = vi.fn().mockImplementation(async () => { + await attachment; + }); + + const stream = new Stream('test', client, { channel: 'foobar' }); + + await streamStatePromise(stream, StreamState.PREPARING); + attach(); + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('pauses the stream', async ({ client, channel }) => { + channel.subscribe = vi.fn(); + channel.detach = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.pause(); + await streamStatePromise(stream, StreamState.PAUSED); + expect(channel.detach).toHaveBeenCalledOnce(); + }); + + it.only('resumes the stream', async ({ client, channel }) => { + channel.subscribe = vi.fn(); + channel.detach = vi.fn(); + channel.attach = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.pause(); + await streamStatePromise(stream, StreamState.PAUSED); + expect(channel.detach).toHaveBeenCalledOnce(); + + stream.resume(); + await streamStatePromise(stream, StreamState.READY); + expect(channel.attach).toHaveBeenCalledOnce(); + }); + + it.only('disposes of the stream', async ({ client, channel }) => { + const releaseSpy = vi.spyOn(client.channels, 'release'); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.dispose(); + await streamStatePromise(stream, StreamState.DISPOSED); + expect(releaseSpy).toHaveBeenCalledOnce(); + }); + + it.only('disposes of the stream on channel failed', async ({ client, channel }) => { + let fail; + channel.on = vi.fn(async (name: string, callback) => { + if (name === 'failed') { + fail = callback; + } + }); + + const releaseSpy = vi.spyOn(client.channels, 'release'); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + fail({ reason: 'test' }); + await streamStatePromise(stream, StreamState.DISPOSED); + expect(releaseSpy).toHaveBeenCalledOnce(); + }); + + it.only('subscribes to messages', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('subscribes with multiple listeners', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(10); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('unsubscribes to messages', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(5); + for (let i = 0; i < 5; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('unsubscribes one of two listeners', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy1); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(5); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + if (i < 5) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + // TODO discontinuity + // TODO reauth https://ably.com/docs/realtime/channels?lang=nodejs#fatal-errors +}); diff --git a/src/Stream.ts b/src/Stream.ts new file mode 100644 index 00000000..c46af330 --- /dev/null +++ b/src/Stream.ts @@ -0,0 +1,143 @@ +import { Types } from 'ably'; +import StreamOptions from './options/StreamOptions'; +import EventEmitter, { EventListener } from './utilities/EventEmitter'; +import { StandardCallback } from './types/callbacks'; + +const STREAM_OPTIONS_DEFAULTS = {}; + +export enum StreamState { + /** + * The stream has been initialized but no attach has yet been attempted. + */ + INITIALIZED = 'initialized', + /** + * The stream is attempting to establish a realtime connection and attach to the channel. + * The preparing state is entered as soon as the library has completed initialization, + * and is reentered each time connection is re-attempted following detachment or disconnection. + */ + PREPARING = 'preparing', + /** + * The stream has a realtime connection, is attached to the channel and is delivering messages. + */ + READY = 'ready', + /** + * The user has paused the stream. + */ + PAUSED = 'paused', + /** + * The stream has been disposed, either by the user disposing it or an unrecoverable error, + * and its resources are available for garbage collection. + */ + DISPOSED = 'disposed', +} + +type StreamStateChange = { + current: StreamState; + previous: StreamState; + reason?: Types.ErrorInfo | string; +}; + +type SubscriptionMessageEvent = { + message: Types.Message; + error?: never; +}; + +type SubscriptionErrorEvent = { + message?: never; + error: Types.ErrorInfo; +}; + +type SubscriptionEvent = SubscriptionMessageEvent | SubscriptionErrorEvent; + +type ListenerPair = { + message: EventListener; + error: EventListener; +}; + +class Stream extends EventEmitter> { + private options: StreamOptions; + private currentState: StreamState = StreamState.INITIALIZED; + private ablyChannel: Types.RealtimeChannelPromise; + private subscriptions = new EventEmitter(); + private subscriptionMap: Map, ListenerPair> = new Map(); + + constructor(readonly name: string, readonly ably: Types.RealtimePromise, options: StreamOptions) { + super(); + this.options = { ...STREAM_OPTIONS_DEFAULTS, ...options }; + this.ablyChannel = this.ably.channels.get(this.options.channel); + this.ablyChannel.on('failed', (change) => this.dispose(change.reason)); + this.init(); + } + + get state() { + return this.currentState; + } + + get channel() { + return this.options.channel; + } + + setState(state: StreamState, reason?: Types.ErrorInfo | string) { + const previous = this.currentState; + this.currentState = state; + this.emit(state, { + current: this.currentState, + previous, + reason, + } as StreamStateChange); + } + + async init() { + this.setState(StreamState.PREPARING); + await this.ably.connection.whenState('connected'); + await this.ablyChannel.subscribe(this.onMessage.bind(this)); + this.setState(StreamState.READY); + } + + async pause() { + this.setState(StreamState.PAUSED); + await this.ablyChannel.detach(); + } + + async resume() { + await this.ably.connection.whenState('connected'); + await this.ablyChannel.attach(); + this.setState(StreamState.READY); + } + + subscribe(callback: StandardCallback) { + if (this.currentState !== StreamState.READY) { + callback(new Error(`stream is not in ready state (state = ${this.currentState})`)); + return; + } + const listenerPair: ListenerPair = { + message: (message) => callback(null, message as T), + error: callback, + }; + this.subscriptions.on('message', listenerPair.message); + this.subscriptions.on('error', listenerPair.error); + this.subscriptionMap.set(callback, listenerPair); + } + + unsubscribe(callback: StandardCallback) { + const listeners = this.subscriptionMap.get(callback); + if (listeners) { + this.subscriptions.off('message', listeners.message); + this.subscriptions.off('error', listeners.error); + this.subscriptionMap.delete(callback); + } + } + + dispose(reason?: Types.ErrorInfo | string) { + this.setState(StreamState.DISPOSED, reason); + this.subscriptions.off('message'); + this.subscriptions.off('error'); + this.ably.channels.release(this.ablyChannel.name); + } + + onMessage(message: Types.Message) { + this.subscriptions.emit('message', message); + } +} + +export default Stream; diff --git a/src/options/EventStreamOptions.d.ts b/src/options/EventStreamOptions.d.ts deleted file mode 100644 index 6e4596b2..00000000 --- a/src/options/EventStreamOptions.d.ts +++ /dev/null @@ -1,5 +0,0 @@ -type EventStreamOptions = { - channel: string; -}; - -export default EventStreamOptions; diff --git a/src/options/ModelOptions.d.ts b/src/options/ModelOptions.d.ts index 15b851f6..6cf49315 100644 --- a/src/options/ModelOptions.d.ts +++ b/src/options/ModelOptions.d.ts @@ -1,7 +1,7 @@ -import EventStream from '../EventStream'; +import Stream from '../Stream'; type ModelOptions = { - streams: Array>; + streams: Array>; }; export default ModelOptions; diff --git a/src/options/StreamOptions.d.ts b/src/options/StreamOptions.d.ts new file mode 100644 index 00000000..35a1b9b0 --- /dev/null +++ b/src/options/StreamOptions.d.ts @@ -0,0 +1,5 @@ +type StreamOptions = { + channel: string; +}; + +export default StreamOptions; diff --git a/src/types/callbacks.d.ts b/src/types/callbacks.d.ts new file mode 100644 index 00000000..d62711ff --- /dev/null +++ b/src/types/callbacks.d.ts @@ -0,0 +1,2 @@ +export type StandardCallback = (err?: Error | null, result?: T) => void; +export type ErrCallback = (err?: Error | null) => void; diff --git a/src/utilities/test/messages.ts b/src/utilities/test/messages.ts new file mode 100644 index 00000000..b4424e1d --- /dev/null +++ b/src/utilities/test/messages.ts @@ -0,0 +1,34 @@ +import { Types } from 'ably/promises'; +import { Flags } from './protocol'; + +export const baseProtocolMessage = { + flags: Flags.PRESENCE | Flags.PUBLISH | Flags.SUBSCRIBE | Flags.PRESENCE_SUBSCRIBE, + id: 'PROTOCOL_MESSAGE_ID', + timestamp: 1, + count: 1, + connectionId: 'CONNECTION_ID', + channel: 'foobar', + channelSerial: 'CHANNEL_SERIAL', + msgSerial: 1, + connectionDetails: { + clientId: '', + connectionKey: 'randomKey', + maxMessageSize: 131000, + maxInboundRate: 1000, + maxOutboundRate: 1000, + maxFrameSize: 262144, + connectionStateTtl: 120000, + maxIdleInterval: 15000, + }, +}; + +export const baseMessage: Types.Message = { + id: '1', + data: null, + name: 'foo', + clientId: 'RND-CLIENTID', + connectionId: 'CONNECTION_ID', + encoding: 'utf-8', + extras: {}, + timestamp: 1, +}; diff --git a/src/utilities/test/mock-server-action-responses.ts b/src/utilities/test/mock-server-action-responses.ts index 704f06b3..bd92b035 100644 --- a/src/utilities/test/mock-server-action-responses.ts +++ b/src/utilities/test/mock-server-action-responses.ts @@ -1,21 +1,44 @@ +import { Actions } from './protocol'; +import { baseProtocolMessage } from './messages'; + const authAction = (override) => { return { - action: 4, - connectionId: 'CONNDESC', - connectionKey: 'CONNECTIONKEY', - connectionSerial: -1, - connectionDetails: { - clientId: 'RND-CLIENTID', - connectionKey: 'randomKey', - maxMessageSize: 131000, - maxInboundRate: 1000, - maxOutboundRate: 1000, - maxFrameSize: 262144, - connectionStateTtl: 120000, - maxIdleInterval: 15000, - }, + ...baseProtocolMessage, + action: Actions.CONNECTED, + ...override, + }; +}; + +const attachedAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.ATTACHED, + ...override, + }; +}; + +const detachedAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.DETACHED, + ...override, + }; +}; + +const ackAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.ACK, + ...override, + }; +}; + +const messageAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.MESSAGE, ...override, }; }; -export { authAction }; +export { authAction, attachedAction, detachedAction, ackAction, messageAction }; diff --git a/src/utilities/test/protocol.ts b/src/utilities/test/protocol.ts new file mode 100644 index 00000000..9b387d6f --- /dev/null +++ b/src/utilities/test/protocol.ts @@ -0,0 +1,43 @@ +/** + * AblyJS ProtocolMessage actions + * {@link https://github.com/ably/ably-js/blob/main/src/common/lib/types/protocolmessage.ts#L7} + */ +export enum Actions { + HEARTBEAT = 0, + ACK, + NACK, + CONNECT, + CONNECTED, + DISCONNECT, + DISCONNECTED, + CLOSE, + CLOSED, + ERROR, + ATTACH, + ATTACHED, + DETACH, + DETACHED, + PRESENCE, + MESSAGE, + SYNC, + AUTH, + ACTIVATE, +} + +/** + * AblyJS ProtocolMessage flags + * {@link https://github.com/ably/ably-js/blob/main/src/common/lib/types/protocolmessage.ts#L34} + */ +export enum Flags { + /* Channel attach state flags */ + HAS_PRESENCE = 1 << 0, + HAS_BACKLOG = 1 << 1, + RESUMED = 1 << 2, + TRANSIENT = 1 << 4, + ATTACH_RESUME = 1 << 5, + /* Channel mode flags */ + PRESENCE = 1 << 16, + PUBLISH = 1 << 17, + SUBSCRIBE = 1 << 18, + PRESENCE_SUBSCRIBE = 1 << 19, +}