Skip to content

Commit

Permalink
refactor: use crossws interface to call hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 committed Feb 24, 2024
1 parent 5f72d42 commit 7e36eba
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docs/2.adapters/1.index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ See Adapters section to learn more about all available built-in adapters.

## Integration with other runtimes

You can define your custom adapters using `defineWebSocketAdapter` wrapper.
You can define your custom adapters using `defineWebSocketAdapter` wrapper and using `createCrossWS` utility to handle events.

See other adapter implementations in [`src/adapters`](https://github.com/unjs/crossws/tree/main/src/adapters/) to get an idea of how adapters can be implemented and feel free to directly make a Pull Request to support your environment in CrossWS!
4 changes: 2 additions & 2 deletions src/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { WebSocketHooks } from "./hooks";
import type { WebSocketHooks, AdapterHooks } from "./hooks";

export type WebSocketAdapter<RT = any, OT = any> = (
hooks: Partial<WebSocketHooks>,
hooks: Partial<WebSocketHooks & AdapterHooks>,
opts: OT,
) => RT;

Expand Down
29 changes: 16 additions & 13 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
import { WebSocketPeerBase } from "../peer";
import { defineWebSocketAdapter } from "../adapter";
import { CrossWSOptions, createCrossWS } from "../crossws";

export interface AdapterOptions {}
export interface AdapterOptions extends CrossWSOptions {}

type ContextData = { _peer?: WebSocketPeer };

Expand All @@ -16,7 +17,9 @@ export interface Adapter {
}

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(hooks, opts = {}) => {
(hooks, options = {}) => {
const crossws = createCrossWS(hooks, options);

const getPeer = (ws: ServerWebSocket<ContextData>) => {
if (ws.data?._peer) {
return ws.data._peer;
Expand All @@ -31,36 +34,36 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
websocket: {
message: (ws, message) => {
const peer = getPeer(ws);
hooks["bun:message"]?.(peer, ws, message);
hooks.message?.(peer, new WebSocketMessage(message));
crossws.$("bun:message", peer, ws, message);
crossws.message(peer, new WebSocketMessage(message));
},
open: (ws) => {
const peer = getPeer(ws);
hooks["bun:open"]?.(peer, ws);
hooks.open?.(peer);
crossws.$("bun:open", peer, ws);
crossws.open(peer);
},
close: (ws) => {
const peer = getPeer(ws);
hooks["bun:close"]?.(peer, ws);
hooks.close?.(peer, {});
crossws.$("bun:close", peer, ws);
crossws.close(peer, {});
},
drain: (ws) => {
const peer = getPeer(ws);
hooks["bun:drain"]?.(peer);
crossws.$("bun:drain", peer);
},
// @ts-expect-error types unavailable but mentioned in docs
error: (ws, error) => {
const peer = getPeer(ws);
hooks["bun:error"]?.(peer, ws, error);
hooks.error?.(peer, new WebSocketError(error));
crossws.$("bun:error", peer, ws, error);
crossws.error(peer, new WebSocketError(error));
},
ping(ws, data) {
const peer = getPeer(ws);
hooks["bun:ping"]?.(peer, ws, data);
crossws.$("bun:ping", peer, ws, data);
},
pong(ws, data) {
const peer = getPeer(ws);
hooks["bun:pong"]?.(peer, ws, data);
crossws.$("bun:pong", peer, ws, data);
},
},
};
Expand Down
15 changes: 9 additions & 6 deletions src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import { WebSocketPeerBase } from "../peer";
import { defineWebSocketAdapter } from "../adapter.js";
import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
import { CrossWSOptions, createCrossWS } from "../crossws";

type Env = Record<string, any>;

declare const WebSocketPair: typeof _cf.WebSocketPair;
declare const Response: typeof _cf.Response;

export interface AdapterOptions {}
export interface AdapterOptions extends CrossWSOptions {}

export interface Adapter {
handleUpgrade(
Expand All @@ -23,7 +24,9 @@ export interface Adapter {
}

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(hooks, opts = {}) => {
(hooks, options = {}) => {
const crossws = createCrossWS(hooks, options);

const handleUpgrade = (
request: _cf.Request,
env: Env,
Expand All @@ -39,21 +42,21 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(

server.accept();

hooks["cloudflare:accept"]?.(peer);
crossws.$("cloudflare:accept", peer);
hooks.open?.(peer);

server.addEventListener("message", (event) => {
hooks["cloudflare:message"]?.(peer, event);
crossws.$("cloudflare:message", peer, event);
hooks.message?.(peer, new WebSocketMessage(event.data));
});

server.addEventListener("error", (event) => {
hooks["cloudflare:error"]?.(peer, event);
crossws.$("cloudflare:error", peer, event);
hooks.error?.(peer, new WebSocketError(event.error));
});

server.addEventListener("close", (event) => {
hooks["cloudflare:close"]?.(peer, event);
crossws.$("cloudflare:close", peer, event);
hooks.close?.(peer, { code: event.code, reason: event.reason });
});

Expand Down
15 changes: 9 additions & 6 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
import { WebSocketPeerBase } from "../peer";
import { defineWebSocketAdapter } from "../adapter.js";
import { CrossWSOptions, createCrossWS } from "../crossws";

export interface AdapterOptions {}
export interface AdapterOptions extends CrossWSOptions {}

export interface Adapter {
handleUpgrade(req: Request): Response;
Expand All @@ -18,26 +19,28 @@ declare global {
}

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(hooks, opts = {}) => {
(hooks, options = {}) => {
const crossws = createCrossWS(hooks, options);

const handleUpgrade = (request: Request) => {
const upgrade = Deno.upgradeWebSocket(request);
const peer = new DenoWebSocketPeer({
deno: { ws: upgrade.socket, request },
});
upgrade.socket.addEventListener("open", () => {
hooks["deno:open"]?.(peer);
crossws.$("deno:open", peer);
hooks.open?.(peer);
});
upgrade.socket.addEventListener("message", (event) => {
hooks["deno:message"]?.(peer, event);
crossws.$("deno:message", peer, event);
hooks.message?.(peer, new WebSocketMessage(event.data));
});
upgrade.socket.addEventListener("close", () => {
hooks["deno:close"]?.(peer);
crossws.$("deno:close", peer);
hooks.close?.(peer, {});
});
upgrade.socket.addEventListener("error", (error) => {
hooks["deno:error"]?.(peer, error);
crossws.$("deno:error", peer, error);
hooks.error?.(peer, new WebSocketError(error));
});
return upgrade.response;
Expand Down
29 changes: 17 additions & 12 deletions src/adapters/node-uws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import { WebSocketBehavior, WebSocket } from "uWebSockets.js";
import { WebSocketPeerBase } from "../peer";
import { WebSocketMessage } from "../message";
import { defineWebSocketAdapter } from "../adapter";
import { CrossWSOptions, createCrossWS } from "../crossws";

type UserData = { _peer?: any };

type WebSocketHandler = WebSocketBehavior<UserData>;

export interface AdapterOptions
extends Exclude<
export interface AdapterOptions extends CrossWSOptions {
uws?: Exclude<
WebSocketBehavior<any>,
| "close"
| "drain"
Expand All @@ -20,14 +22,17 @@ export interface AdapterOptions
| "pong"
| "subscription"
| "upgrade"
> {}
>;
}

export interface Adapter {
websocket: WebSocketHandler;
}

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(hooks, opts = {}) => {
(hooks, options = {}) => {
const crossws = createCrossWS(hooks, options);

const getPeer = (ws: WebSocket<UserData>) => {
const userData = ws.getUserData();
if (userData._peer) {
Expand All @@ -39,38 +44,38 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
};

const websocket: WebSocketHandler = {
...opts,
...options.uws,
close(ws, code, message) {
const peer = getPeer(ws);
hooks["uws:close"]?.(peer, ws, code, message);
crossws.$("uws:close", peer, ws, code, message);
hooks.close?.(peer, { code, reason: message?.toString() });
},
drain(ws) {
const peer = getPeer(ws);
hooks["uws:drain"]?.(peer, ws);
crossws.$("uws:drain", peer, ws);
},
message(ws, message, isBinary) {
const peer = getPeer(ws);
hooks["uws:message"]?.(peer, ws, message, isBinary);
crossws.$("uws:message", peer, ws, message, isBinary);
const msg = new WebSocketMessage(message, isBinary);
hooks.message?.(peer, msg);
},
open(ws) {
const peer = getPeer(ws);
hooks["uws:open"]?.(peer, ws);
crossws.$("uws:open", peer, ws);
hooks.open?.(peer);
},
ping(ws, message) {
const peer = getPeer(ws);
hooks["uws:ping"]?.(peer, ws, message);
crossws.$("uws:ping", peer, ws, message);
},
pong(ws, message) {
const peer = getPeer(ws);
hooks["uws:pong"]?.(peer, ws, message);
crossws.$("uws:pong", peer, ws, message);
},
subscription(ws, topic, newCount, oldCount) {
const peer = getPeer(ws);
hooks["uws:subscription"]?.(peer, ws, topic, newCount, oldCount);
crossws.$("uws:subscription", peer, ws, topic, newCount, oldCount);
},
// error ? TODO
// upgrade(res, req, context) {}
Expand Down
51 changes: 27 additions & 24 deletions src/adapters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import { WebSocketPeerBase } from "../peer";
import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
import { defineWebSocketAdapter } from "../adapter";
import { CrossWSOptions, createCrossWS } from "../crossws";

export interface AdapterOptions {
export interface AdapterOptions extends CrossWSOptions {
wss?: WebSocketServer;
serverOptions?: ServerOptions;
}
Expand All @@ -25,70 +26,72 @@ export interface Adapter {
}

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(hooks, opts = {}) => {
(hooks, options = {}) => {
const crossws = createCrossWS(hooks, options);

const wss: WebSocketServer =
opts.wss ||
options.wss ||
(new _WebSocketServer({
noServer: true,
...(opts.serverOptions as any),
...(options.serverOptions as any),
}) as WebSocketServer);

// Unmanaged server-level events
// wss.on("error", (error) => {
// hooks["node:server-error"]?.( error);
// });
// wss.on("headers", (headers, request) => {
// hooks["node:server-headers"]?.( headers, request);
// });
// wss.on("listening", () => {
// hooks.onEvent?.("node:server-listening");
// });
// wss.on("close", () => {
// hooks.onEvent?.("node:server-close");
// });
wss.on("error", (error) => {
crossws.$("node:server-error", error);
});
wss.on("headers", (headers, request) => {
crossws.$("node:server-headers", headers, request);
});
wss.on("listening", () => {
crossws.$("node:server-listening");
});
wss.on("close", () => {
crossws.$("node:server-close");
});

wss.on("connection", (ws, req) => {
const peer = new NodeWebSocketPeer({ node: { ws, req, server: wss } });
hooks.open?.(peer);

// Managed socket-level events
ws.on("message", (data: RawData, isBinary: boolean) => {
hooks["node:message"]?.(peer, data, isBinary);
crossws.$("node:message", peer, data, isBinary);
if (Array.isArray(data)) {
data = Buffer.concat(data);
}
hooks.message?.(peer, new WebSocketMessage(data, isBinary));
});
ws.on("error", (error: Error) => {
hooks["node:error"]?.(peer, error);
crossws.$("node:error", peer, error);
hooks.error?.(peer, new WebSocketError(error));
});
ws.on("close", (code: number, reason: Buffer) => {
hooks["node:close"]?.(peer, code, reason);
crossws.$("node:close", peer, code, reason);
hooks.close?.(peer, {
code,
reason: reason?.toString(),
});
});
ws.on("open", () => {
hooks["node:open"]?.(peer);
crossws.$("node:open", peer);
});

// Unmanaged socket-level events
ws.on("ping", (data: Buffer) => {
hooks["node:ping"]?.(peer, data);
crossws.$("node:ping", peer, data);
});
ws.on("pong", (data: Buffer) => {
hooks["node:pong"]?.(peer, data);
crossws.$("node:pong", peer, data);
});
ws.on(
"unexpected-response",
(req: ClientRequest, res: IncomingMessage) => {
hooks["node:unexpected-response"]?.(peer, req, res);
crossws.$("node:unexpected-response", peer, req, res);
},
);
ws.on("upgrade", (req: IncomingMessage) => {
hooks["node:upgrade"]?.(peer, req);
crossws.$("node:upgrade", peer, req);
});
});

Expand Down
Loading

0 comments on commit 7e36eba

Please sign in to comment.