Skip to content

Commit

Permalink
Adds frame max negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
magne authored and icappello committed Dec 19, 2023
1 parent c08b348 commit 5df253d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 6 deletions.
20 changes: 16 additions & 4 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
PublishErrorListener,
ResponseDecoder,
} from "./response_decoder"
import { removeFrom } from "./util"
import { DEFAULT_MAX_FRAME_SIZE, DEFAULT_UNLIMITED_FRAME_SIZE, removeFrom } from "./util"
import { WaitingResponse } from "./waiting_response"
import { SubscribeResponse } from "./responses/subscribe_response"
import { TuneResponse } from "./responses/tune_response"
Expand Down Expand Up @@ -62,7 +62,7 @@ export class Connection {
private consumers = new Map<number, Consumer>()
private compressions = new Map<CompressionType, Compression>()

constructor(private readonly logger: Logger) {
constructor(private readonly logger: Logger, private frameMax: number = DEFAULT_MAX_FRAME_SIZE) {
this.heartbeat = new Heartbeat(this, this.logger)
this.compressions.set(CompressionType.None, NoneCompression.create())
this.compressions.set(CompressionType.Gzip, GzipCompression.create())
Expand All @@ -89,7 +89,7 @@ export class Connection {
}

static connect(params: ConnectionParams, logger?: Logger): Promise<Connection> {
return new Connection(logger ?? new NullLogger()).start(params)
return new Connection(logger ?? new NullLogger(), params.frameMax).start(params)
}

public start(params: ConnectionParams): Promise<Connection> {
Expand Down Expand Up @@ -235,6 +235,10 @@ export class Connection {
return this.consumers.size
}

public savedFrameMax() {
return this.frameMax
}

public send(cmd: Request): Promise<void> {
return new Promise((res, rej) => {
const body = cmd.toBuffer()
Expand Down Expand Up @@ -326,7 +330,8 @@ export class Connection {
const heartbeat = extractHeartbeatInterval(heartbeatInterval, tuneResponse)

return new Promise((res, rej) => {
const request = new TuneRequest({ frameMax: tuneResponse.frameMax, heartbeat })
this.frameMax = this.calculateFrameMaxSizeFrom(tuneResponse.frameMax)
const request = new TuneRequest({ frameMax: this.frameMax, heartbeat })
this.socket.write(request.toBuffer(), (err) => {
this.logger.debug(`Write COMPLETED for cmd TUNE: ${inspect(tuneResponse)} - err: ${err}`)
return err ? rej(err) : res({ heartbeat })
Expand Down Expand Up @@ -463,6 +468,13 @@ export class Connection {
response.messages.map((x) => consumer.handle(x))
})
}

private calculateFrameMaxSizeFrom(tuneResponseFrameMax: number) {
if (this.frameMax === DEFAULT_UNLIMITED_FRAME_SIZE && tuneResponseFrameMax === DEFAULT_UNLIMITED_FRAME_SIZE) {
return 0
}
return Math.max(1, Math.min(this.frameMax, tuneResponseFrameMax))
}
}

export type ListenersParams = {
Expand Down
3 changes: 3 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ export function range(count: number): number[] {
}
return ret
}

export const DEFAULT_MAX_FRAME_SIZE = 1048576
export const DEFAULT_UNLIMITED_FRAME_SIZE = 0
38 changes: 38 additions & 0 deletions test/e2e/connect_frame_size_negotiation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { expect } from "chai"
import { createConnection } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { eventually, username, password } from "../support/util"

describe("connect frame size negotiation", () => {
const rabbit = new Rabbit(username, password)

it("using 65536 as frameMax", async () => {
const frameMax = 65536

const connection = await createConnection(username, password, undefined, frameMax)

await eventually(async () => {
expect(connection.savedFrameMax()).lte(frameMax)
expect(await rabbit.getConnections()).lengthOf(1)
}, 5000)
try {
await connection.close()
await rabbit.closeAllConnections()
} catch (e) {}
}).timeout(10000)

it("using 1024 as frameMax", async () => {
const frameMax = 1024

const connection = await createConnection(username, password, undefined, frameMax)

await eventually(async () => {
expect(connection.savedFrameMax()).lte(frameMax)
expect(await rabbit.getConnections()).lengthOf(1)
}, 5000)
try {
await connection.close()
await rabbit.closeAllConnections()
} catch (e) {}
}).timeout(10000)
})
4 changes: 2 additions & 2 deletions test/support/fake_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ export async function createPublisher(streamName: string, connection: Connection
return publisher
}

export function createConnection(username: string, password: string, listeners?: ListenersParams) {
export function createConnection(username: string, password: string, listeners?: ListenersParams, frameMax?: number) {
return connect({
hostname: "localhost",
port: 5552,
username,
password,
vhost: "/",
frameMax: 0, // not used
frameMax: frameMax ?? 0, // not used
heartbeat: 0,
listeners: listeners,
})
Expand Down

0 comments on commit 5df253d

Please sign in to comment.