From aaaf8d5b6d19ecac9eeb519066e19b5daf2b6456 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Wed, 6 Dec 2023 16:14:57 +0100 Subject: [PATCH 01/14] add: performance test as subproject, run it in CI --- performance_test/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/performance_test/index.ts b/performance_test/index.ts index fb0ae46c..c84ee2ae 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -24,6 +24,7 @@ function parseArgs(args) { const zipped = shorterArray.map((_, i) => [a[i], +b[i]] as [string, number]) return zipped } + const orderedNamedArgs = ["maxMessages", "messageSize"] const defaultNamedArgs = { maxMessages: 100000, From cc09748fd65945951178264e6f344d53e1b19c82 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Wed, 6 Dec 2023 16:32:17 +0100 Subject: [PATCH 02/14] fix: producer interface after rebase --- performance_test/package-lock.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/performance_test/package-lock.json b/performance_test/package-lock.json index 94522fc4..7d7cff2b 100644 --- a/performance_test/package-lock.json +++ b/performance_test/package-lock.json @@ -1,11 +1,11 @@ { - "name": "example", + "name": "perftest", "version": "1.0.0", "lockfileVersion": 2, "requires": true, "packages": { "": { - "name": "example", + "name": "perftest", "version": "1.0.0", "license": "ISC", "dependencies": { From 17072d364ce74a336578aef7a41fd64b352fdac8 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Tue, 19 Dec 2023 17:09:45 +0100 Subject: [PATCH 03/14] fix: improve readability --- performance_test/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/performance_test/index.ts b/performance_test/index.ts index c84ee2ae..fb0ae46c 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -24,7 +24,6 @@ function parseArgs(args) { const zipped = shorterArray.map((_, i) => [a[i], +b[i]] as [string, number]) return zipped } - const orderedNamedArgs = ["maxMessages", "messageSize"] const defaultNamedArgs = { maxMessages: 100000, From a502a379cd7942d98358982c50c3e1d6b89f69f9 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Mon, 11 Dec 2023 14:35:26 +0100 Subject: [PATCH 04/14] add message buffer in producer --- package-lock.json | 41 +++++++ package.json | 2 + performance_test/index.ts | 5 +- performance_test/package-lock.json | 4 + performance_test/package.json | 3 +- performance_test/perf_test_producer.ts | 7 +- src/connection.ts | 5 + src/producer.ts | 111 ++++++++++++++---- src/requests/abstract_request.ts | 5 +- src/requests/frame_size_exception.ts | 1 + src/requests/publish_request.ts | 10 +- .../sub_entry_batch_publish_request.ts | 15 ++- test/e2e/query_publisher_sequence.test.ts | 1 + test/unit/producer.test.ts | 72 +++++++++++- 14 files changed, 239 insertions(+), 43 deletions(-) create mode 100644 src/requests/frame_size_exception.ts diff --git a/package-lock.json b/package-lock.json index 43b01d1e..2bd8fa8b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,12 +12,14 @@ "@tsconfig/node16": "^1.0.3", "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", + "@types/chai-as-promised": "^7.1.8", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", "@typescript-eslint/parser": "^5.50.0", "amqplib": "^0.10.3", "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", @@ -787,6 +789,15 @@ "integrity": "sha512-KnRanxnpfpjUTqTCXslZSEdLfXExwgNxYPdiO2WGUj8+HDjFi8R3k5RVKPeSCzLjCcshCAtVO2QBbVuAV4kTnw==", "dev": true }, + "node_modules/@types/chai-as-promised": { + "version": "7.1.8", + "resolved": "https://registry.npmjs.org/@types/chai-as-promised/-/chai-as-promised-7.1.8.tgz", + "integrity": "sha512-ThlRVIJhr69FLlh6IctTXFkmhtP3NpMZ2QGq69StYLyKZFp/HOp1VdKZj7RvfNWYYcJ1xlbLGLLWj1UvP5u/Gw==", + "dev": true, + "dependencies": { + "@types/chai": "*" + } + }, "node_modules/@types/http-cache-semantics": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz", @@ -1687,6 +1698,18 @@ "node": ">=4" } }, + "node_modules/chai-as-promised": { + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-7.1.1.tgz", + "integrity": "sha512-azL6xMoi+uxu6z4rhWQ1jbdUhOMhis2PvscD/xjLqNMkv3BPPp2JyyuTHOrf9BOosGpNQ11v6BKv/g57RXbiaA==", + "dev": true, + "dependencies": { + "check-error": "^1.0.2" + }, + "peerDependencies": { + "chai": ">= 2.1.2 < 5" + } + }, "node_modules/chalk": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", @@ -6116,6 +6139,15 @@ "integrity": "sha512-KnRanxnpfpjUTqTCXslZSEdLfXExwgNxYPdiO2WGUj8+HDjFi8R3k5RVKPeSCzLjCcshCAtVO2QBbVuAV4kTnw==", "dev": true }, + "@types/chai-as-promised": { + "version": "7.1.8", + "resolved": "https://registry.npmjs.org/@types/chai-as-promised/-/chai-as-promised-7.1.8.tgz", + "integrity": "sha512-ThlRVIJhr69FLlh6IctTXFkmhtP3NpMZ2QGq69StYLyKZFp/HOp1VdKZj7RvfNWYYcJ1xlbLGLLWj1UvP5u/Gw==", + "dev": true, + "requires": { + "@types/chai": "*" + } + }, "@types/http-cache-semantics": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz", @@ -6716,6 +6748,15 @@ "type-detect": "^4.0.5" } }, + "chai-as-promised": { + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-7.1.1.tgz", + "integrity": "sha512-azL6xMoi+uxu6z4rhWQ1jbdUhOMhis2PvscD/xjLqNMkv3BPPp2JyyuTHOrf9BOosGpNQ11v6BKv/g57RXbiaA==", + "dev": true, + "requires": { + "check-error": "^1.0.2" + } + }, "chalk": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", diff --git a/package.json b/package.json index 8c040d66..8e86a386 100644 --- a/package.json +++ b/package.json @@ -31,12 +31,14 @@ "@tsconfig/node16": "^1.0.3", "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", + "@types/chai-as-promised": "^7.1.8", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", "@typescript-eslint/parser": "^5.50.0", "amqplib": "^0.10.3", "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", diff --git a/performance_test/index.ts b/performance_test/index.ts index fb0ae46c..9fb9c408 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -67,8 +67,5 @@ async function main() { } main() - .then((_v) => { - logger.info(`Ending...`) - setTimeout(() => process.exit(0), 1000) - }) + .then((_v) => setTimeout(() => process.exit(0), 1000)) .catch((res) => logger.error("ERROR ", res)) diff --git a/performance_test/package-lock.json b/performance_test/package-lock.json index 7d7cff2b..3d0e54b0 100644 --- a/performance_test/package-lock.json +++ b/performance_test/package-lock.json @@ -27,12 +27,14 @@ "@tsconfig/node16": "^1.0.3", "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", + "@types/chai-as-promised": "^7.1.8", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", "@typescript-eslint/parser": "^5.50.0", "amqplib": "^0.10.3", "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", @@ -702,12 +704,14 @@ "@tsconfig/node16": "^1.0.3", "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", + "@types/chai-as-promised": "^7.1.8", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", "@typescript-eslint/parser": "^5.50.0", "amqplib": "^0.10.3", "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", diff --git a/performance_test/package.json b/performance_test/package.json index 383b8ecf..21f2198f 100644 --- a/performance_test/package.json +++ b/performance_test/package.json @@ -5,7 +5,8 @@ "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1", - "perftest": "ts-node ./index.ts" + "perftest": "ts-node ./index.ts", + "perftest-reset": "cd .. && npm run build && cd - && npm install --force && ts-node ./index.ts 10000000" }, "author": "", "license": "ISC", diff --git a/performance_test/perf_test_producer.ts b/performance_test/perf_test_producer.ts index c2eed4f8..ed99f0e4 100644 --- a/performance_test/perf_test_producer.ts +++ b/performance_test/perf_test_producer.ts @@ -35,14 +35,17 @@ export class PerfTestProducer { this.displayTimer = setInterval(() => { this.displayMetrics() this.metrics.setStart() - }, 1000) + }, 500) await this.send(publisher) + this.displayMetrics(true) + + return true } private displayMetrics(stop: boolean = false) { const metrics = { ...this.metrics.getMetrics(), total: this.ctr } - this.logger.info(`${new Date().toISOString()} - ${inspect(metrics)}`) + this.logger.info(`${inspect(metrics)}`) if (stop && this.displayTimer) { clearInterval(this.displayTimer) } diff --git a/src/connection.ts b/src/connection.ts index a2fe38db..c79c074c 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -198,6 +198,9 @@ export class Connection { publisherId: publisherId, publisherRef: params.publisherRef, boot: params.boot, + maxFrameSize: params.maxFrameSize, + maxChunkLength: params.maxChunkLength, + logger: this.logger, }) this.logger.info( `New producer created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}` @@ -529,6 +532,8 @@ export interface DeclarePublisherParams { stream: string publisherRef?: string boot?: boolean + maxFrameSize?: number + maxChunkLength?: number } export interface DeclareConsumerParams { diff --git a/src/producer.ts b/src/producer.ts index f0e1e045..bb51c527 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -1,6 +1,10 @@ +import { inspect } from "util" +import { messageSize } from "./amqp10/encoder" import { CompressionType } from "./compression" import { Connection } from "./connection" -import { PublishRequest } from "./requests/publish_request" +import { Logger } from "./logger" +import { FrameSizeException } from "./requests/frame_size_exception" +import { PublishRequest, PublishRequestMessage } from "./requests/publish_request" import { SubEntryBatchPublishRequest } from "./requests/sub_entry_batch_publish_request" import { PublishConfirmResponse } from "./responses/publish_confirm_response" import { PublishErrorResponse } from "./responses/publish_error_response" @@ -50,8 +54,9 @@ interface MessageOptions { } export interface Producer { - send(message: Buffer, opts?: MessageOptions): Promise - basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise + send(message: Buffer, opts?: MessageOptions): Promise + basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise + flush(): Promise sendSubEntries(messages: Message[], compressionType?: CompressionType): Promise on(eventName: "publish_confirm", cb: PublishConfirmCallback): void getLastPublishingId(): Promise @@ -67,6 +72,11 @@ export class StreamProducer implements Producer { protected publisherRef: string private boot: boolean private publishingId: bigint + private maxFrameSize: number + private queue: PublishRequestMessage[] + private scheduled: NodeJS.Immediate | null + private logger: Logger + private maxChunkLength: number constructor(params: { connection: Connection @@ -74,6 +84,9 @@ export class StreamProducer implements Producer { publisherId: number publisherRef?: string boot?: boolean + maxFrameSize?: number + maxChunkLength?: number + logger: Logger }) { this.connection = params.connection this.stream = params.stream @@ -81,6 +94,11 @@ export class StreamProducer implements Producer { this.publisherRef = params.publisherRef || "" this.boot = params.boot || false this.publishingId = params.boot ? -1n : 0n + this.maxFrameSize = params.maxFrameSize || 1048576 + this.queue = [] + this.scheduled = null + this.logger = params.logger + this.maxChunkLength = params.maxChunkLength || 100 } async send(message: Buffer, opts: MessageOptions = {}) { @@ -89,31 +107,17 @@ export class StreamProducer implements Producer { } this.publishingId = this.publishingId + 1n - return this.connection.send( - new PublishRequest({ - publisherId: this.publisherId, - messages: [ - { - publishingId: this.publishingId, - message: { - content: message, - messageProperties: opts.messageProperties, - applicationProperties: opts.applicationProperties, - messageAnnotations: opts.messageAnnotations, - }, - }, - ], - }) - ) + return this.basicSend(this.publishingId, message, opts) } basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}) { - return this.connection.send( - new PublishRequest({ - publisherId: this.publisherId, - messages: [{ publishingId: publishingId, message: { content: content, ...opts } }], - }) - ) + const msg = { publishingId: publishingId, message: { content: content, ...opts } } + return this.enqueue(msg) + } + + async flush() { + await this.sendChunk() + return true } async sendSubEntries(messages: Message[], compressionType: CompressionType = CompressionType.None) { @@ -122,6 +126,7 @@ export class StreamProducer implements Producer { publisherId: this.publisherId, publishingId: this.publishingId, compression: this.connection.getCompression(compressionType), + maxFrameSize: this.maxFrameSize, messages: messages, }) ) @@ -141,4 +146,60 @@ export class StreamProducer implements Producer { get ref() { return this.publisherRef } + + private async enqueue(publishRequestMessage: PublishRequestMessage) { + this.checkMessageSize(publishRequestMessage) + const sendCycleNeeded = this.add(publishRequestMessage) + let sent = false + if (sendCycleNeeded) { + await this.sendChunk() + sent = true + } + this.scheduleIfNeeded() + + return sent + } + + private checkMessageSize(publishRequestMessage: PublishRequestMessage) { + const computedSize = messageSize(publishRequestMessage.message) + if (computedSize > this.maxFrameSize) { + throw new FrameSizeException(`Message too big to fit in one frame: ${computedSize}`) + } + + return true + } + + private async sendChunk() { + const chunk = this.popChunk() + if (chunk.length > 0) { + await this.connection.send( + new PublishRequest({ + publisherId: this.publisherId, + messages: chunk, + maxFrameSize: this.maxFrameSize, + }) + ) + } + } + + private scheduleIfNeeded() { + if (this.queue.length > 0 && this.scheduled === null) { + this.scheduled = setImmediate(() => { + this.sendChunk() + .then((_v) => _v) + .catch((err) => this.logger.error(`Error in send: ${inspect(err)}`)) + .finally(() => this.scheduleIfNeeded()) + }) + } + } + + private add(message: PublishRequestMessage) { + this.queue.push(message) + + return this.queue.length >= this.maxChunkLength + } + + private popChunk() { + return this.queue.splice(0, this.maxChunkLength) + } } diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index e96792c7..fc992767 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -70,9 +70,10 @@ export abstract class AbstractRequest implements Request { abstract get responseKey(): number readonly version = 1 + constructor(private readonly writeBufferSize: number = 1048576) {} + toBuffer(correlationId?: number): Buffer { - const writeBufferSize = 65536 - const dataWriter = new BufferDataWriter(Buffer.alloc(writeBufferSize), 4) + const dataWriter = new BufferDataWriter(Buffer.alloc(this.writeBufferSize), 4) dataWriter.writeUInt16(this.key) dataWriter.writeUInt16(this.version) if (typeof correlationId === "number") { diff --git a/src/requests/frame_size_exception.ts b/src/requests/frame_size_exception.ts new file mode 100644 index 00000000..3a3784cf --- /dev/null +++ b/src/requests/frame_size_exception.ts @@ -0,0 +1 @@ +export class FrameSizeException extends Error {} diff --git a/src/requests/publish_request.ts b/src/requests/publish_request.ts index 619f07f6..7d155fbd 100644 --- a/src/requests/publish_request.ts +++ b/src/requests/publish_request.ts @@ -3,9 +3,15 @@ import { Message } from "../producer" import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" +export type PublishRequestMessage = { + publishingId: bigint + message: Message +} + interface PublishRequestParams { publisherId: number - messages: Array<{ publishingId: bigint; message: Message }> + maxFrameSize: number + messages: Array } export class PublishRequest extends AbstractRequest { @@ -13,7 +19,7 @@ export class PublishRequest extends AbstractRequest { readonly responseKey = -1 constructor(private params: PublishRequestParams) { - super() + super(params.maxFrameSize) } protected writeContent(writer: DataWriter): void { diff --git a/src/requests/sub_entry_batch_publish_request.ts b/src/requests/sub_entry_batch_publish_request.ts index 6a57b777..48310a62 100644 --- a/src/requests/sub_entry_batch_publish_request.ts +++ b/src/requests/sub_entry_batch_publish_request.ts @@ -1,5 +1,5 @@ import { amqpEncode, messageSize } from "../amqp10/encoder" -import { Compression } from "../compression" +import { Compression, CompressionType } from "../compression" import { Message } from "../producer" import { AbstractRequest, BufferDataWriter } from "./abstract_request" import { DataWriter } from "./data_writer" @@ -8,6 +8,7 @@ interface SubEntryBatchPublishRequestParams { publisherId: number publishingId: bigint compression: Compression + maxFrameSize: number messages: Message[] } @@ -16,20 +17,20 @@ export class SubEntryBatchPublishRequest extends AbstractRequest { readonly responseKey = -1 constructor(private params: SubEntryBatchPublishRequestParams) { - super() + super(params.maxFrameSize) } protected writeContent(writer: DataWriter): void { - const { compression, messages, publishingId, publisherId } = this.params + const { compression, messages, publishingId, publisherId, maxFrameSize } = this.params writer.writeUInt8(publisherId) // number of root messages. In this case will be always 1 writer.writeUInt32(1) writer.writeUInt64(publishingId) - writer.writeByte(0x80 | (compression.getType() << 4)) + writer.writeByte(this.encodeCompressionType(compression.getType())) writer.writeUInt16(messages.length) writer.writeUInt32(messages.reduce((sum, message) => sum + 4 + messageSize(message), 0)) - const data = new BufferDataWriter(Buffer.alloc(1024), 0) + const data = new BufferDataWriter(Buffer.alloc(maxFrameSize), 0) messages.forEach((m) => amqpEncode(data, m)) const compressedData = compression.compress(data.toBuffer()) @@ -37,4 +38,8 @@ export class SubEntryBatchPublishRequest extends AbstractRequest { writer.writeUInt32(compressedData.length) writer.writeData(compressedData) } + + private encodeCompressionType(compressionType: CompressionType) { + return 0x80 | (compressionType << 4) + } } diff --git a/test/e2e/query_publisher_sequence.test.ts b/test/e2e/query_publisher_sequence.test.ts index fb2bd291..bd7634ea 100644 --- a/test/e2e/query_publisher_sequence.test.ts +++ b/test/e2e/query_publisher_sequence.test.ts @@ -33,6 +33,7 @@ describe("query publisher sequence", () => { await publisher.basicSend(2n, Buffer.from(`test${randomUUID()}`)) await publisher.basicSend(3n, Buffer.from(`test${randomUUID()}`)) await publisher.basicSend(4n, Buffer.from(`test${randomUUID()}`)) + await publisher.flush() const lastPublishingId = await publisher.getLastPublishingId() diff --git a/test/unit/producer.test.ts b/test/unit/producer.test.ts index 5e9c8631..f8c254ba 100644 --- a/test/unit/producer.test.ts +++ b/test/unit/producer.test.ts @@ -1,8 +1,11 @@ -import { expect } from "chai" +import { expect, use as chaiUse } from "chai" import { randomUUID } from "crypto" -import { connect } from "../../src" +import { Connection, connect } from "../../src" import { Rabbit } from "../support/rabbit" import { password, username } from "../support/util" +import { FrameSizeException } from "../../src/requests/frame_size_exception" +import chaiAsPromised from "chai-as-promised" +chaiUse(chaiAsPromised) describe("Producer", () => { const rabbit = new Rabbit(username, password) @@ -29,6 +32,7 @@ describe("Producer", () => { const oldPublisher = await oldConnection.declarePublisher({ stream: testStreamName, publisherRef }) const oldMessages = [...Array(3).keys()] await Promise.all(oldMessages.map(() => oldPublisher.send(Buffer.from(`test${randomUUID()}`)))) + await oldPublisher.flush() await oldConnection.close() const newConnection = await connect({ hostname: "localhost", @@ -42,6 +46,7 @@ describe("Producer", () => { const newPublisher = await newConnection.declarePublisher({ stream: testStreamName, publisherRef, boot: true }) await newPublisher.send(Buffer.from(`test${randomUUID()}`)) + await newPublisher.flush() expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n) await newConnection.close() @@ -60,6 +65,7 @@ describe("Producer", () => { const oldPublisher = await oldConnection.declarePublisher({ stream: testStreamName, publisherRef }) const oldMessages = [...Array(3).keys()] await Promise.all(oldMessages.map(() => oldPublisher.send(Buffer.from(`test${randomUUID()}`)))) + await oldPublisher.flush() await oldConnection.close() const newConnection = await connect({ hostname: "localhost", @@ -77,4 +83,66 @@ describe("Producer", () => { expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length)) await newConnection.close() }).timeout(10000) + + describe("Send operation limits", () => { + let connection: Connection | null = null + + beforeEach(async () => { + connection = await connect({ + hostname: "localhost", + port: 5552, + username, + password, + vhost: "/", + frameMax: 0, + heartbeat: 0, + }) + }) + + afterEach(async () => { + await connection!.close() + }) + it("if a message is too big an exception is raised when sending it", async () => { + const maxFrameSize = 1000 + const publisher = await connection!.declarePublisher({ + stream: testStreamName, + publisherRef, + maxFrameSize: maxFrameSize, + }) + const msg = Buffer.from(Array.from(Array(maxFrameSize + 1).keys()).map((_v) => 1)) + + return expect(publisher.send(msg, {})).to.be.rejectedWith(FrameSizeException) + }) + + it("if chunk size is not reached, then the message is enqueued", async () => { + const maxFrameSize = 1000 + const chunkSize = 100 + const publisher = await connection!.declarePublisher({ + stream: testStreamName, + publisherRef, + maxFrameSize: maxFrameSize, + maxChunkLength: chunkSize, + }) + const msg = Buffer.from([1]) + + const result = await publisher.send(msg, {}) + + expect(result).is.false + }) + it("if max queue length is reached, then the chunk is sent immediately", async () => { + const maxFrameSize = 1000 + const queueLength = 2 + const publisher = await connection!.declarePublisher({ + stream: testStreamName, + publisherRef, + maxFrameSize: maxFrameSize, + maxChunkLength: queueLength, + }) + const msgs = [Buffer.from([1]), Buffer.from([2])] + + const result = await Promise.all(msgs.map((msg) => publisher.send(msg, {}))) + + expect(result).eql([false, true]) + }) + }) }) From 22868f594bd7aec09f324b4484c28a19b74df92c Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Mon, 11 Dec 2023 17:41:57 +0100 Subject: [PATCH 05/14] add: buffer data in writer grows in size as needed, with max hard limit --- src/requests/abstract_request.ts | 40 +++++++++++++++++-- .../sub_entry_batch_publish_request.ts | 3 +- src/responses/credit_response.ts | 3 +- src/responses/deliver_response.ts | 3 +- src/responses/metadata_update_response.ts | 3 +- src/responses/publish_confirm_response.ts | 3 +- src/responses/publish_error_response.ts | 3 +- src/responses/tune_response.ts | 3 +- test/unit/buffer_data_writer.test.ts | 40 +++++++++++++++++++ test/unit/response_decoder.test.ts | 3 +- 10 files changed, 93 insertions(+), 11 deletions(-) create mode 100644 test/unit/buffer_data_writer.test.ts diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index fc992767..04169857 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -4,7 +4,7 @@ import { Request } from "./request" export class BufferDataWriter implements DataWriter { private _offset = 0 - constructor(private buffer: Buffer, startFrom: number) { + constructor(private readonly writeBufferMaxSize: number, private buffer: Buffer, startFrom: number) { this._offset = startFrom } @@ -17,6 +17,7 @@ export class BufferDataWriter implements DataWriter { } writeData(data: string | Buffer): void { + this.growIfNeeded(Buffer.byteLength(data, "utf-8")) if (Buffer.isBuffer(data)) { this._offset += data.copy(this.buffer, this._offset) return @@ -25,38 +26,56 @@ export class BufferDataWriter implements DataWriter { } writeByte(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) this._offset = this.buffer.writeUInt8(data, this._offset) } writeInt8(data: number) { + const bytes = 1 + this.growIfNeeded(bytes) this._offset = this.buffer.writeInt8(data, this._offset) } writeUInt8(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) this._offset = this.buffer.writeUInt8(data, this._offset) } writeUInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) this._offset = this.buffer.writeUInt16BE(data, this._offset) } writeUInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) this._offset = this.buffer.writeUInt32BE(data, this._offset) } writeInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) this._offset = this.buffer.writeInt32BE(data, this._offset) } writeUInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) this._offset = this.buffer.writeBigUInt64BE(data, this._offset) } writeInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) this._offset = this.buffer.writeBigInt64BE(data, this._offset) } writeString(data: string): void { + const bytes = 2 + this.growIfNeeded(bytes) this._offset = this.buffer.writeInt16BE(data.length, this._offset) this.writeData(data) } @@ -64,16 +83,31 @@ export class BufferDataWriter implements DataWriter { toBuffer(): Buffer { return this.buffer.slice(0, this._offset) } + + private growIfNeeded(additionalBytes: number) { + const maxRatio = 0.9 + if ((this._offset + additionalBytes) / this.buffer.length > maxRatio) { + this.growBuffer(additionalBytes) + } + } + + private growBuffer(requiredBytes: number) { + const newSize = Math.min(this.buffer.length * 2 + requiredBytes, this.writeBufferMaxSize) + const data = Buffer.from(this.buffer) + this.buffer = Buffer.alloc(newSize) + data.copy(this.buffer, 0) + } } export abstract class AbstractRequest implements Request { abstract get key(): number abstract get responseKey(): number readonly version = 1 - constructor(private readonly writeBufferSize: number = 1048576) {} + constructor(private readonly writeBufferMaxSize: number = 1048576) {} toBuffer(correlationId?: number): Buffer { - const dataWriter = new BufferDataWriter(Buffer.alloc(this.writeBufferSize), 4) + const initialBufferSize = 65536 + const dataWriter = new BufferDataWriter(this.writeBufferMaxSize, Buffer.alloc(initialBufferSize), 4) dataWriter.writeUInt16(this.key) dataWriter.writeUInt16(this.version) if (typeof correlationId === "number") { diff --git a/src/requests/sub_entry_batch_publish_request.ts b/src/requests/sub_entry_batch_publish_request.ts index 48310a62..3d1265c8 100644 --- a/src/requests/sub_entry_batch_publish_request.ts +++ b/src/requests/sub_entry_batch_publish_request.ts @@ -30,7 +30,8 @@ export class SubEntryBatchPublishRequest extends AbstractRequest { writer.writeUInt16(messages.length) writer.writeUInt32(messages.reduce((sum, message) => sum + 4 + messageSize(message), 0)) - const data = new BufferDataWriter(Buffer.alloc(maxFrameSize), 0) + const initialDataBufferSize = 65536 + const data = new BufferDataWriter(maxFrameSize, Buffer.alloc(initialDataBufferSize), 0) messages.forEach((m) => amqpEncode(data, m)) const compressedData = compression.compress(data.toBuffer()) diff --git a/src/responses/credit_response.ts b/src/responses/credit_response.ts index 963a70b8..424735ae 100644 --- a/src/responses/credit_response.ts +++ b/src/responses/credit_response.ts @@ -12,7 +12,8 @@ export class CreditResponse implements Response { } toBuffer(): Buffer { - const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + const bufferSize = 1024 + const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) dw.writeUInt16(CreditResponse.key) dw.writeUInt16(1) dw.writeUInt16(this.response.responseCode) diff --git a/src/responses/deliver_response.ts b/src/responses/deliver_response.ts index 8bbf4db2..8eac7eb7 100755 --- a/src/responses/deliver_response.ts +++ b/src/responses/deliver_response.ts @@ -13,7 +13,8 @@ export class DeliverResponse implements Response { } toBuffer(): Buffer { - const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + const bufferSize = 1024 + const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) dw.writeUInt16(DeliverResponse.key) dw.writeUInt16(1) dw.writeUInt8(this.response.subscriptionId) diff --git a/src/responses/metadata_update_response.ts b/src/responses/metadata_update_response.ts index 0b23b85a..a31648c3 100644 --- a/src/responses/metadata_update_response.ts +++ b/src/responses/metadata_update_response.ts @@ -12,7 +12,8 @@ export class MetadataUpdateResponse implements Response { } toBuffer(): Buffer { - const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + const bufferSize = 1024 + const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) dw.writeUInt16(MetadataUpdateResponse.key) dw.writeUInt16(1) dw.writeUInt16(this.response.metadataInfo.code) diff --git a/src/responses/publish_confirm_response.ts b/src/responses/publish_confirm_response.ts index 0410c11a..701abf14 100644 --- a/src/responses/publish_confirm_response.ts +++ b/src/responses/publish_confirm_response.ts @@ -15,7 +15,8 @@ export class PublishConfirmResponse implements Response { } toBuffer(): Buffer { - const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + const bufferSize = 1024 + const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) dw.writeUInt16(PublishConfirmResponse.key) dw.writeUInt16(1) dw.writeUInt8(this.publisherId) diff --git a/src/responses/publish_error_response.ts b/src/responses/publish_error_response.ts index d4dea0fd..a316c962 100644 --- a/src/responses/publish_error_response.ts +++ b/src/responses/publish_error_response.ts @@ -20,7 +20,8 @@ export class PublishErrorResponse implements Response { } toBuffer(): Buffer { - const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + const bufferSize = 1024 + const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) dw.writeUInt16(PublishErrorResponse.key) dw.writeUInt16(1) dw.writeUInt8(this.publisherId) diff --git a/src/responses/tune_response.ts b/src/responses/tune_response.ts index 8747641c..b132444e 100644 --- a/src/responses/tune_response.ts +++ b/src/responses/tune_response.ts @@ -11,7 +11,8 @@ export class TuneResponse implements Response { } toBuffer(): Buffer { - const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + const bufferSize = 1024 + const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) dw.writeUInt16(TuneResponse.key) dw.writeUInt16(1) dw.writeUInt32(this.response.frameMax) diff --git a/test/unit/buffer_data_writer.test.ts b/test/unit/buffer_data_writer.test.ts new file mode 100644 index 00000000..08929892 --- /dev/null +++ b/test/unit/buffer_data_writer.test.ts @@ -0,0 +1,40 @@ +import { expect } from "chai" +import { BufferDataWriter } from "../../src/requests/abstract_request" +describe("Buffer Data Writer functionalities", () => { + const bufferMaxSize = 1024 + const bufferInitialSize = 1 + const stringPayload = "a long string that requires the buffer to grow" + + it("allocate a functioning buffer data writer", () => { + const b = new BufferDataWriter(bufferMaxSize, Buffer.alloc(bufferInitialSize), 0) + b.writeByte(1) + + const result = b.toBuffer() + + expect(result).eql(Buffer.from([1])) + }) + + it("grow the buffer when needed", () => { + const b = new BufferDataWriter(bufferMaxSize, Buffer.alloc(bufferInitialSize), 0) + + b.writeString(stringPayload) + + const result = b.toBuffer() + const header = result.slice(0, 2) + const pl = result.slice(2) + expect(header).eql(Buffer.from([0, 46])) + expect(pl.length).eql(46) + expect(pl.toString()).eql(stringPayload) + }) + + it("the buffer max size is a hard limit", () => { + const maxSize = 32 + const b = new BufferDataWriter(maxSize, Buffer.alloc(bufferInitialSize), 0) + + b.writeString(stringPayload) + + const result = b.toBuffer() + const pl = result.slice(2) + expect(pl.toString()).eql("a long string that requires th") + }) +}) diff --git a/test/unit/response_decoder.test.ts b/test/unit/response_decoder.test.ts index 99063cc1..d5d022d8 100644 --- a/test/unit/response_decoder.test.ts +++ b/test/unit/response_decoder.test.ts @@ -54,7 +54,8 @@ describe("ResponseDecoder", () => { }) function createResponse(params: { key: number; correlationId?: number; responseCode?: number }): Buffer { - const dataWriter = new BufferDataWriter(Buffer.alloc(1024), 4) + const bufferSize = 1024 + const dataWriter = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) dataWriter.writeUInt16(params.key) dataWriter.writeUInt16(1) dataWriter.writeUInt32(params.correlationId || 101) From d8cbc17e1051e0d64b88eaff51fa8669130e9bb5 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Mon, 11 Dec 2023 17:57:04 +0100 Subject: [PATCH 06/14] fix test: add maxFrameSize parameter in PublishRequeest instantiation --- test/unit/publish_request.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/unit/publish_request.test.ts b/test/unit/publish_request.test.ts index c59411f5..afce83f4 100644 --- a/test/unit/publish_request.test.ts +++ b/test/unit/publish_request.test.ts @@ -5,10 +5,11 @@ import { PublishRequest } from "../../src/requests/publish_request" describe("PublishRequest", () => { it("Produce a buffer for a long list of messages", () => { const publisherId = 1 + const maxFrameSize = 1024 const messages = [...Array(100).keys()].map((idx) => { return { publishingId: BigInt(idx), message: { content: Buffer.from(randomUUID()) } } }) - const pr = new PublishRequest({ publisherId, messages }) + const pr = new PublishRequest({ publisherId, messages, maxFrameSize }) const written = pr.toBuffer() From 5ceafc1308e01e7aac4ae42254fd7848e227a6d4 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Tue, 12 Dec 2023 13:04:26 +0100 Subject: [PATCH 07/14] add test for scheduled flush of producer queue --- package-lock.json | 39 ++++++++++++++++++++++ package.json | 2 ++ performance_test/perf_test_producer.ts | 12 +++---- src/producer.ts | 3 +- test/unit/producer.test.ts | 45 +++++++++++++++++++------- 5 files changed, 82 insertions(+), 19 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2bd8fa8b..7ccd1258 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", "@types/chai-as-promised": "^7.1.8", + "@types/chai-spies": "^1.0.6", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", @@ -20,6 +21,7 @@ "amqplib": "^0.10.3", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", + "chai-spies": "^1.1.0", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", @@ -798,6 +800,15 @@ "@types/chai": "*" } }, + "node_modules/@types/chai-spies": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/@types/chai-spies/-/chai-spies-1.0.6.tgz", + "integrity": "sha512-xkk4HmhBB9OQeTAifa9MJ+6R5/Rq9+ungDe4JidZD+vqZVeiWZwc2i7/pd1ZKjyGlSBIQePoWdyUyFUGT0rv5w==", + "dev": true, + "dependencies": { + "@types/chai": "*" + } + }, "node_modules/@types/http-cache-semantics": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz", @@ -1710,6 +1721,18 @@ "chai": ">= 2.1.2 < 5" } }, + "node_modules/chai-spies": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/chai-spies/-/chai-spies-1.1.0.tgz", + "integrity": "sha512-ikaUhQvQWchRYj2K54itFp3nrcxaFRpSDQxDlRzSn9aWgu9Pi7lD8yFxTso4WnQ39+WZ69oB/qOvqp+isJIIWA==", + "dev": true, + "engines": { + "node": ">= 4.0.0" + }, + "peerDependencies": { + "chai": "*" + } + }, "node_modules/chalk": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", @@ -6148,6 +6171,15 @@ "@types/chai": "*" } }, + "@types/chai-spies": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/@types/chai-spies/-/chai-spies-1.0.6.tgz", + "integrity": "sha512-xkk4HmhBB9OQeTAifa9MJ+6R5/Rq9+ungDe4JidZD+vqZVeiWZwc2i7/pd1ZKjyGlSBIQePoWdyUyFUGT0rv5w==", + "dev": true, + "requires": { + "@types/chai": "*" + } + }, "@types/http-cache-semantics": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz", @@ -6757,6 +6789,13 @@ "check-error": "^1.0.2" } }, + "chai-spies": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/chai-spies/-/chai-spies-1.1.0.tgz", + "integrity": "sha512-ikaUhQvQWchRYj2K54itFp3nrcxaFRpSDQxDlRzSn9aWgu9Pi7lD8yFxTso4WnQ39+WZ69oB/qOvqp+isJIIWA==", + "dev": true, + "requires": {} + }, "chalk": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", diff --git a/package.json b/package.json index 8e86a386..72c37752 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", "@types/chai-as-promised": "^7.1.8", + "@types/chai-spies": "^1.0.6", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", @@ -39,6 +40,7 @@ "amqplib": "^0.10.3", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", + "chai-spies": "^1.1.0", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", diff --git a/performance_test/perf_test_producer.ts b/performance_test/perf_test_producer.ts index ed99f0e4..83877537 100644 --- a/performance_test/perf_test_producer.ts +++ b/performance_test/perf_test_producer.ts @@ -27,7 +27,7 @@ export class PerfTestProducer { const publisher = await this.connection.declarePublisher(this.publisherParams) publisher.on("publish_confirm", (err, confirmedIds) => { if (err) { - console.log(err) + this.logger.error(err) } this.metrics.addCounter("confirmed", confirmedIds.length) }) @@ -38,7 +38,6 @@ export class PerfTestProducer { }, 500) await this.send(publisher) - this.displayMetrics(true) return true } @@ -53,13 +52,14 @@ export class PerfTestProducer { private async send(publisher: Producer) { while (this.maxMessages === -1 || this.ctr < this.maxMessages) { - const nmsgs = this.maxMessages > 0 ? Math.min(this.maxChunkSize, this.maxMessages) : this.maxChunkSize - for (let index = 0; index < nmsgs; index++) { + const messageQuantity = this.maxMessages > 0 ? Math.min(this.maxChunkSize, this.maxMessages) : this.maxChunkSize + for (let index = 0; index < messageQuantity; index++) { await publisher.send(this.payload, {}) } - this.ctr = this.ctr + nmsgs - this.metrics.addCounter("published", nmsgs) + this.ctr = this.ctr + messageQuantity + this.metrics.addCounter("published", messageQuantity) } + this.displayMetrics(true) } public getDisplayTimer() { diff --git a/src/producer.ts b/src/producer.ts index bb51c527..7aff634c 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -185,7 +185,8 @@ export class StreamProducer implements Producer { private scheduleIfNeeded() { if (this.queue.length > 0 && this.scheduled === null) { this.scheduled = setImmediate(() => { - this.sendChunk() + this.scheduled = null + this.flush() .then((_v) => _v) .catch((err) => this.logger.error(`Error in send: ${inspect(err)}`)) .finally(() => this.scheduleIfNeeded()) diff --git a/test/unit/producer.test.ts b/test/unit/producer.test.ts index f8c254ba..c3c2b554 100644 --- a/test/unit/producer.test.ts +++ b/test/unit/producer.test.ts @@ -1,18 +1,21 @@ -import { expect, use as chaiUse } from "chai" +import { expect, spy, use as chaiUse } from "chai" import { randomUUID } from "crypto" import { Connection, connect } from "../../src" import { Rabbit } from "../support/rabbit" -import { password, username } from "../support/util" +import { eventually, password, username } from "../support/util" import { FrameSizeException } from "../../src/requests/frame_size_exception" import chaiAsPromised from "chai-as-promised" +import spies from "chai-spies" chaiUse(chaiAsPromised) +chaiUse(spies) describe("Producer", () => { const rabbit = new Rabbit(username, password) - const testStreamName = "test-stream" + let testStreamName = "" let publisherRef: string beforeEach(async () => { + testStreamName = `${randomUUID()}` publisherRef = randomUUID() await rabbit.createStream(testStreamName) }) @@ -85,10 +88,13 @@ describe("Producer", () => { }).timeout(10000) describe("Send operation limits", () => { - let connection: Connection | null = null + const maxFrameSize = 1000 + let writeConnection: Connection | null = null + let spySandbox: ChaiSpies.Sandbox | null = null beforeEach(async () => { - connection = await connect({ + spySandbox = spy.sandbox() + writeConnection = await connect({ hostname: "localhost", port: 5552, username, @@ -100,11 +106,11 @@ describe("Producer", () => { }) afterEach(async () => { - await connection!.close() + await writeConnection!.close() + spySandbox?.restore() }) it("if a message is too big an exception is raised when sending it", async () => { - const maxFrameSize = 1000 - const publisher = await connection!.declarePublisher({ + const publisher = await writeConnection!.declarePublisher({ stream: testStreamName, publisherRef, maxFrameSize: maxFrameSize, @@ -115,9 +121,8 @@ describe("Producer", () => { }) it("if chunk size is not reached, then the message is enqueued", async () => { - const maxFrameSize = 1000 const chunkSize = 100 - const publisher = await connection!.declarePublisher({ + const publisher = await writeConnection!.declarePublisher({ stream: testStreamName, publisherRef, maxFrameSize: maxFrameSize, @@ -130,9 +135,8 @@ describe("Producer", () => { expect(result).is.false }) it("if max queue length is reached, then the chunk is sent immediately", async () => { - const maxFrameSize = 1000 const queueLength = 2 - const publisher = await connection!.declarePublisher({ + const publisher = await writeConnection!.declarePublisher({ stream: testStreamName, publisherRef, maxFrameSize: maxFrameSize, @@ -144,5 +148,22 @@ describe("Producer", () => { expect(result).eql([false, true]) }) + + it("even if max queue length is not reached, the messages are eventually sent", async () => { + const queueLength = 10 + const messageQuantity = queueLength - 2 + const publisher = await writeConnection!.declarePublisher({ + stream: testStreamName, + publisherRef, + maxFrameSize: maxFrameSize, + maxChunkLength: queueLength, + }) + const msgs = Array.from(Array(messageQuantity).keys()).map((k) => Buffer.from([k])) + spySandbox?.on(publisher, "flush") + + await Promise.all(msgs.map((msg) => publisher.send(msg, {}))) + + await eventually(() => expect(publisher.flush).called) + }) }) }) From 9d30cddcc091d726711e95b292e64cf0b4534965 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Wed, 13 Dec 2023 10:06:35 +0100 Subject: [PATCH 08/14] update: perftest forces NullLogger in connection object --- performance_test/index.ts | 4 +++- performance_test/package.json | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/performance_test/index.ts b/performance_test/index.ts index 9fb9c408..142f4422 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -18,6 +18,8 @@ const logger = createLogger({ transports: new transports.Console(), }) +const connLogger = undefined + function parseArgs(args) { const zip = (a: string[], b: string[]): [string, number][] => { const shorterArray = a.length < b.length ? a : b @@ -44,7 +46,7 @@ async function main() { password: rabbitPassword, vhost: "/", }, - logger + connLogger ) const streamName = `my-stream-${randomUUID()}` diff --git a/performance_test/package.json b/performance_test/package.json index 21f2198f..232d23cc 100644 --- a/performance_test/package.json +++ b/performance_test/package.json @@ -5,7 +5,7 @@ "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1", - "perftest": "ts-node ./index.ts", + "perftest": "ts-node ./index.ts 10000000", "perftest-reset": "cd .. && npm run build && cd - && npm install --force && ts-node ./index.ts 10000000" }, "author": "", From 235326a772c23b74147bfd5c2ca30ccd9ea7234f Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Wed, 13 Dec 2023 15:53:12 +0100 Subject: [PATCH 09/14] add: write buffer size parameters (max, growth ratio, initial size) can be passed to connection objects --- performance_test/index.ts | 3 ++ performance_test/package-lock.json | 4 +++ performance_test/perf_test_producer.ts | 4 +-- src/connection.ts | 20 ++++++++--- src/producer.ts | 4 +-- src/requests/abstract_request.ts | 30 ++++++++++------ src/requests/publish_request.ts | 3 +- src/requests/request.ts | 10 +++++- .../sub_entry_batch_publish_request.ts | 9 +++-- src/responses/credit_response.ts | 3 +- src/responses/deliver_response.ts | 3 +- src/responses/metadata_update_response.ts | 3 +- src/responses/publish_confirm_response.ts | 3 +- src/responses/publish_error_response.ts | 3 +- src/responses/tune_response.ts | 3 +- test/e2e/basic_publish.test.ts | 34 ++++++++++++++++++- test/support/fake_data.ts | 11 +++++- test/unit/buffer_data_writer.test.ts | 9 +++-- test/unit/publish_request.test.ts | 4 +-- test/unit/response_decoder.test.ts | 3 +- 20 files changed, 128 insertions(+), 38 deletions(-) diff --git a/performance_test/index.ts b/performance_test/index.ts index 142f4422..13fb49eb 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -4,6 +4,7 @@ import { randomUUID } from "crypto" import { argv } from "process" import { PerfTestProducer } from "./perf_test_producer" import { inspect } from "util" +import { BufferSizeSettings } from "../dist/requests/request" const logger = createLogger({ level: "info", @@ -38,12 +39,14 @@ function parseArgs(args) { async function main() { const rabbitUser = process.env.RABBITMQ_USER || "rabbit" const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + const bufferSizeSettings: BufferSizeSettings = { initialSize: 16384 } const connection = await connect( { hostname: "localhost", port: 5552, username: rabbitUser, password: rabbitPassword, + bufferSizeSettings: bufferSizeSettings, vhost: "/", }, connLogger diff --git a/performance_test/package-lock.json b/performance_test/package-lock.json index 3d0e54b0..df4d7c76 100644 --- a/performance_test/package-lock.json +++ b/performance_test/package-lock.json @@ -28,6 +28,7 @@ "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", "@types/chai-as-promised": "^7.1.8", + "@types/chai-spies": "^1.0.6", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", @@ -35,6 +36,7 @@ "amqplib": "^0.10.3", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", + "chai-spies": "^1.1.0", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", @@ -705,6 +707,7 @@ "@types/amqplib": "^0.10.1", "@types/chai": "^4.3.4", "@types/chai-as-promised": "^7.1.8", + "@types/chai-spies": "^1.0.6", "@types/mocha": "^10.0.1", "@types/node": "^16.18.11", "@typescript-eslint/eslint-plugin": "^5.50.0", @@ -712,6 +715,7 @@ "amqplib": "^0.10.3", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", + "chai-spies": "^1.1.0", "cspell": "^6.21.0", "eslint": "^8.33.0", "eslint-config-prettier": "^8.6.0", diff --git a/performance_test/perf_test_producer.ts b/performance_test/perf_test_producer.ts index 83877537..93764bd6 100644 --- a/performance_test/perf_test_producer.ts +++ b/performance_test/perf_test_producer.ts @@ -8,7 +8,7 @@ export class PerfTestProducer { private payload: Buffer private readonly maxChunkSize: number = 1000 private ctr = 0 - private displayTimer: NodeJS.Timeout | null + private displayTimer: NodeJS.Timer | null constructor( private readonly connection: Connection, @@ -46,7 +46,7 @@ export class PerfTestProducer { const metrics = { ...this.metrics.getMetrics(), total: this.ctr } this.logger.info(`${inspect(metrics)}`) if (stop && this.displayTimer) { - clearInterval(this.displayTimer) + clearInterval(this.displayTimer[Symbol.toPrimitive]()) } } diff --git a/src/connection.ts b/src/connection.ts index c79c074c..e45df80a 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -10,7 +10,7 @@ import { DeleteStreamRequest } from "./requests/delete_stream_request" import { OpenRequest } from "./requests/open_request" import { PeerPropertiesRequest } from "./requests/peer_properties_request" import { QueryPublisherRequest } from "./requests/query_publisher_request" -import { Request } from "./requests/request" +import { BufferSizeParams, BufferSizeSettings, Request } from "./requests/request" import { SaslAuthenticateRequest } from "./requests/sasl_authenticate_request" import { SaslHandshakeRequest } from "./requests/sasl_handshake_request" import { TuneRequest } from "./requests/tune_request" @@ -64,6 +64,7 @@ export class Connection { private consumerId = 0 private consumers = new Map() private compressions = new Map() + private readonly bufferSizeSettings: BufferSizeSettings private frameMax: number = DEFAULT_FRAME_MAX private constructor(private readonly logger: Logger, private readonly params: ConnectionParams) { @@ -74,11 +75,11 @@ export class Connection { this.socket = new Socket() this.socket.connect(this.params.port, this.params.hostname) } - this.heartbeat = new Heartbeat(this, this.logger) this.compressions.set(CompressionType.None, NoneCompression.create()) this.compressions.set(CompressionType.Gzip, GzipCompression.create()) this.decoder = new ResponseDecoder((...args) => this.responseReceived(...args), this.logger) + this.bufferSizeSettings = params.bufferSizeSettings || {} } getCompression(compressionType: CompressionType) { @@ -269,7 +270,8 @@ export class Connection { public send(cmd: Request): Promise { return new Promise((res, rej) => { - const body = cmd.toBuffer() + const bufferSizeParams = this.getBufferSizeParams() + const body = cmd.toBuffer(bufferSizeParams) this.logger.debug( `Write cmd key: ${cmd.key.toString(16)} - no correlationId - data: ${inspect(body.toJSON())} length: ${ body.byteLength @@ -375,6 +377,10 @@ export class Connection { return res } + public get maxFrameSize() { + return this.frameMax + } + private askForCredit(params: CreditRequestParams): Promise { return this.send(new CreditRequest({ ...params })) } @@ -424,7 +430,8 @@ export class Connection { private sendAndWait(cmd: Request): Promise { return new Promise((res, rej) => { const correlationId = this.incCorrelationId() - const body = cmd.toBuffer(correlationId) + const bufferSizeParams = this.getBufferSizeParams() + const body = cmd.toBuffer(bufferSizeParams, correlationId) this.logger.debug( `Write cmd key: ${cmd.key.toString(16)} - correlationId: ${correlationId}: data: ${inspect( body.toJSON() @@ -502,6 +509,10 @@ export class Connection { if (tuneResponseFrameMax === DEFAULT_UNLIMITED_FRAME_MAX) return this.frameMax return Math.min(this.frameMax, tuneResponseFrameMax) } + + private getBufferSizeParams(): BufferSizeParams { + return { maxSize: this.frameMax, ...this.bufferSizeSettings } + } } export type ListenersParams = { @@ -526,6 +537,7 @@ export interface ConnectionParams { heartbeat?: number listeners?: ListenersParams ssl?: SSLConnectionParams + bufferSizeSettings?: BufferSizeSettings } export interface DeclarePublisherParams { diff --git a/src/producer.ts b/src/producer.ts index 7aff634c..3f00f6b7 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -8,6 +8,7 @@ import { PublishRequest, PublishRequestMessage } from "./requests/publish_reques import { SubEntryBatchPublishRequest } from "./requests/sub_entry_batch_publish_request" import { PublishConfirmResponse } from "./responses/publish_confirm_response" import { PublishErrorResponse } from "./responses/publish_error_response" +import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util" export type MessageApplicationProperties = Record @@ -162,7 +163,7 @@ export class StreamProducer implements Producer { private checkMessageSize(publishRequestMessage: PublishRequestMessage) { const computedSize = messageSize(publishRequestMessage.message) - if (computedSize > this.maxFrameSize) { + if (this.maxFrameSize !== DEFAULT_UNLIMITED_FRAME_MAX && computedSize > this.maxFrameSize) { throw new FrameSizeException(`Message too big to fit in one frame: ${computedSize}`) } @@ -176,7 +177,6 @@ export class StreamProducer implements Producer { new PublishRequest({ publisherId: this.publisherId, messages: chunk, - maxFrameSize: this.maxFrameSize, }) ) } diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index 04169857..32e075f3 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -1,11 +1,18 @@ +import { DEFAULT_FRAME_MAX } from "../util" import { DataWriter } from "./data_writer" -import { Request } from "./request" +import { BufferSizeParams, Request } from "./request" export class BufferDataWriter implements DataWriter { private _offset = 0 + private readonly maxBufferSize: number + private readonly growthTriggerRatio: number + private readonly sizeMultiplier: number - constructor(private readonly writeBufferMaxSize: number, private buffer: Buffer, startFrom: number) { + constructor(private buffer: Buffer, startFrom: number, bufferSizeParameters?: BufferSizeParams) { this._offset = startFrom + this.maxBufferSize = bufferSizeParameters?.maxSize ?? 1048576 + this.growthTriggerRatio = bufferSizeParameters?.maxRatio ?? 0.9 + this.sizeMultiplier = bufferSizeParameters?.multiplier ?? 2 } get offset() { @@ -85,29 +92,32 @@ export class BufferDataWriter implements DataWriter { } private growIfNeeded(additionalBytes: number) { - const maxRatio = 0.9 - if ((this._offset + additionalBytes) / this.buffer.length > maxRatio) { + if ((this._offset + additionalBytes) / this.buffer.length > this.growthTriggerRatio) { this.growBuffer(additionalBytes) } } private growBuffer(requiredBytes: number) { - const newSize = Math.min(this.buffer.length * 2 + requiredBytes, this.writeBufferMaxSize) + const newSize = this.getNewSize(requiredBytes) const data = Buffer.from(this.buffer) this.buffer = Buffer.alloc(newSize) data.copy(this.buffer, 0) } + + private getNewSize(requiredBytes: number) { + const requiredNewSize = this.buffer.length * this.sizeMultiplier + this._offset + requiredBytes + if (this.maxBufferSize === DEFAULT_FRAME_MAX) return requiredNewSize + return Math.min(requiredNewSize, this.maxBufferSize) + } } export abstract class AbstractRequest implements Request { abstract get key(): number abstract get responseKey(): number readonly version = 1 - constructor(private readonly writeBufferMaxSize: number = 1048576) {} - - toBuffer(correlationId?: number): Buffer { - const initialBufferSize = 65536 - const dataWriter = new BufferDataWriter(this.writeBufferMaxSize, Buffer.alloc(initialBufferSize), 4) + toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer { + const initialSize = bufferSizeParams?.initialSize ?? 65536 + const dataWriter = new BufferDataWriter(Buffer.alloc(initialSize), 4, bufferSizeParams) dataWriter.writeUInt16(this.key) dataWriter.writeUInt16(this.version) if (typeof correlationId === "number") { diff --git a/src/requests/publish_request.ts b/src/requests/publish_request.ts index 7d155fbd..27a0c660 100644 --- a/src/requests/publish_request.ts +++ b/src/requests/publish_request.ts @@ -10,7 +10,6 @@ export type PublishRequestMessage = { interface PublishRequestParams { publisherId: number - maxFrameSize: number messages: Array } @@ -19,7 +18,7 @@ export class PublishRequest extends AbstractRequest { readonly responseKey = -1 constructor(private params: PublishRequestParams) { - super(params.maxFrameSize) + super() } protected writeContent(writer: DataWriter): void { diff --git a/src/requests/request.ts b/src/requests/request.ts index dd55b90e..3b8ebbd1 100644 --- a/src/requests/request.ts +++ b/src/requests/request.ts @@ -1,5 +1,13 @@ +export type BufferSizeSettings = { + initialSize?: number + maxRatio?: number + multiplier?: number +} + +export type BufferSizeParams = BufferSizeSettings & { maxSize: number } + export interface Request { - toBuffer(correlationId?: number): Buffer + toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer readonly responseKey: number readonly key: number readonly version: 1 diff --git a/src/requests/sub_entry_batch_publish_request.ts b/src/requests/sub_entry_batch_publish_request.ts index 3d1265c8..585dd870 100644 --- a/src/requests/sub_entry_batch_publish_request.ts +++ b/src/requests/sub_entry_batch_publish_request.ts @@ -15,13 +15,15 @@ interface SubEntryBatchPublishRequestParams { export class SubEntryBatchPublishRequest extends AbstractRequest { readonly key = 0x02 readonly responseKey = -1 + private readonly maxFrameSize: number constructor(private params: SubEntryBatchPublishRequestParams) { - super(params.maxFrameSize) + super() + this.maxFrameSize = params.maxFrameSize } protected writeContent(writer: DataWriter): void { - const { compression, messages, publishingId, publisherId, maxFrameSize } = this.params + const { compression, messages, publishingId, publisherId } = this.params writer.writeUInt8(publisherId) // number of root messages. In this case will be always 1 writer.writeUInt32(1) @@ -31,7 +33,8 @@ export class SubEntryBatchPublishRequest extends AbstractRequest { writer.writeUInt32(messages.reduce((sum, message) => sum + 4 + messageSize(message), 0)) const initialDataBufferSize = 65536 - const data = new BufferDataWriter(maxFrameSize, Buffer.alloc(initialDataBufferSize), 0) + const bufferSizeParams = { maxSize: this.maxFrameSize } + const data = new BufferDataWriter(Buffer.alloc(initialDataBufferSize), 0, bufferSizeParams) messages.forEach((m) => amqpEncode(data, m)) const compressedData = compression.compress(data.toBuffer()) diff --git a/src/responses/credit_response.ts b/src/responses/credit_response.ts index 424735ae..9db7f912 100644 --- a/src/responses/credit_response.ts +++ b/src/responses/credit_response.ts @@ -13,7 +13,8 @@ export class CreditResponse implements Response { toBuffer(): Buffer { const bufferSize = 1024 - const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) dw.writeUInt16(CreditResponse.key) dw.writeUInt16(1) dw.writeUInt16(this.response.responseCode) diff --git a/src/responses/deliver_response.ts b/src/responses/deliver_response.ts index 8eac7eb7..5e25d4a4 100755 --- a/src/responses/deliver_response.ts +++ b/src/responses/deliver_response.ts @@ -14,7 +14,8 @@ export class DeliverResponse implements Response { toBuffer(): Buffer { const bufferSize = 1024 - const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) dw.writeUInt16(DeliverResponse.key) dw.writeUInt16(1) dw.writeUInt8(this.response.subscriptionId) diff --git a/src/responses/metadata_update_response.ts b/src/responses/metadata_update_response.ts index a31648c3..b4887193 100644 --- a/src/responses/metadata_update_response.ts +++ b/src/responses/metadata_update_response.ts @@ -13,7 +13,8 @@ export class MetadataUpdateResponse implements Response { toBuffer(): Buffer { const bufferSize = 1024 - const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) dw.writeUInt16(MetadataUpdateResponse.key) dw.writeUInt16(1) dw.writeUInt16(this.response.metadataInfo.code) diff --git a/src/responses/publish_confirm_response.ts b/src/responses/publish_confirm_response.ts index 701abf14..2bdbad63 100644 --- a/src/responses/publish_confirm_response.ts +++ b/src/responses/publish_confirm_response.ts @@ -16,7 +16,8 @@ export class PublishConfirmResponse implements Response { toBuffer(): Buffer { const bufferSize = 1024 - const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) dw.writeUInt16(PublishConfirmResponse.key) dw.writeUInt16(1) dw.writeUInt8(this.publisherId) diff --git a/src/responses/publish_error_response.ts b/src/responses/publish_error_response.ts index a316c962..8c18273b 100644 --- a/src/responses/publish_error_response.ts +++ b/src/responses/publish_error_response.ts @@ -21,7 +21,8 @@ export class PublishErrorResponse implements Response { toBuffer(): Buffer { const bufferSize = 1024 - const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) dw.writeUInt16(PublishErrorResponse.key) dw.writeUInt16(1) dw.writeUInt8(this.publisherId) diff --git a/src/responses/tune_response.ts b/src/responses/tune_response.ts index b132444e..fb79ce5d 100644 --- a/src/responses/tune_response.ts +++ b/src/responses/tune_response.ts @@ -12,7 +12,8 @@ export class TuneResponse implements Response { toBuffer(): Buffer { const bufferSize = 1024 - const dw = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) dw.writeUInt16(TuneResponse.key) dw.writeUInt16(1) dw.writeUInt32(this.response.frameMax) diff --git a/test/e2e/basic_publish.test.ts b/test/e2e/basic_publish.test.ts index 7857ba61..628d61f5 100644 --- a/test/e2e/basic_publish.test.ts +++ b/test/e2e/basic_publish.test.ts @@ -5,15 +5,19 @@ import { Producer } from "../../src/producer" import { createConnection, createProperties, createPublisher, createStreamName } from "../support/fake_data" import { Rabbit } from "../support/rabbit" import { eventually, username, password, getMessageFrom } from "../support/util" +import { BufferSizeSettings } from "../../src/requests/request" +import { FrameSizeException } from "../../src/requests/frame_size_exception" describe("publish a message", () => { const rabbit = new Rabbit(username, password) let connection: Connection let streamName: string let publisher: Producer + let bufferSizeSettings: BufferSizeSettings | undefined = undefined + let maxFrameSize: number | undefined = undefined beforeEach(async () => { - connection = await createConnection(username, password) + connection = await createConnection(username, password, undefined, maxFrameSize, bufferSizeSettings) streamName = createStreamName() await rabbit.createStream(streamName) publisher = await createPublisher(streamName, connection) @@ -85,6 +89,34 @@ describe("publish a message", () => { expect(properties.headers).eql({ ...applicationProperties, "x-stream-offset": 0 }) }) + describe("with custom buffer size limits", () => { + before(() => { + bufferSizeSettings = { initialSize: 16 } + maxFrameSize = 256 + }) + + after(() => { + bufferSizeSettings = undefined + maxFrameSize = undefined + }) + + it("send a message that triggers buffer size growth", async () => { + const message = Array(bufferSizeSettings!.initialSize! * 3).join(".") + + await publisher.send(Buffer.from(message)) + + const msg = await getMessageFrom(streamName, username, password) + const { content } = msg + expect(message).eql(content) + }) + + it("max buffer size reached, exception thrown", async () => { + const message = Array(maxFrameSize).join(".") + + return expect(publisher.send(Buffer.from(message))).to.be.rejectedWith(FrameSizeException) + }) + }) + describe("deduplication", () => { it("is active if create a publisher with publishRef", async () => { const howMany = 100 diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index bf169fa5..ddb61280 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -1,6 +1,7 @@ import { randomUUID } from "crypto" import { Connection, ListenersParams, connect } from "../../src/connection" import { MessageProperties } from "../../src/producer" +import { BufferSizeSettings } from "../../src/requests/request" export function createProperties(): MessageProperties { return { @@ -28,11 +29,18 @@ export async function createPublisher(streamName: string, connection: Connection const publisher = await connection.declarePublisher({ stream: streamName, publisherRef: `my-publisher-${randomUUID()}`, + maxFrameSize: connection.maxFrameSize, }) return publisher } -export function createConnection(username: string, password: string, listeners?: ListenersParams, frameMax?: number) { +export function createConnection( + username: string, + password: string, + listeners?: ListenersParams, + frameMax?: number, + bufferSizeSettings?: BufferSizeSettings +) { return connect({ hostname: "localhost", port: 5552, @@ -42,5 +50,6 @@ export function createConnection(username: string, password: string, listeners?: frameMax: frameMax ?? 0, heartbeat: 0, listeners: listeners, + bufferSizeSettings: bufferSizeSettings, }) } diff --git a/test/unit/buffer_data_writer.test.ts b/test/unit/buffer_data_writer.test.ts index 08929892..bbd24752 100644 --- a/test/unit/buffer_data_writer.test.ts +++ b/test/unit/buffer_data_writer.test.ts @@ -6,7 +6,8 @@ describe("Buffer Data Writer functionalities", () => { const stringPayload = "a long string that requires the buffer to grow" it("allocate a functioning buffer data writer", () => { - const b = new BufferDataWriter(bufferMaxSize, Buffer.alloc(bufferInitialSize), 0) + const bufferSizeParams = { maxSize: bufferMaxSize } + const b = new BufferDataWriter(Buffer.alloc(bufferInitialSize), 0, bufferSizeParams) b.writeByte(1) const result = b.toBuffer() @@ -15,7 +16,8 @@ describe("Buffer Data Writer functionalities", () => { }) it("grow the buffer when needed", () => { - const b = new BufferDataWriter(bufferMaxSize, Buffer.alloc(bufferInitialSize), 0) + const bufferSizeParams = { maxSize: bufferMaxSize } + const b = new BufferDataWriter(Buffer.alloc(bufferInitialSize), 0, bufferSizeParams) b.writeString(stringPayload) @@ -29,7 +31,8 @@ describe("Buffer Data Writer functionalities", () => { it("the buffer max size is a hard limit", () => { const maxSize = 32 - const b = new BufferDataWriter(maxSize, Buffer.alloc(bufferInitialSize), 0) + const bufferSizeParams = { maxSize: maxSize } + const b = new BufferDataWriter(Buffer.alloc(bufferInitialSize), 0, bufferSizeParams) b.writeString(stringPayload) diff --git a/test/unit/publish_request.test.ts b/test/unit/publish_request.test.ts index afce83f4..acbb5519 100644 --- a/test/unit/publish_request.test.ts +++ b/test/unit/publish_request.test.ts @@ -9,9 +9,9 @@ describe("PublishRequest", () => { const messages = [...Array(100).keys()].map((idx) => { return { publishingId: BigInt(idx), message: { content: Buffer.from(randomUUID()) } } }) - const pr = new PublishRequest({ publisherId, messages, maxFrameSize }) + const pr = new PublishRequest({ publisherId, messages }) - const written = pr.toBuffer() + const written = pr.toBuffer({ maxSize: maxFrameSize }) expect(written.byteLength).eql(5313) }) diff --git a/test/unit/response_decoder.test.ts b/test/unit/response_decoder.test.ts index d5d022d8..be967ee2 100644 --- a/test/unit/response_decoder.test.ts +++ b/test/unit/response_decoder.test.ts @@ -55,7 +55,8 @@ describe("ResponseDecoder", () => { function createResponse(params: { key: number; correlationId?: number; responseCode?: number }): Buffer { const bufferSize = 1024 - const dataWriter = new BufferDataWriter(bufferSize, Buffer.alloc(bufferSize), 4) + const bufferSizeParams = { maxSize: bufferSize } + const dataWriter = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) dataWriter.writeUInt16(params.key) dataWriter.writeUInt16(1) dataWriter.writeUInt32(params.correlationId || 101) From 0f2e03df0f7053c4310eff00d0c1879658dbd34d Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Wed, 13 Dec 2023 16:40:40 +0100 Subject: [PATCH 10/14] fix: passed parameters to perftest --- performance_test/index.ts | 6 +++++- performance_test/package.json | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/performance_test/index.ts b/performance_test/index.ts index 13fb49eb..6cce4959 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -39,7 +39,9 @@ function parseArgs(args) { async function main() { const rabbitUser = process.env.RABBITMQ_USER || "rabbit" const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + const bufferSizeSettings: BufferSizeSettings = { initialSize: 16384 } + const connection = await connect( { hostname: "localhost", @@ -57,7 +59,9 @@ async function main() { const publisherRef = `my-publisher-${randomUUID()}` const passedArgs = parseArgs(argv.slice(2)) logger.info( - `Stream: ${streamName} - publisher ${publisherRef} - max messages ${passedArgs.maxMessages} - message size: ${passedArgs.messageSize} bytes` + `Stream: ${streamName} - publisher ${publisherRef} - max messages ${passedArgs.maxMessages} - message size: ${ + passedArgs.messageSize + } bytes - write buffer settings: ${inspect(bufferSizeSettings)}` ) const perfTestProducer = new PerfTestProducer( diff --git a/performance_test/package.json b/performance_test/package.json index 232d23cc..d992e805 100644 --- a/performance_test/package.json +++ b/performance_test/package.json @@ -5,8 +5,8 @@ "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1", - "perftest": "ts-node ./index.ts 10000000", - "perftest-reset": "cd .. && npm run build && cd - && npm install --force && ts-node ./index.ts 10000000" + "perftest": "ts-node ./index.ts", + "perftest-reset": "cd .. && npm run build && cd - && npm install --force && ts-node ./index.ts" }, "author": "", "license": "ISC", From f18c5bfedd9ae0465910dc0d68fa821e04ad814f Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Thu, 14 Dec 2023 10:27:36 +0100 Subject: [PATCH 11/14] fix: exit immediately on perftest error --- performance_test/index.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/performance_test/index.ts b/performance_test/index.ts index 6cce4959..3b05e4eb 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -39,7 +39,6 @@ function parseArgs(args) { async function main() { const rabbitUser = process.env.RABBITMQ_USER || "rabbit" const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" - const bufferSizeSettings: BufferSizeSettings = { initialSize: 16384 } const connection = await connect( @@ -77,4 +76,7 @@ async function main() { main() .then((_v) => setTimeout(() => process.exit(0), 1000)) - .catch((res) => logger.error("ERROR ", res)) + .catch((res) => { + logger.error("ERROR ", res) + process.exit(400) + }) From 918f5d56424815cf98e24706cbc5fa5b9c9bd964 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Thu, 14 Dec 2023 15:08:53 +0100 Subject: [PATCH 12/14] fix: constant reference when computing new write buffer size --- performance_test/index.ts | 2 ++ src/requests/abstract_request.ts | 4 ++-- test/unit/buffer_data_writer.test.ts | 16 ++++++++++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/performance_test/index.ts b/performance_test/index.ts index 3b05e4eb..1d254923 100644 --- a/performance_test/index.ts +++ b/performance_test/index.ts @@ -40,6 +40,7 @@ async function main() { const rabbitUser = process.env.RABBITMQ_USER || "rabbit" const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" const bufferSizeSettings: BufferSizeSettings = { initialSize: 16384 } + const frameMax = 65536 const connection = await connect( { @@ -49,6 +50,7 @@ async function main() { password: rabbitPassword, bufferSizeSettings: bufferSizeSettings, vhost: "/", + frameMax, }, connLogger ) diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index 32e075f3..ae248352 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -1,4 +1,4 @@ -import { DEFAULT_FRAME_MAX } from "../util" +import { DEFAULT_UNLIMITED_FRAME_MAX } from "../util" import { DataWriter } from "./data_writer" import { BufferSizeParams, Request } from "./request" @@ -106,7 +106,7 @@ export class BufferDataWriter implements DataWriter { private getNewSize(requiredBytes: number) { const requiredNewSize = this.buffer.length * this.sizeMultiplier + this._offset + requiredBytes - if (this.maxBufferSize === DEFAULT_FRAME_MAX) return requiredNewSize + if (this.maxBufferSize === DEFAULT_UNLIMITED_FRAME_MAX) return requiredNewSize return Math.min(requiredNewSize, this.maxBufferSize) } } diff --git a/test/unit/buffer_data_writer.test.ts b/test/unit/buffer_data_writer.test.ts index bbd24752..d958e943 100644 --- a/test/unit/buffer_data_writer.test.ts +++ b/test/unit/buffer_data_writer.test.ts @@ -1,5 +1,6 @@ import { expect } from "chai" import { BufferDataWriter } from "../../src/requests/abstract_request" +import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX } from "../../src/util" describe("Buffer Data Writer functionalities", () => { const bufferMaxSize = 1024 const bufferInitialSize = 1 @@ -40,4 +41,19 @@ describe("Buffer Data Writer functionalities", () => { const pl = result.slice(2) expect(pl.toString()).eql("a long string that requires th") }) + + it("when maxSize === DEFAULT_UNLIMITED_FRAME_MAX, the buffer can grow", () => { + const bufferSizeParams = { maxSize: DEFAULT_UNLIMITED_FRAME_MAX } + const b = new BufferDataWriter(Buffer.alloc(bufferInitialSize), 0, bufferSizeParams) + const payload = Buffer.from( + Array.from(Array(DEFAULT_FRAME_MAX + 1).keys()) + .map((_k) => "") + .join(",") + ) + + b.writeData(payload) + + const result = b.toBuffer() + expect(result).eql(payload) + }) }) From 1a80b58c7cdf246f66947cdd25af08328cebe928 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Thu, 14 Dec 2023 16:19:34 +0100 Subject: [PATCH 13/14] update: use connection frameMax parameter when creating a new publisher --- src/connection.ts | 3 +-- src/producer.ts | 2 +- test/support/fake_data.ts | 1 - test/unit/producer.test.ts | 6 +----- 4 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index e45df80a..e80e3d1a 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -199,7 +199,7 @@ export class Connection { publisherId: publisherId, publisherRef: params.publisherRef, boot: params.boot, - maxFrameSize: params.maxFrameSize, + maxFrameSize: this.frameMax, maxChunkLength: params.maxChunkLength, logger: this.logger, }) @@ -544,7 +544,6 @@ export interface DeclarePublisherParams { stream: string publisherRef?: string boot?: boolean - maxFrameSize?: number maxChunkLength?: number } diff --git a/src/producer.ts b/src/producer.ts index 3f00f6b7..6a51dfbc 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -85,7 +85,7 @@ export class StreamProducer implements Producer { publisherId: number publisherRef?: string boot?: boolean - maxFrameSize?: number + maxFrameSize: number maxChunkLength?: number logger: Logger }) { diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index ddb61280..e7f78a67 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -29,7 +29,6 @@ export async function createPublisher(streamName: string, connection: Connection const publisher = await connection.declarePublisher({ stream: streamName, publisherRef: `my-publisher-${randomUUID()}`, - maxFrameSize: connection.maxFrameSize, }) return publisher } diff --git a/test/unit/producer.test.ts b/test/unit/producer.test.ts index c3c2b554..ad4c6cd1 100644 --- a/test/unit/producer.test.ts +++ b/test/unit/producer.test.ts @@ -100,7 +100,7 @@ describe("Producer", () => { username, password, vhost: "/", - frameMax: 0, + frameMax: maxFrameSize, heartbeat: 0, }) }) @@ -113,7 +113,6 @@ describe("Producer", () => { const publisher = await writeConnection!.declarePublisher({ stream: testStreamName, publisherRef, - maxFrameSize: maxFrameSize, }) const msg = Buffer.from(Array.from(Array(maxFrameSize + 1).keys()).map((_v) => 1)) @@ -125,7 +124,6 @@ describe("Producer", () => { const publisher = await writeConnection!.declarePublisher({ stream: testStreamName, publisherRef, - maxFrameSize: maxFrameSize, maxChunkLength: chunkSize, }) const msg = Buffer.from([1]) @@ -139,7 +137,6 @@ describe("Producer", () => { const publisher = await writeConnection!.declarePublisher({ stream: testStreamName, publisherRef, - maxFrameSize: maxFrameSize, maxChunkLength: queueLength, }) const msgs = [Buffer.from([1]), Buffer.from([2])] @@ -155,7 +152,6 @@ describe("Producer", () => { const publisher = await writeConnection!.declarePublisher({ stream: testStreamName, publisherRef, - maxFrameSize: maxFrameSize, maxChunkLength: queueLength, }) const msgs = Array.from(Array(messageQuantity).keys()).map((k) => Buffer.from([k])) From b4c7161e9fee4b919cee15a78828c17b98c3ab18 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Wed, 20 Dec 2023 12:50:01 +0100 Subject: [PATCH 14/14] fix: type definition in perftest --- performance_test/perf_test_producer.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/performance_test/perf_test_producer.ts b/performance_test/perf_test_producer.ts index 93764bd6..83877537 100644 --- a/performance_test/perf_test_producer.ts +++ b/performance_test/perf_test_producer.ts @@ -8,7 +8,7 @@ export class PerfTestProducer { private payload: Buffer private readonly maxChunkSize: number = 1000 private ctr = 0 - private displayTimer: NodeJS.Timer | null + private displayTimer: NodeJS.Timeout | null constructor( private readonly connection: Connection, @@ -46,7 +46,7 @@ export class PerfTestProducer { const metrics = { ...this.metrics.getMetrics(), total: this.ctr } this.logger.info(`${inspect(metrics)}`) if (stop && this.displayTimer) { - clearInterval(this.displayTimer[Symbol.toPrimitive]()) + clearInterval(this.displayTimer) } }