Skip to content

Commit

Permalink
fix: properly call the send callback during upgrade
Browse files Browse the repository at this point in the history
The "drain" event (added in [1]) had two different meanings:

- the transport is ready to be written
- the packets are sent over the wire

For the WebSocket and the WebTransport transports, those two events
happen at the same time, but this is not the case for the HTTP
long-polling transport:

- the transport is ready to be written when the client sends a GET request
- the packets are sent over the wire when the server responds to the GET request

Which caused an issue with send callbacks during an upgrade, since the
packets were written but the client would not open a new GET request.

There are now two distinct events: "ready" and "drain"

Related: #695

[1]: 2a93f06
  • Loading branch information
darrachequesne committed Jun 21, 2024
1 parent afd2934 commit 362bc78
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 8 deletions.
5 changes: 3 additions & 2 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,21 @@ export class Socket extends EventEmitter {
*/
private setTransport(transport) {
const onError = this.onError.bind(this);
const onReady = () => this.flush();
const onPacket = this.onPacket.bind(this);
const onDrain = this.onDrain.bind(this);
const onClose = this.onClose.bind(this, "transport close");

this.transport = transport;
this.transport.once("error", onError);
this.transport.on("ready", onReady);
this.transport.on("packet", onPacket);
this.transport.on("drain", onDrain);
this.transport.once("close", onClose);

this.cleanupFn.push(function () {
transport.removeListener("error", onError);
transport.removeListener("ready", onReady);
transport.removeListener("packet", onPacket);
transport.removeListener("drain", onDrain);
transport.removeListener("close", onClose);
Expand All @@ -245,8 +248,6 @@ export class Socket extends EventEmitter {
* @private
*/
private onDrain() {
this.flush();

if (this.sentCallbackFn.length > 0) {
debug("executing batch send callback");
const seqFn = this.sentCallbackFn.shift();
Expand Down
3 changes: 2 additions & 1 deletion lib/transports-uws/polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class Polling extends Transport {
res.onAborted(onClose);

this.writable = true;
this.emit("drain");
this.emit("ready");

// if we're still writable but had a pending close, trigger an empty send
if (this.writable && this.shouldClose) {
Expand Down Expand Up @@ -291,6 +291,7 @@ export class Polling extends Transport {
debug('writing "%s"', data);
this.doWrite(data, options, () => {
this.req.cleanup();
this.emit("drain");
});
}

Expand Down
3 changes: 2 additions & 1 deletion lib/transports-uws/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ export class WebSocket extends Transport {
this.socket.send(data, isBinary, compress);

if (isLast) {
this.writable = true;
this.emit("drain");
this.writable = true;
this.emit("ready");
}
};

Expand Down
3 changes: 2 additions & 1 deletion lib/transports/polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class Polling extends Transport {
req.on("close", onClose);

this.writable = true;
this.emit("drain");
this.emit("ready");

// if we're still writable but had a pending close, trigger an empty send
if (this.writable && this.shouldClose) {
Expand Down Expand Up @@ -258,6 +258,7 @@ export class Polling extends Transport {
debug('writing "%s"', data);
this.doWrite(data, options, () => {
this.req.cleanup();
this.emit("drain");
});
}

Expand Down
3 changes: 2 additions & 1 deletion lib/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ export class WebSocket extends Transport {
if (err) {
this.onError("write error", err.stack);
} else {
this.writable = true;
this.emit("drain");
this.writable = true;
this.emit("ready");
}
};

Expand Down
3 changes: 2 additions & 1 deletion lib/transports/webtransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ export class WebTransport extends Transport {
debug("error while writing: %s", e.message);
}

this.writable = true;
this.emit("drain");
this.writable = true;
this.emit("ready");
}

doClose(fn) {
Expand Down
2 changes: 1 addition & 1 deletion lib/userver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class uServer extends BaseServer {
const transport = ws.getUserData().transport;
transport.socket = ws;
transport.writable = true;
transport.emit("drain");
transport.emit("ready");
},
message: (ws, message, isBinary) => {
ws.getUserData().transport.onData(
Expand Down
23 changes: 23 additions & 0 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,29 @@ describe("server", () => {
});
});

it("should execute when message sent during polling upgrade window", (done) => {
const engine = listen((port) => {
const socket = new ClientSocket(`ws://localhost:${port}`, {
transports: ["polling", "websocket"],
});

const partialDone = createPartialDone(() => {
engine.httpServer?.close();
socket.close();
done();
}, 2);

engine.on("connection", (conn) => {
conn.on("upgrading", () => {
conn.send("a", partialDone);
});
});
socket.on("open", () => {
socket.on("message", partialDone);
});
});
});

it("should execute when message sent (websocket)", (done) => {
const engine = listen({ allowUpgrades: false }, (port) => {
const socket = new ClientSocket(`ws://localhost:${port}`, {
Expand Down

0 comments on commit 362bc78

Please sign in to comment.