Skip to content

Commit

Permalink
stream: add simple stream implementation
Browse files Browse the repository at this point in the history
This initial Stream implementation adds a very simple wrapper around
an Ably channel. Messages received on the channel are surfaced to the
caller via a subscription callback.

The Stream is an EventEmitter and exposes its state via events.
Additionally the stream can be pauses, resumed and disposed. This
will allow the Models SDK to manipulate the stream/channel according to
the model's lifecycle.

While not yet particularly useful, the idea is that the Stream will
handle:
- message de-duplication
- message re-ordering
- detect discontinuities (due to connection state recovery as well as
  due to failed connector publishes)
  • Loading branch information
mschristensen committed Jun 5, 2023
1 parent a90688f commit fc940a4
Show file tree
Hide file tree
Showing 18 changed files with 619 additions and 195 deletions.
78 changes: 21 additions & 57 deletions __mocks__/ably/promises/index.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,33 @@
import { Types } from 'ably/promises';

const MOCK_CLIENT_ID = 'MOCK_CLIENT_ID';
const mockPromiseErrorNotImplemented = <T>(name: string): Promise<T> => new Promise((_, reject) => reject(new Error(`mock '${name}' not implemented`)));
const mockNotImplemented = () => { throw new Error('not implemented') };

const mockPromisify = <T>(expectedReturnValue): Promise<T> => new Promise((resolve) => resolve(expectedReturnValue));
const methodReturningVoidPromise = () => mockPromisify<void>((() => {})());
type MockChannel = Partial<Types.RealtimeChannelPromise>;

const mockPresence = {
get: () => mockPromisify<Types.PresenceMessage[]>([]),
update: () => mockPromisify<void>(undefined),
enter: methodReturningVoidPromise,
leave: methodReturningVoidPromise,
subscriptions: {
once: async (_, fn) => {
return await fn();
},
},
subscribe: () => {},
};
const mockChannel: MockChannel = {
on: mockNotImplemented,
attach: () => mockPromiseErrorNotImplemented<void>('attach'),
detach: () => mockPromiseErrorNotImplemented<void>('detach'),
subscribe: () => mockPromiseErrorNotImplemented<void>('subscribe'),
}

const mockHistory = {
items: [],
first: () => mockPromisify(mockHistory),
next: () => mockPromisify(mockHistory),
current: () => mockPromisify(mockHistory),
hasNext: () => false,
isLast: () => true,
};
type MockChannels = Partial<Types.Channels<MockChannel>>;

const mockEmitter = {
any: [],
events: {},
anyOnce: [],
eventsOnce: {},
};
const mockChannels: MockChannels = {
get: () => mockChannel,
release: () => {},
}

const mockChannel = {
presence: mockPresence,
history: () => mockHistory,
subscribe: () => {},
publish: () => {},
subscriptions: mockEmitter,
};
type MockConnection = Partial<Types.ConnectionPromise>;

class MockRealtime {
public channels: {
get: () => typeof mockChannel;
};
public auth: {
clientId: string;
};
public connection: {
id?: string;
};
const mockConnection: MockConnection = {
whenState: () => mockPromiseErrorNotImplemented<Types.ConnectionStateChange>('whenState')
}

constructor() {
this.channels = {
get: () => mockChannel,
};
this.auth = {
clientId: MOCK_CLIENT_ID,
};
this.connection = {
id: '1',
};
}
class MockRealtime {
public channels = mockChannels;
public connection = mockConnection;
}

export { MockRealtime as Realtime };
69 changes: 50 additions & 19 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"vitest": "^0.29.8"
},
"dependencies": {
"ably": "^1.2.39"
"ably": "^1.2.39",
"rxjs": "^7.8.1"
}
}
37 changes: 0 additions & 37 deletions src/EventStream.test.ts

This file was deleted.

31 changes: 0 additions & 31 deletions src/EventStream.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Realtime, Types } from 'ably/promises';
import { WebSocket } from 'mock-socket';

import Model from './Model';
import EventStream from './EventStream';
import Stream from './Stream';

import Server from './utilities/test/mock-server.js';
import defaultClientConfig from './utilities/test/default-client-config.js';
Expand Down Expand Up @@ -38,7 +38,7 @@ describe('Model', () => {

it<ModelTestContext>('expects model to be instantiated with the provided event streams', ({ client }) => {
const model = new Model('test', client, {
streams: [new EventStream('s1', client, { channel: 's1' }), new EventStream('s2', client, { channel: 's2' })],
streams: [new Stream('s1', client, { channel: 's1' }), new Stream('s2', client, { channel: 's2' })],
});
expect(model.name).toEqual('test');
expect(model.stream('s1')).toBeTruthy();
Expand Down
6 changes: 3 additions & 3 deletions src/Model.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Types } from 'ably';
import EventStream from './EventStream';
import Stream from './Stream';
import ModelOptions from './options/ModelOptions';
import EventEmitter from './utilities/EventEmitter';

Expand All @@ -16,7 +16,7 @@ enum ModelState {

class Model<T> extends EventEmitter<any> {
private state: ModelState = ModelState.INITIALIZED;
private streams: Record<string, EventStream<any>> = {};
private streams: Record<string, Stream<any>> = {};
private data: T;

constructor(readonly name: string, readonly client: Types.RealtimePromise, options?: ModelOptions) {
Expand All @@ -28,7 +28,7 @@ class Model<T> extends EventEmitter<any> {
}
}

stream(name: string): EventStream<any> {
stream(name: string): Stream<any> {
if (!this.streams[name]) {
throw new Error(`stream with name '${name}' not registered on model '${this.name}'`);
}
Expand Down
24 changes: 12 additions & 12 deletions src/Models.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,25 @@ describe('Models', () => {

it<ModelsTestContext>('creates an event stream that inherits the root class ably client', () => {
const models = new Models({ ...defaultClientConfig });
const eventStream = models.EventStream('test', { channel: 'foobar' });
expect(eventStream.name).toEqual('test');
expect(eventStream.client['options']).toContain(defaultClientConfig);
const stream = models.Stream('test', { channel: 'foobar' });
expect(stream.name).toEqual('test');
expect(stream.ably['options']).toContain(defaultClientConfig);
});

it<ModelsTestContext>('getting an event stream without options throws', () => {
const models = new Models({ ...defaultClientConfig });
expect(() => models.EventStream('test')).toThrow('EventStream cannot be instantiated without options');
expect(() => models.Stream('test')).toThrow('Stream cannot be instantiated without options');
});

it<ModelsTestContext>('getting an event stream with the same name returns the same instance', () => {
const models = new Models({ ...defaultClientConfig });
const eventStream1 = models.EventStream('test', { channel: 'foobar' }); // first call requires options to instantiate
expect(eventStream1.name).toEqual('test');
const eventStream2 = models.EventStream('test'); // subsequent calls do not require options
expect(eventStream2.name).toEqual('test');
expect(eventStream1).toEqual(eventStream2);
const eventStream3 = models.EventStream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored
expect(eventStream3.name).toEqual('test');
expect(eventStream1).toEqual(eventStream3);
const stream1 = models.Stream('test', { channel: 'foobar' }); // first call requires options to instantiate
expect(stream1.name).toEqual('test');
const stream2 = models.Stream('test'); // subsequent calls do not require options
expect(stream2.name).toEqual('test');
expect(stream1).toEqual(stream2);
const stream3 = models.Stream('test', { channel: 'barbaz' }); // providing options to subsequent calls is allowed but ignored
expect(stream3.name).toEqual('test');
expect(stream1).toEqual(stream3);
});
});
Loading

0 comments on commit fc940a4

Please sign in to comment.