Skip to content

Commit

Permalink
add support when serquenceId is 0 and channel history is present
Browse files Browse the repository at this point in the history
  • Loading branch information
kaschula committed Mar 18, 2024
1 parent c96cf2a commit f976bb1
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 13 deletions.
65 changes: 54 additions & 11 deletions src/stream/Middleware.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ describe('OrderedHistoryResumer', () => {
});

it('orders numerically', () => {
const sequenceId = 0;
const sequenceId = 1;
const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0);
const subscription = vi.fn();
middleware.subscribe(subscription);

// construct history page newest to oldest
let history: Types.Message[] = [createMessage(10), createMessage(2), createMessage(1), createMessage(0)];
let history: Types.Message[] = [createMessage(10), createMessage(3), createMessage(2), createMessage(1)];
// shuffle as the middleware should be resilient to some out-of-orderiness by sequenceId due to CGO
expect(middleware.addHistoricalMessages(shuffle(history))).toBe(true);
expect(() => middleware.addHistoricalMessages(history)).toThrowError(
Expand All @@ -201,23 +201,23 @@ describe('OrderedHistoryResumer', () => {
});

it('orders lexicographically', () => {
const sequenceId = 0;
const sequenceId = 1;
const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0, lexicographicOrderer);
const subscription = vi.fn();
middleware.subscribe(subscription);

// construct history page newest to oldest
let history: Types.Message[] = [createMessage(10), createMessage(2), createMessage(1), createMessage(0)];
let history: Types.Message[] = [createMessage(10), createMessage(3), createMessage(2), createMessage(1)];
// shuffle as the middleware should be resilient to some out-of-orderiness by sequenceId due to CGO
expect(middleware.addHistoricalMessages(shuffle(history))).toBe(true);
expect(() => middleware.addHistoricalMessages(history)).toThrowError(
'can only add historical messages while in seeking state',
);

expect(subscription).toHaveBeenCalledTimes(3);
expect(subscription).toHaveBeenNthCalledWith(1, null, history[2]);
expect(subscription).toHaveBeenNthCalledWith(2, null, history[0]);
expect(subscription).toHaveBeenNthCalledWith(3, null, history[1]);
expect(subscription).toHaveBeenNthCalledWith(1, null, history[0]); // id: 10
expect(subscription).toHaveBeenNthCalledWith(2, null, history[2]); // id: 2
expect(subscription).toHaveBeenNthCalledWith(3, null, history[1]); // id: 3
});

it('emits messages after the boundary with sparse sequence', () => {
Expand Down Expand Up @@ -266,25 +266,26 @@ describe('OrderedHistoryResumer', () => {
expect(subscription).toHaveBeenNthCalledWith(2, null, history[0]);
});

it('flushes when empty history page reached', () => {
const sequenceId = 0; // out of reach
it('flushes but still in seeking state when empty history page reached', () => {
const sequenceId = 1; // out of reach
const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0);
const subscription = vi.fn();
middleware.subscribe(subscription);

let history: Types.Message[] = [
createMessage(6),
createMessage(5),
createMessage(4),
createMessage(3),
createMessage(2),
createMessage(1),
];
const page1 = history;
const page2 = [];

expect(middleware.addHistoricalMessages(shuffle(page1))).toBe(false);
expect(middleware.addHistoricalMessages(shuffle(page2))).toBe(true);
expect(middleware.addHistoricalMessages(page2)).toBe(true);

expect(middleware.state).toBe('seeking');
expect(subscription).toHaveBeenCalledTimes(5);
expect(subscription).toHaveBeenNthCalledWith(1, null, history[4]);
expect(subscription).toHaveBeenNthCalledWith(2, null, history[3]);
Expand All @@ -293,6 +294,37 @@ describe('OrderedHistoryResumer', () => {
expect(subscription).toHaveBeenNthCalledWith(5, null, history[0]);
});

it('successfully applies history when messages is empty but sequenceId is 0', () => {
const sequenceId = 0;
const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0);
const subscription = vi.fn();
middleware.subscribe(subscription);

let history: Types.Message[] = [createMessage(3), createMessage(2), createMessage(1)];
const page1 = history;
const page2 = [];

expect(middleware.addHistoricalMessages(shuffle(page1))).toBe(false);
expect(middleware.addHistoricalMessages(page2)).toBe(true);

expect(middleware.state).toEqual('success');
expect(subscription).toHaveBeenCalledTimes(3);
expect(subscription).toHaveBeenNthCalledWith(1, null, history[2]);
expect(subscription).toHaveBeenNthCalledWith(2, null, history[1]);
expect(subscription).toHaveBeenNthCalledWith(3, null, history[0]);
});

it('state is successful with empty page and no history', () => {
const sequenceId = 1;
const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0);
const subscription = vi.fn();
middleware.subscribe(subscription);

expect(middleware.addHistoricalMessages([])).toBe(true);
expect(middleware.state).toEqual('success');
expect(subscription).not.toHaveBeenCalled();
});

it('merges historical messages with live messages', () => {
const sequenceId = 3;
const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0);
Expand Down Expand Up @@ -354,4 +386,15 @@ describe('OrderedHistoryResumer', () => {
expect(subscription).toHaveBeenNthCalledWith(5, null, live[2]);
expect(subscription).toHaveBeenNthCalledWith(6, null, live[3]);
});

it('no seeking preformed if sequenceId is 0', () => {
const sequenceId = 0;
const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0);
const subscription = vi.fn();
middleware.subscribe(subscription);

let history: Types.Message[] = [createMessage(1), createMessage(0)];
expect(middleware.addHistoricalMessages(shuffle(history))).toBe(false);
expect(subscription).not.toHaveBeenCalled();
});
});
22 changes: 20 additions & 2 deletions src/stream/Middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ export class OrderedHistoryResumer extends MiddlewareBase {
return this.eventOrderer(a, b) <= 0;
}

public applyHistory() {
if (this.historicalMessages.length === 0 || this.currentState === 'success') {
return;
}

this.flush();
this.currentState = 'success';
}

public get state() {
return this.currentState;
}
Expand All @@ -145,10 +154,12 @@ export class OrderedHistoryResumer extends MiddlewareBase {
// the messages expired before the next page was requested.
if (messages.length === 0) {
// If there were some messages in history then there have definitely been changes to the state
// and we can't reach back far enough to resume from the correct point.
// and we can't reach back far enough to resume from the correct point. If the sequenceId is
// '0' then we assume this due to SQL coalesce and no actual message will ever be found so
// flush() will reply what history is there.
const noHistory = this.historicalMessages.length === 0;
this.flush();
if (noHistory) {
if (noHistory || this.sequenceId === '0') {
this.currentState = 'success';
}
return true;
Expand All @@ -169,6 +180,13 @@ export class OrderedHistoryResumer extends MiddlewareBase {
// This is sufficiently low likelihood that this can be ignored for now.
this.historicalMessages.sort((a, b) => this.reverseOrderer(a.id, b.id));

// A sequenceId of 0 is a cursor that represents the position before the start of the stream.
// There is no such historical message to seek to, instead we should paginate through all the
// of history to obtain all messages that were published since the sequenceId was obtained.
if (this.sequenceId === '0') {
return false;
}

// Seek backwards through history until we reach a message id <= the specified sequenceId.
// Discard anything older (>= sequenceId) and flush out the remaining messages.
for (let i = 0; i < this.historicalMessages.length; i++) {
Expand Down
114 changes: 114 additions & 0 deletions src/stream/Stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,120 @@ describe('Stream', () => {
});
});

it<StreamTestContext>('successfully to syncs if sequenceId is 0 with multiple pages of history', async ({
ably,
logger,
channelName,
}) => {
const subscribeListener = vi.fn();
const channel = ably.channels.get(channelName);
ably.channels.release = vi.fn();
channel.subscribe = vi.fn<any, any>(
async (): Promise<Types.ChannelStateChange | null> => ({
current: 'attached',
previous: 'attaching',
resumed: false,
hasBacklog: false,
}),
);
let i = 0;
channel.history = vi.fn<any, any>(async (): Promise<Partial<Types.PaginatedResult<Types.Message>>> => {
i++;
if (i === 1) {
return {
items: [createMessage(7), createMessage(6), createMessage(5)],
hasNext: () => true,
};
}
return {
items: [createMessage(4), createMessage(3), createMessage(2)],
hasNext: () => false,
};
});

const stream = new Stream({
ably,
logger,
channelName: 'foobar',
syncOptions: defaultSyncOptions,
eventBufferOptions: defaultEventBufferOptions,
});
stream.subscribe(subscribeListener);
let replayPromise = stream.replay('0');

await statePromise(stream, 'seeking');
await expect(replayPromise).resolves.toBeUndefined();

expect(channel.subscribe).toHaveBeenCalledOnce();
expect(channel.history).toHaveBeenCalledTimes(2);
expect(channel.history).toHaveBeenNthCalledWith(1, {
untilAttach: true,
limit: defaultSyncOptions.historyPageSize,
});
expect(channel.history).toHaveBeenNthCalledWith(2, {
untilAttach: true,
limit: defaultSyncOptions.historyPageSize,
});
expect(subscribeListener).toHaveBeenCalledTimes(6);
});

it<StreamTestContext>('successfully to syncs if sequenceId is 0 with 2 pages of history, second one empty', async ({
ably,
logger,
channelName,
}) => {
const subscribeListener = vi.fn();
const channel = ably.channels.get(channelName);
ably.channels.release = vi.fn();
channel.subscribe = vi.fn<any, any>(
async (): Promise<Types.ChannelStateChange | null> => ({
current: 'attached',
previous: 'attaching',
resumed: false,
hasBacklog: false,
}),
);
let i = 0;
channel.history = vi.fn<any, any>(async (): Promise<Partial<Types.PaginatedResult<Types.Message>>> => {
i++;
if (i === 1) {
return {
items: [createMessage(7), createMessage(6), createMessage(5)],
hasNext: () => true,
};
}
return {
items: [],
hasNext: () => false,
};
});

const stream = new Stream({
ably,
logger,
channelName: 'foobar',
syncOptions: defaultSyncOptions,
eventBufferOptions: defaultEventBufferOptions,
});
stream.subscribe(subscribeListener);
let replayPromise = stream.replay('0');

await statePromise(stream, 'seeking');
await expect(replayPromise).resolves.toBeUndefined();

expect(channel.subscribe).toHaveBeenCalledOnce();
expect(channel.history).toHaveBeenCalledTimes(2);
expect(channel.history).toHaveBeenNthCalledWith(1, {
untilAttach: true,
limit: defaultSyncOptions.historyPageSize,
});
expect(channel.history).toHaveBeenNthCalledWith(2, {
untilAttach: true,
limit: defaultSyncOptions.historyPageSize,
});
expect(subscribeListener).toHaveBeenCalledTimes(3);
});

it<StreamTestContext>('subscribes to messages', async ({ ably, logger, channelName }) => {
const channel = ably.channels.get(channelName);
channel.history = vi.fn<any, any>(
Expand Down
8 changes: 8 additions & 0 deletions src/stream/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
n++;
} while (page && page.items && page.items.length > 0 && page.hasNext() && !done);

if (sequenceId === '0' && this.middleware.state !== 'success') {
// The sequenceId is 0 there will be no message in the history to match it.
// The middleware is not in success which means there is some history so we apply it
// The situation occurs when history has been added in the time between the sync function resolving the stream
// getting to this point
this.middleware.applyHistory();
}

// If the middleware is not in the success state it means there were some history messages and we never reached the target sequenceId.
// This means the target sequenceId was too old and a re-sync from a newer state snapshot is required.
if (this.middleware.state !== 'success') {
Expand Down

0 comments on commit f976bb1

Please sign in to comment.