Skip to content

Commit

Permalink
feat(client): Implement silent-reconnects
Browse files Browse the repository at this point in the history
Closes: enisdenjo#7
  • Loading branch information
enisdenjo committed Sep 8, 2020
1 parent 77b8e81 commit c6f7872
Showing 1 changed file with 96 additions and 47 deletions.
143 changes: 96 additions & 47 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ export interface ClientOptions {
* @default true
*/
lazy?: boolean;
/**
* How many times should the client try to reconnect on socket closure before it errors out?
* @default 3
*/
retryAttempts?: number;
/**
* How long should the client wait until attempting to retry connecting.
* @default 3 * 1000 (3 seconds)
*/
retryTimeout?: number;
}

export interface Client extends Disposable {
Expand All @@ -43,7 +53,13 @@ export interface Client extends Disposable {

/** Creates a disposable GQL subscriptions client. */
export function createClient(options: ClientOptions): Client {
const { url, connectionParams, lazy = true } = options;
const {
url,
connectionParams,
lazy = true,
retryAttempts = 3,
retryTimeout = 3 * 1000, // 3 seconds
} = options;

let state = {
socket: null as WebSocket | null,
Expand Down Expand Up @@ -248,16 +264,34 @@ export function createClient(options: ClientOptions): Client {
// in non-lazy mode always hold one connection lock to persist the socket
if (!lazy) {
(async () => {
try {
const [, throwOrCancel] = await connect({ current: null });
await throwOrCancel(); // either the canceller will be called or the socket closed
} catch (errOrCloseEvent) {
// normal closure is disposal, shouldnt throw
if (isCloseEvent(errOrCloseEvent) && errOrCloseEvent.code === 1000) {
return;
}
// will break or throw eventually
let retries = 0;
for (;;) {
try {
const [, throwOrCancel] = await connect({ current: null });
retries = 0; // reset retry counter on successful connect
await throwOrCancel(); // either the canceller will be called or the socket closed
break; // break the loop on cancel
} catch (errOrCloseEvent) {
// throw non `CloseEvent`s immediately, something else is wrong
if (!isCloseEvent(errOrCloseEvent)) {
throw errOrCloseEvent;
}

// normal closure is disposal, shouldnt try again
if (errOrCloseEvent.code === 1000) {
break;
}

throw errOrCloseEvent;
// retries expired, throw
if (retries >= retryAttempts) {
throw errOrCloseEvent;
}

// wait a bit and retry
await new Promise((resolve) => setTimeout(resolve, retryTimeout));
retries++;
}
}
})();
}
Expand Down Expand Up @@ -293,49 +327,64 @@ export function createClient(options: ClientOptions): Client {

const cancellerRef: CancellerRef = { current: null };
(async () => {
try {
const [socket, throwOrCancel] = await connect(cancellerRef);
socket.addEventListener('message', messageHandler);

socket.send(
stringifyMessage<MessageType.Subscribe>({
id: uuid,
type: MessageType.Subscribe,
payload,
}),
);
// will break or throw eventually
let retries = 0;
for (;;) {
try {
const [socket, throwOrCancel] = await connect(cancellerRef);
retries = 0; // reset retry counter on successful connect
socket.addEventListener('message', messageHandler);

socket.send(
stringifyMessage<MessageType.Subscribe>({
id: uuid,
type: MessageType.Subscribe,
payload,
}),
);

// either the canceller will be called and the promise resolved
// or the socket closed and the promise rejected
await throwOrCancel();
// either the canceller will be called and the promise resolved
// or the socket closed and the promise rejected
await throwOrCancel();

// TODO-db-200909 wont be removed on throw, but should it? the socket is closed on throw
socket.removeEventListener('message', messageHandler);
// TODO-db-200909 wont be removed on throw, but should it? the socket is closed on throw
socket.removeEventListener('message', messageHandler);

// send complete message to server
socket.send(
stringifyMessage<MessageType.Complete>({
id: uuid,
type: MessageType.Complete,
}),
);
} catch (errOrCloseEvent) {
// throw non `CloseEvent`s immediately, something else is wrong
if (!isCloseEvent(errOrCloseEvent)) {
throw errOrCloseEvent;
}
// send complete message to server
socket.send(
stringifyMessage<MessageType.Complete>({
id: uuid,
type: MessageType.Complete,
}),
);

// normal closure is disposal, shouldnt try again
if (errOrCloseEvent.code === 1000) {
return;
}
// and break the loop
break;
} catch (errOrCloseEvent) {
// throw non `CloseEvent`s immediately, something else is wrong
if (!isCloseEvent(errOrCloseEvent)) {
throw errOrCloseEvent;
}

// user cancelled early, shouldnt try again
if (errOrCloseEvent.code === 3499) {
return;
}
// normal closure is disposal, shouldnt try again
if (errOrCloseEvent.code === 1000) {
break;
}

throw errOrCloseEvent;
// user cancelled early, shouldnt try again
if (errOrCloseEvent.code === 3499) {
break;
}

// retries expired, throw
if (retries >= retryAttempts) {
throw errOrCloseEvent;
}

// wait a bit and retry
await new Promise((resolve) => setTimeout(resolve, retryTimeout));
retries++;
}
}
})()
.catch(sink.error)
Expand Down

0 comments on commit c6f7872

Please sign in to comment.