diff --git a/__mocks__/ably/promises/index.ts b/__mocks__/ably/promises/index.ts index cfaa5706..de47175d 100644 --- a/__mocks__/ably/promises/index.ts +++ b/__mocks__/ably/promises/index.ts @@ -1,69 +1,33 @@ import { Types } from 'ably/promises'; -const MOCK_CLIENT_ID = 'MOCK_CLIENT_ID'; +const mockPromiseErrorNotImplemented = (name: string): Promise => new Promise((_, reject) => reject(new Error(`mock '${name}' not implemented`))); +const mockNotImplemented = () => { throw new Error('not implemented') }; -const mockPromisify = (expectedReturnValue): Promise => new Promise((resolve) => resolve(expectedReturnValue)); -const methodReturningVoidPromise = () => mockPromisify((() => {})()); +type MockChannel = Partial; -const mockPresence = { - get: () => mockPromisify([]), - update: () => mockPromisify(undefined), - enter: methodReturningVoidPromise, - leave: methodReturningVoidPromise, - subscriptions: { - once: async (_, fn) => { - return await fn(); - }, - }, - subscribe: () => {}, -}; +const mockChannel: MockChannel = { + on: mockNotImplemented, + attach: () => mockPromiseErrorNotImplemented('attach'), + detach: () => mockPromiseErrorNotImplemented('detach'), + subscribe: () => mockPromiseErrorNotImplemented('subscribe'), +} -const mockHistory = { - items: [], - first: () => mockPromisify(mockHistory), - next: () => mockPromisify(mockHistory), - current: () => mockPromisify(mockHistory), - hasNext: () => false, - isLast: () => true, -}; +type MockChannels = Partial>; -const mockEmitter = { - any: [], - events: {}, - anyOnce: [], - eventsOnce: {}, -}; +const mockChannels: MockChannels = { + get: () => mockChannel, + release: () => {}, +} -const mockChannel = { - presence: mockPresence, - history: () => mockHistory, - subscribe: () => {}, - publish: () => {}, - subscriptions: mockEmitter, -}; +type MockConnection = Partial; -class MockRealtime { - public channels: { - get: () => typeof mockChannel; - }; - public auth: { - clientId: string; - }; - public connection: { - id?: string; - }; +const mockConnection: MockConnection = { + whenState: () => mockPromiseErrorNotImplemented('whenState') +} - constructor() { - this.channels = { - get: () => mockChannel, - }; - this.auth = { - clientId: MOCK_CLIENT_ID, - }; - this.connection = { - id: '1', - }; - } +class MockRealtime { + public channels = mockChannels; + public connection = mockConnection; } export { MockRealtime as Realtime }; diff --git a/package-lock.json b/package-lock.json index 5499071d..aa795599 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,8 @@ "version": "0.0.1", "license": "ISC", "dependencies": { - "ably": "^1.2.39" + "ably": "^1.2.39", + "rxjs": "^7.8.1" }, "devDependencies": { "@typescript-eslint/eslint-plugin": "^5.51.0", @@ -2112,6 +2113,18 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/globals/node_modules/type-fest": { + "version": "0.20.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", + "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", + "dev": true, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/globby": { "version": "11.1.0", "resolved": "https://registry.npmjs.org/globby/-/globby-11.1.0.tgz", @@ -3093,6 +3106,19 @@ "queue-microtask": "^1.2.2" } }, + "node_modules/rxjs": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", + "integrity": "sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==", + "dependencies": { + "tslib": "^2.1.0" + } + }, + "node_modules/rxjs/node_modules/tslib": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.5.2.tgz", + "integrity": "sha512-5svOrSA2w3iGFDs1HibEVBGbDrAY82bFQ3HZ3ixB+88nsbsWQoKqDRb5UBYAUPEzbBn6dAp5gRNXglySbx1MlA==" + }, "node_modules/safe-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/safe-regex/-/safe-regex-2.1.1.tgz", @@ -3438,18 +3464,6 @@ "node": ">=4" } }, - "node_modules/type-fest": { - "version": "0.20.2", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", - "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", - "dev": true, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/typescript": { "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", @@ -5204,6 +5218,14 @@ "dev": true, "requires": { "type-fest": "^0.20.2" + }, + "dependencies": { + "type-fest": { + "version": "0.20.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", + "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", + "dev": true + } } }, "globby": { @@ -5889,6 +5911,21 @@ "queue-microtask": "^1.2.2" } }, + "rxjs": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", + "integrity": "sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==", + "requires": { + "tslib": "^2.1.0" + }, + "dependencies": { + "tslib": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.5.2.tgz", + "integrity": "sha512-5svOrSA2w3iGFDs1HibEVBGbDrAY82bFQ3HZ3ixB+88nsbsWQoKqDRb5UBYAUPEzbBn6dAp5gRNXglySbx1MlA==" + } + } + }, "safe-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/safe-regex/-/safe-regex-2.1.1.tgz", @@ -6149,12 +6186,6 @@ "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", "dev": true }, - "type-fest": { - "version": "0.20.2", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", - "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", - "dev": true - }, "typescript": { "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", diff --git a/package.json b/package.json index 3eab4c33..e0046f9e 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "vitest": "^0.29.8" }, "dependencies": { - "ably": "^1.2.39" + "ably": "^1.2.39", + "rxjs": "^7.8.1" } } diff --git a/src/EventStream.test.ts b/src/EventStream.test.ts deleted file mode 100644 index 4828a995..00000000 --- a/src/EventStream.test.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { it, describe, expect, expectTypeOf, beforeEach, afterEach } from 'vitest'; -import { Realtime, Types } from 'ably/promises'; -import { WebSocket } from 'mock-socket'; - -import EventStream from './EventStream'; - -import Server from './utilities/test/mock-server.js'; -import defaultClientConfig from './utilities/test/default-client-config.js'; - -interface EventStreamTestContext { - client: Types.RealtimePromise; - server: Server; -} - -describe('EventStream', () => { - beforeEach((context) => { - (Realtime as any).Platform.Config.WebSocket = WebSocket; - context.server = new Server('wss://realtime.ably.io/'); - context.client = new Realtime(defaultClientConfig); - }); - - afterEach((context) => { - context.server.stop(); - }); - - it('connects successfully with the Ably Client', async ({ client, server }) => { - server.start(); - const connectSuccess = await client.connection.whenState('connected'); - expect(connectSuccess.current).toBe('connected'); - }); - - it('expects the injected client to be of the type RealtimePromise', ({ client }) => { - const eventStream = new EventStream('test', client, { channel: 'foobar' }); - expect(eventStream.name).toEqual('test'); - expectTypeOf(eventStream.client).toMatchTypeOf(); - }); -}); diff --git a/src/EventStream.ts b/src/EventStream.ts deleted file mode 100644 index 1e864d8e..00000000 --- a/src/EventStream.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { Types } from 'ably'; -import EventStreamOptions from './options/EventStreamOptions'; -import EventEmitter from './utilities/EventEmitter'; - -const EVENT_STREAM_OPTIONS_DEFAULTS = {}; - -enum EventStreamState { - /** - * The event stream has been initialized but no attach has yet been attempted. - */ - INITIALIZED = 'initialized', - /** - * This state is entered if the event stream encounters a failure condition that it cannot recover from. - */ - FAILED = 'failed', -} - -class EventStream extends EventEmitter { - private options: EventStreamOptions; - private connectionId?: string; - private state: EventStreamState = EventStreamState.INITIALIZED; - private data: T; - - constructor(readonly name: string, readonly client: Types.RealtimePromise, options: EventStreamOptions) { - super(); - this.options = { ...EVENT_STREAM_OPTIONS_DEFAULTS, ...options }; - this.connectionId = this.client.connection.id; - } -} - -export default EventStream; diff --git a/src/Model.test.ts b/src/Model.test.ts index 7e43fdf1..0c9e67ca 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -3,7 +3,7 @@ import { Realtime, Types } from 'ably/promises'; import { WebSocket } from 'mock-socket'; import Model from './Model'; -import EventStream from './EventStream'; +import Stream from './Stream'; import Server from './utilities/test/mock-server.js'; import defaultClientConfig from './utilities/test/default-client-config.js'; @@ -38,7 +38,7 @@ describe('Model', () => { it('expects model to be instantiated with the provided event streams', ({ client }) => { const model = new Model('test', client, { - streams: [new EventStream('s1', client, { channel: 's1' }), new EventStream('s2', client, { channel: 's2' })], + streams: [new Stream('s1', client, { channel: 's1' }), new Stream('s2', client, { channel: 's2' })], }); expect(model.name).toEqual('test'); expect(model.stream('s1')).toBeTruthy(); diff --git a/src/Model.ts b/src/Model.ts index 8ff00676..0af866a8 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -1,5 +1,5 @@ import { Types } from 'ably'; -import EventStream from './EventStream'; +import Stream from './Stream'; import ModelOptions from './options/ModelOptions'; import EventEmitter from './utilities/EventEmitter'; @@ -16,7 +16,7 @@ enum ModelState { class Model extends EventEmitter { private state: ModelState = ModelState.INITIALIZED; - private streams: Record> = {}; + private streams: Record> = {}; private data: T; constructor(readonly name: string, readonly client: Types.RealtimePromise, options?: ModelOptions) { @@ -28,7 +28,7 @@ class Model extends EventEmitter { } } - stream(name: string): EventStream { + stream(name: string): Stream { if (!this.streams[name]) { throw new Error(`stream with name '${name}' not registered on model '${this.name}'`); } diff --git a/src/Models.test.ts b/src/Models.test.ts index 406f87af..b36d830f 100644 --- a/src/Models.test.ts +++ b/src/Models.test.ts @@ -84,25 +84,25 @@ describe('Models', () => { it('creates an event stream that inherits the root class ably client', () => { const models = new Models({ ...defaultClientConfig }); - const eventStream = models.EventStream('test', { channel: 'foobar' }); - expect(eventStream.name).toEqual('test'); - expect(eventStream.client['options']).toContain(defaultClientConfig); + const stream = models.Stream('test', { channel: 'foobar' }); + expect(stream.name).toEqual('test'); + expect(stream.ably['options']).toContain(defaultClientConfig); }); it('getting an event stream without options throws', () => { const models = new Models({ ...defaultClientConfig }); - expect(() => models.EventStream('test')).toThrow('EventStream cannot be instantiated without options'); + expect(() => models.Stream('test')).toThrow('Stream cannot be instantiated without options'); }); it('getting an event stream with the same name returns the same instance', () => { const models = new Models({ ...defaultClientConfig }); - const eventStream1 = models.EventStream('test', { channel: 'foobar' }); // first call requires options to instantiate - expect(eventStream1.name).toEqual('test'); - const eventStream2 = models.EventStream('test'); // subsequent calls do not require options - expect(eventStream2.name).toEqual('test'); - expect(eventStream1).toEqual(eventStream2); - const eventStream3 = models.EventStream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored - expect(eventStream3.name).toEqual('test'); - expect(eventStream1).toEqual(eventStream3); + const stream1 = models.Stream('test', { channel: 'foobar' }); // first call requires options to instantiate + expect(stream1.name).toEqual('test'); + const stream2 = models.Stream('test'); // subsequent calls do not require options + expect(stream2.name).toEqual('test'); + expect(stream1).toEqual(stream2); + const stream3 = models.Stream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored + expect(stream3.name).toEqual('test'); + expect(stream1).toEqual(stream3); }); }); diff --git a/src/Models.ts b/src/Models.ts index ba90dbe6..864923c8 100644 --- a/src/Models.ts +++ b/src/Models.ts @@ -1,19 +1,19 @@ import { Types, Realtime } from 'ably'; import ModelOptions from './options/ModelOptions'; import Model from './Model'; -import EventStreamOptions from './options/EventStreamOptions'; -import EventStream from './EventStream'; +import StreamOptions from './options/StreamOptions'; +import Stream from './Stream'; class Models { private models: Record>; - private eventStreams: Record>; + private streams: Record>; ably: Types.RealtimePromise; readonly version = '0.0.1'; constructor(optionsOrAbly: Types.RealtimePromise | Types.ClientOptions | string) { this.models = {}; - this.eventStreams = {}; + this.streams = {}; if (optionsOrAbly['options']) { this.ably = optionsOrAbly as Types.RealtimePromise; this.addAgent(this.ably['options'], false); @@ -48,21 +48,21 @@ class Models { return model; }; - EventStream = (name: string, options?: EventStreamOptions) => { + Stream = (name: string, options?: StreamOptions) => { if (typeof name !== 'string' || name.length === 0) { - throw new Error('EventStream must have a non-empty name'); + throw new Error('Stream must have a non-empty name'); } - if (this.eventStreams[name]) return this.eventStreams[name]; + if (this.streams[name]) return this.streams[name]; if (!options) { - throw new Error('EventStream cannot be instantiated without options'); + throw new Error('Stream cannot be instantiated without options'); } - const eventStream = new EventStream(name, this.ably, options); - this.eventStreams[name] = eventStream; + const stream = new Stream(name, this.ably, options); + this.streams[name] = stream; - return eventStream; + return stream; }; } diff --git a/src/Stream.test.ts b/src/Stream.test.ts new file mode 100644 index 00000000..3f17826f --- /dev/null +++ b/src/Stream.test.ts @@ -0,0 +1,251 @@ +import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; +import { Realtime, Types } from 'ably/promises'; +import { Subject } from 'rxjs'; + +import { baseMessage } from './utilities/test/messages'; +import Stream, { StreamState } from './Stream'; + +vi.mock('ably/promises'); + +interface StreamTestContext { + client: Types.RealtimePromise; + channel: Types.RealtimeChannelPromise; +} + +interface TestEvent {} + +const streamStatePromise = (stream: Stream, state: StreamState) => + new Promise((resolve) => stream.whenState(state, stream.state, resolve)); + +function createMessage(i: number): Types.Message { + return { + ...baseMessage, + id: `id_${i}`, + name: `name_${i}`, + data: `data_${i}`, + }; +} + +describe('Stream', () => { + beforeEach((context) => { + const client = new Realtime({}); + client.connection.whenState = vi.fn<[Types.ConnectionState], Promise>(async () => { + return { + current: 'connected', + previous: 'initialized', + }; + }); + + const channel = client.channels.get('foobar'); + channel.on = vi.fn(); // all tests call `channel.on('fail')` + + context.client = client; + context.channel = channel; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it.only('enters ready state when successfully attached to the channel', async ({ + client, + channel, + }) => { + // the promise returned by the subscribe method resolves when we have successfully attached to the channel + let attach; + const attachment = new Promise((resolve) => (attach = resolve)); + channel.subscribe = vi.fn().mockImplementation(async () => { + await attachment; + }); + + const stream = new Stream('test', client, { channel: 'foobar' }); + + await streamStatePromise(stream, StreamState.PREPARING); + attach(); + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('pauses the stream', async ({ client, channel }) => { + channel.subscribe = vi.fn(); + channel.detach = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.pause(); + await streamStatePromise(stream, StreamState.PAUSED); + expect(channel.detach).toHaveBeenCalledOnce(); + }); + + it.only('resumes the stream', async ({ client, channel }) => { + channel.subscribe = vi.fn(); + channel.detach = vi.fn(); + channel.attach = vi.fn(); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.pause(); + await streamStatePromise(stream, StreamState.PAUSED); + expect(channel.detach).toHaveBeenCalledOnce(); + + stream.resume(); + await streamStatePromise(stream, StreamState.READY); + expect(channel.attach).toHaveBeenCalledOnce(); + }); + + it.only('disposes of the stream', async ({ client, channel }) => { + const releaseSpy = vi.spyOn(client.channels, 'release'); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + stream.dispose(); + await streamStatePromise(stream, StreamState.DISPOSED); + expect(releaseSpy).toHaveBeenCalledOnce(); + }); + + it.only('disposes of the stream on channel failed', async ({ client, channel }) => { + let fail; + channel.on = vi.fn(async (name: string, callback) => { + if (name === 'failed') { + fail = callback; + } + }); + + const releaseSpy = vi.spyOn(client.channels, 'release'); + + const stream = new Stream('test', client, { channel: channel.name }); + + await streamStatePromise(stream, StreamState.READY); + expect(channel.subscribe).toHaveBeenCalledOnce(); + + fail({ reason: 'test' }); + await streamStatePromise(stream, StreamState.DISPOSED); + expect(releaseSpy).toHaveBeenCalledOnce(); + }); + + it.only('subscribes to messages', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('subscribes with multiple listeners', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(10); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('unsubscribes to messages', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(5); + for (let i = 0; i < 5; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + it.only('unsubscribes one of two listeners', async ({ client, channel }) => { + let messages = new Subject(); + channel.subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream('test', client, { channel: channel.name }); + await streamStatePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy1); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(5); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + if (i < 5) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(channel.subscribe).toHaveBeenCalledOnce(); + }); + + // TODO discontinuity + // TODO reauth https://ably.com/docs/realtime/channels?lang=nodejs#fatal-errors +}); diff --git a/src/Stream.ts b/src/Stream.ts new file mode 100644 index 00000000..c46af330 --- /dev/null +++ b/src/Stream.ts @@ -0,0 +1,143 @@ +import { Types } from 'ably'; +import StreamOptions from './options/StreamOptions'; +import EventEmitter, { EventListener } from './utilities/EventEmitter'; +import { StandardCallback } from './types/callbacks'; + +const STREAM_OPTIONS_DEFAULTS = {}; + +export enum StreamState { + /** + * The stream has been initialized but no attach has yet been attempted. + */ + INITIALIZED = 'initialized', + /** + * The stream is attempting to establish a realtime connection and attach to the channel. + * The preparing state is entered as soon as the library has completed initialization, + * and is reentered each time connection is re-attempted following detachment or disconnection. + */ + PREPARING = 'preparing', + /** + * The stream has a realtime connection, is attached to the channel and is delivering messages. + */ + READY = 'ready', + /** + * The user has paused the stream. + */ + PAUSED = 'paused', + /** + * The stream has been disposed, either by the user disposing it or an unrecoverable error, + * and its resources are available for garbage collection. + */ + DISPOSED = 'disposed', +} + +type StreamStateChange = { + current: StreamState; + previous: StreamState; + reason?: Types.ErrorInfo | string; +}; + +type SubscriptionMessageEvent = { + message: Types.Message; + error?: never; +}; + +type SubscriptionErrorEvent = { + message?: never; + error: Types.ErrorInfo; +}; + +type SubscriptionEvent = SubscriptionMessageEvent | SubscriptionErrorEvent; + +type ListenerPair = { + message: EventListener; + error: EventListener; +}; + +class Stream extends EventEmitter> { + private options: StreamOptions; + private currentState: StreamState = StreamState.INITIALIZED; + private ablyChannel: Types.RealtimeChannelPromise; + private subscriptions = new EventEmitter(); + private subscriptionMap: Map, ListenerPair> = new Map(); + + constructor(readonly name: string, readonly ably: Types.RealtimePromise, options: StreamOptions) { + super(); + this.options = { ...STREAM_OPTIONS_DEFAULTS, ...options }; + this.ablyChannel = this.ably.channels.get(this.options.channel); + this.ablyChannel.on('failed', (change) => this.dispose(change.reason)); + this.init(); + } + + get state() { + return this.currentState; + } + + get channel() { + return this.options.channel; + } + + setState(state: StreamState, reason?: Types.ErrorInfo | string) { + const previous = this.currentState; + this.currentState = state; + this.emit(state, { + current: this.currentState, + previous, + reason, + } as StreamStateChange); + } + + async init() { + this.setState(StreamState.PREPARING); + await this.ably.connection.whenState('connected'); + await this.ablyChannel.subscribe(this.onMessage.bind(this)); + this.setState(StreamState.READY); + } + + async pause() { + this.setState(StreamState.PAUSED); + await this.ablyChannel.detach(); + } + + async resume() { + await this.ably.connection.whenState('connected'); + await this.ablyChannel.attach(); + this.setState(StreamState.READY); + } + + subscribe(callback: StandardCallback) { + if (this.currentState !== StreamState.READY) { + callback(new Error(`stream is not in ready state (state = ${this.currentState})`)); + return; + } + const listenerPair: ListenerPair = { + message: (message) => callback(null, message as T), + error: callback, + }; + this.subscriptions.on('message', listenerPair.message); + this.subscriptions.on('error', listenerPair.error); + this.subscriptionMap.set(callback, listenerPair); + } + + unsubscribe(callback: StandardCallback) { + const listeners = this.subscriptionMap.get(callback); + if (listeners) { + this.subscriptions.off('message', listeners.message); + this.subscriptions.off('error', listeners.error); + this.subscriptionMap.delete(callback); + } + } + + dispose(reason?: Types.ErrorInfo | string) { + this.setState(StreamState.DISPOSED, reason); + this.subscriptions.off('message'); + this.subscriptions.off('error'); + this.ably.channels.release(this.ablyChannel.name); + } + + onMessage(message: Types.Message) { + this.subscriptions.emit('message', message); + } +} + +export default Stream; diff --git a/src/options/EventStreamOptions.d.ts b/src/options/EventStreamOptions.d.ts deleted file mode 100644 index 6e4596b2..00000000 --- a/src/options/EventStreamOptions.d.ts +++ /dev/null @@ -1,5 +0,0 @@ -type EventStreamOptions = { - channel: string; -}; - -export default EventStreamOptions; diff --git a/src/options/ModelOptions.d.ts b/src/options/ModelOptions.d.ts index 15b851f6..6cf49315 100644 --- a/src/options/ModelOptions.d.ts +++ b/src/options/ModelOptions.d.ts @@ -1,7 +1,7 @@ -import EventStream from '../EventStream'; +import Stream from '../Stream'; type ModelOptions = { - streams: Array>; + streams: Array>; }; export default ModelOptions; diff --git a/src/options/StreamOptions.d.ts b/src/options/StreamOptions.d.ts new file mode 100644 index 00000000..35a1b9b0 --- /dev/null +++ b/src/options/StreamOptions.d.ts @@ -0,0 +1,5 @@ +type StreamOptions = { + channel: string; +}; + +export default StreamOptions; diff --git a/src/types/callbacks.d.ts b/src/types/callbacks.d.ts new file mode 100644 index 00000000..d62711ff --- /dev/null +++ b/src/types/callbacks.d.ts @@ -0,0 +1,2 @@ +export type StandardCallback = (err?: Error | null, result?: T) => void; +export type ErrCallback = (err?: Error | null) => void; diff --git a/src/utilities/test/messages.ts b/src/utilities/test/messages.ts new file mode 100644 index 00000000..b4424e1d --- /dev/null +++ b/src/utilities/test/messages.ts @@ -0,0 +1,34 @@ +import { Types } from 'ably/promises'; +import { Flags } from './protocol'; + +export const baseProtocolMessage = { + flags: Flags.PRESENCE | Flags.PUBLISH | Flags.SUBSCRIBE | Flags.PRESENCE_SUBSCRIBE, + id: 'PROTOCOL_MESSAGE_ID', + timestamp: 1, + count: 1, + connectionId: 'CONNECTION_ID', + channel: 'foobar', + channelSerial: 'CHANNEL_SERIAL', + msgSerial: 1, + connectionDetails: { + clientId: '', + connectionKey: 'randomKey', + maxMessageSize: 131000, + maxInboundRate: 1000, + maxOutboundRate: 1000, + maxFrameSize: 262144, + connectionStateTtl: 120000, + maxIdleInterval: 15000, + }, +}; + +export const baseMessage: Types.Message = { + id: '1', + data: null, + name: 'foo', + clientId: 'RND-CLIENTID', + connectionId: 'CONNECTION_ID', + encoding: 'utf-8', + extras: {}, + timestamp: 1, +}; diff --git a/src/utilities/test/mock-server-action-responses.ts b/src/utilities/test/mock-server-action-responses.ts index 704f06b3..bd92b035 100644 --- a/src/utilities/test/mock-server-action-responses.ts +++ b/src/utilities/test/mock-server-action-responses.ts @@ -1,21 +1,44 @@ +import { Actions } from './protocol'; +import { baseProtocolMessage } from './messages'; + const authAction = (override) => { return { - action: 4, - connectionId: 'CONNDESC', - connectionKey: 'CONNECTIONKEY', - connectionSerial: -1, - connectionDetails: { - clientId: 'RND-CLIENTID', - connectionKey: 'randomKey', - maxMessageSize: 131000, - maxInboundRate: 1000, - maxOutboundRate: 1000, - maxFrameSize: 262144, - connectionStateTtl: 120000, - maxIdleInterval: 15000, - }, + ...baseProtocolMessage, + action: Actions.CONNECTED, + ...override, + }; +}; + +const attachedAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.ATTACHED, + ...override, + }; +}; + +const detachedAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.DETACHED, + ...override, + }; +}; + +const ackAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.ACK, + ...override, + }; +}; + +const messageAction = (override) => { + return { + ...baseProtocolMessage, + action: Actions.MESSAGE, ...override, }; }; -export { authAction }; +export { authAction, attachedAction, detachedAction, ackAction, messageAction }; diff --git a/src/utilities/test/protocol.ts b/src/utilities/test/protocol.ts new file mode 100644 index 00000000..9b387d6f --- /dev/null +++ b/src/utilities/test/protocol.ts @@ -0,0 +1,43 @@ +/** + * AblyJS ProtocolMessage actions + * {@link https://github.com/ably/ably-js/blob/main/src/common/lib/types/protocolmessage.ts#L7} + */ +export enum Actions { + HEARTBEAT = 0, + ACK, + NACK, + CONNECT, + CONNECTED, + DISCONNECT, + DISCONNECTED, + CLOSE, + CLOSED, + ERROR, + ATTACH, + ATTACHED, + DETACH, + DETACHED, + PRESENCE, + MESSAGE, + SYNC, + AUTH, + ACTIVATE, +} + +/** + * AblyJS ProtocolMessage flags + * {@link https://github.com/ably/ably-js/blob/main/src/common/lib/types/protocolmessage.ts#L34} + */ +export enum Flags { + /* Channel attach state flags */ + HAS_PRESENCE = 1 << 0, + HAS_BACKLOG = 1 << 1, + RESUMED = 1 << 2, + TRANSIENT = 1 << 4, + ATTACH_RESUME = 1 << 5, + /* Channel mode flags */ + PRESENCE = 1 << 16, + PUBLISH = 1 << 17, + SUBSCRIBE = 1 << 18, + PRESENCE_SUBSCRIBE = 1 << 19, +}