diff --git a/src/stream/Middleware.test.ts b/src/stream/Middleware.test.ts index f8ff4bb..60c8dbf 100644 --- a/src/stream/Middleware.test.ts +++ b/src/stream/Middleware.test.ts @@ -181,13 +181,13 @@ describe('OrderedHistoryResumer', () => { }); it('orders numerically', () => { - const sequenceId = 0; + const sequenceId = 1; const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); const subscription = vi.fn(); middleware.subscribe(subscription); // construct history page newest to oldest - let history: Types.Message[] = [createMessage(10), createMessage(2), createMessage(1), createMessage(0)]; + let history: Types.Message[] = [createMessage(10), createMessage(3), createMessage(2), createMessage(1)]; // shuffle as the middleware should be resilient to some out-of-orderiness by sequenceId due to CGO expect(middleware.addHistoricalMessages(shuffle(history))).toBe(true); expect(() => middleware.addHistoricalMessages(history)).toThrowError( @@ -201,13 +201,13 @@ describe('OrderedHistoryResumer', () => { }); it('orders lexicographically', () => { - const sequenceId = 0; + const sequenceId = 1; const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0, lexicographicOrderer); const subscription = vi.fn(); middleware.subscribe(subscription); // construct history page newest to oldest - let history: Types.Message[] = [createMessage(10), createMessage(2), createMessage(1), createMessage(0)]; + let history: Types.Message[] = [createMessage(10), createMessage(3), createMessage(2), createMessage(1)]; // shuffle as the middleware should be resilient to some out-of-orderiness by sequenceId due to CGO expect(middleware.addHistoricalMessages(shuffle(history))).toBe(true); expect(() => middleware.addHistoricalMessages(history)).toThrowError( @@ -215,9 +215,9 @@ describe('OrderedHistoryResumer', () => { ); expect(subscription).toHaveBeenCalledTimes(3); - expect(subscription).toHaveBeenNthCalledWith(1, null, history[2]); - expect(subscription).toHaveBeenNthCalledWith(2, null, history[0]); - expect(subscription).toHaveBeenNthCalledWith(3, null, history[1]); + expect(subscription).toHaveBeenNthCalledWith(1, null, history[0]); // id: 10 + expect(subscription).toHaveBeenNthCalledWith(2, null, history[2]); // id: 2 + expect(subscription).toHaveBeenNthCalledWith(3, null, history[1]); // id: 3 }); it('emits messages after the boundary with sparse sequence', () => { @@ -266,25 +266,26 @@ describe('OrderedHistoryResumer', () => { expect(subscription).toHaveBeenNthCalledWith(2, null, history[0]); }); - it('flushes when empty history page reached', () => { - const sequenceId = 0; // out of reach + it('flushes but still in seeking state when empty history page reached', () => { + const sequenceId = 1; // out of reach const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); const subscription = vi.fn(); middleware.subscribe(subscription); let history: Types.Message[] = [ + createMessage(6), createMessage(5), createMessage(4), createMessage(3), createMessage(2), - createMessage(1), ]; const page1 = history; const page2 = []; expect(middleware.addHistoricalMessages(shuffle(page1))).toBe(false); - expect(middleware.addHistoricalMessages(shuffle(page2))).toBe(true); + expect(middleware.addHistoricalMessages(page2)).toBe(true); + expect(middleware.state).toBe('seeking'); expect(subscription).toHaveBeenCalledTimes(5); expect(subscription).toHaveBeenNthCalledWith(1, null, history[4]); expect(subscription).toHaveBeenNthCalledWith(2, null, history[3]); @@ -293,6 +294,37 @@ describe('OrderedHistoryResumer', () => { expect(subscription).toHaveBeenNthCalledWith(5, null, history[0]); }); + it('successfully applies history when messages is empty but sequenceId is 0', () => { + const sequenceId = 0; + const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); + const subscription = vi.fn(); + middleware.subscribe(subscription); + + let history: Types.Message[] = [createMessage(3), createMessage(2), createMessage(1)]; + const page1 = history; + const page2 = []; + + expect(middleware.addHistoricalMessages(shuffle(page1))).toBe(false); + expect(middleware.addHistoricalMessages(page2)).toBe(true); + + expect(middleware.state).toEqual('success'); + expect(subscription).toHaveBeenCalledTimes(3); + expect(subscription).toHaveBeenNthCalledWith(1, null, history[2]); + expect(subscription).toHaveBeenNthCalledWith(2, null, history[1]); + expect(subscription).toHaveBeenNthCalledWith(3, null, history[0]); + }); + + it('state is successful with empty page and no history', () => { + const sequenceId = 1; + const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); + const subscription = vi.fn(); + middleware.subscribe(subscription); + + expect(middleware.addHistoricalMessages([])).toBe(true); + expect(middleware.state).toEqual('success'); + expect(subscription).not.toHaveBeenCalled(); + }); + it('merges historical messages with live messages', () => { const sequenceId = 3; const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); @@ -354,4 +386,15 @@ describe('OrderedHistoryResumer', () => { expect(subscription).toHaveBeenNthCalledWith(5, null, live[2]); expect(subscription).toHaveBeenNthCalledWith(6, null, live[3]); }); + + it('no seeking preformed if sequenceId is 0', () => { + const sequenceId = 0; + const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); + const subscription = vi.fn(); + middleware.subscribe(subscription); + + let history: Types.Message[] = [createMessage(1), createMessage(0)]; + expect(middleware.addHistoricalMessages(shuffle(history))).toBe(false); + expect(subscription).not.toHaveBeenCalled(); + }); }); diff --git a/src/stream/Middleware.ts b/src/stream/Middleware.ts index f141ffb..2d0cda1 100644 --- a/src/stream/Middleware.ts +++ b/src/stream/Middleware.ts @@ -133,6 +133,15 @@ export class OrderedHistoryResumer extends MiddlewareBase { return this.eventOrderer(a, b) <= 0; } + public applyHistory() { + if (this.historicalMessages.length === 0 || this.currentState === 'success') { + return; + } + + this.flush(); + this.currentState = 'success'; + } + public get state() { return this.currentState; } @@ -145,10 +154,12 @@ export class OrderedHistoryResumer extends MiddlewareBase { // the messages expired before the next page was requested. if (messages.length === 0) { // If there were some messages in history then there have definitely been changes to the state - // and we can't reach back far enough to resume from the correct point. + // and we can't reach back far enough to resume from the correct point. If the sequenceId is + // '0' then we assume this due to SQL coalesce and no actual message will ever be found so + // flush() will reply what history is there. const noHistory = this.historicalMessages.length === 0; this.flush(); - if (noHistory) { + if (noHistory || this.sequenceId === '0') { this.currentState = 'success'; } return true; @@ -169,6 +180,13 @@ export class OrderedHistoryResumer extends MiddlewareBase { // This is sufficiently low likelihood that this can be ignored for now. this.historicalMessages.sort((a, b) => this.reverseOrderer(a.id, b.id)); + // A sequenceId of 0 is a cursor that represents the position before the start of the stream. + // There is no such historical message to seek to, instead we should paginate through all the + // of history to obtain all messages that were published since the sequenceId was obtained. + if (this.sequenceId === '0') { + return false; + } + // Seek backwards through history until we reach a message id <= the specified sequenceId. // Discard anything older (>= sequenceId) and flush out the remaining messages. for (let i = 0; i < this.historicalMessages.length; i++) { diff --git a/src/stream/Stream.test.ts b/src/stream/Stream.test.ts index 150c0ab..fafa80a 100644 --- a/src/stream/Stream.test.ts +++ b/src/stream/Stream.test.ts @@ -270,6 +270,120 @@ describe('Stream', () => { }); }); + it('successfully to syncs if sequenceId is 0 with multiple pages of history', async ({ + ably, + logger, + channelName, + }) => { + const subscribeListener = vi.fn(); + const channel = ably.channels.get(channelName); + ably.channels.release = vi.fn(); + channel.subscribe = vi.fn( + async (): Promise => ({ + current: 'attached', + previous: 'attaching', + resumed: false, + hasBacklog: false, + }), + ); + let i = 0; + channel.history = vi.fn(async (): Promise>> => { + i++; + if (i === 1) { + return { + items: [createMessage(7), createMessage(6), createMessage(5)], + hasNext: () => true, + }; + } + return { + items: [createMessage(4), createMessage(3), createMessage(2)], + hasNext: () => false, + }; + }); + + const stream = new Stream({ + ably, + logger, + channelName: 'foobar', + syncOptions: defaultSyncOptions, + eventBufferOptions: defaultEventBufferOptions, + }); + stream.subscribe(subscribeListener); + let replayPromise = stream.replay('0'); + + await statePromise(stream, 'seeking'); + await expect(replayPromise).resolves.toBeUndefined(); + + expect(channel.subscribe).toHaveBeenCalledOnce(); + expect(channel.history).toHaveBeenCalledTimes(2); + expect(channel.history).toHaveBeenNthCalledWith(1, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(channel.history).toHaveBeenNthCalledWith(2, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(subscribeListener).toHaveBeenCalledTimes(6); + }); + + it('successfully to syncs if sequenceId is 0 with 2 pages of history, second one empty', async ({ + ably, + logger, + channelName, + }) => { + const subscribeListener = vi.fn(); + const channel = ably.channels.get(channelName); + ably.channels.release = vi.fn(); + channel.subscribe = vi.fn( + async (): Promise => ({ + current: 'attached', + previous: 'attaching', + resumed: false, + hasBacklog: false, + }), + ); + let i = 0; + channel.history = vi.fn(async (): Promise>> => { + i++; + if (i === 1) { + return { + items: [createMessage(7), createMessage(6), createMessage(5)], + hasNext: () => true, + }; + } + return { + items: [], + hasNext: () => false, + }; + }); + + const stream = new Stream({ + ably, + logger, + channelName: 'foobar', + syncOptions: defaultSyncOptions, + eventBufferOptions: defaultEventBufferOptions, + }); + stream.subscribe(subscribeListener); + let replayPromise = stream.replay('0'); + + await statePromise(stream, 'seeking'); + await expect(replayPromise).resolves.toBeUndefined(); + + expect(channel.subscribe).toHaveBeenCalledOnce(); + expect(channel.history).toHaveBeenCalledTimes(2); + expect(channel.history).toHaveBeenNthCalledWith(1, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(channel.history).toHaveBeenNthCalledWith(2, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(subscribeListener).toHaveBeenCalledTimes(3); + }); + it('subscribes to messages', async ({ ably, logger, channelName }) => { const channel = ably.channels.get(channelName); channel.history = vi.fn( diff --git a/src/stream/Stream.ts b/src/stream/Stream.ts index b3869fa..b7514c2 100644 --- a/src/stream/Stream.ts +++ b/src/stream/Stream.ts @@ -197,6 +197,14 @@ export default class Stream extends EventEmitter 0 && page.hasNext() && !done); + if (sequenceId === '0' && this.middleware.state !== 'success') { + // The sequenceId is 0 there will be no message in the history to match it. + // The middleware is not in success which means there is some history so we apply it + // The situation occurs when history has been added in the time between the sync function resolving the stream + // getting to this point + this.middleware.applyHistory(); + } + // If the middleware is not in the success state it means there were some history messages and we never reached the target sequenceId. // This means the target sequenceId was too old and a re-sync from a newer state snapshot is required. if (this.middleware.state !== 'success') {