Skip to content

Commit

Permalink
102 performant send (#120)
Browse files Browse the repository at this point in the history
* add: performance test as subproject, run it in CI

* fix: producer interface after rebase

* fix: improve readability

* add message buffer in producer

* add: buffer data in writer grows in size as needed, with max hard limit

* fix test: add maxFrameSize parameter in PublishRequeest instantiation

* add test for scheduled flush of producer queue

* update: perftest forces NullLogger in connection object

* add: write buffer size parameters (max, growth ratio, initial size) can be passed to connection objects

* fix: passed parameters to perftest

* fix: exit immediately on perftest error

* fix: constant reference when computing new write buffer size

* update: use connection frameMax parameter when creating a new publisher

* fix: type definition in perftest
  • Loading branch information
icappello committed Dec 20, 2023
1 parent 00d56f6 commit 3910c14
Show file tree
Hide file tree
Showing 26 changed files with 521 additions and 69 deletions.
80 changes: 80 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@
"@tsconfig/node16": "^1.0.3",
"@types/amqplib": "^0.10.1",
"@types/chai": "^4.3.4",
"@types/chai-as-promised": "^7.1.8",
"@types/chai-spies": "^1.0.6",
"@types/mocha": "^10.0.1",
"@types/node": "^16.18.11",
"@typescript-eslint/eslint-plugin": "^5.50.0",
"@typescript-eslint/parser": "^5.50.0",
"amqplib": "^0.10.3",
"chai": "^4.3.7",
"chai-as-promised": "^7.1.1",
"chai-spies": "^1.1.0",
"cspell": "^6.21.0",
"eslint": "^8.33.0",
"eslint-config-prettier": "^8.6.0",
Expand Down
22 changes: 16 additions & 6 deletions performance_test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { randomUUID } from "crypto"
import { argv } from "process"
import { PerfTestProducer } from "./perf_test_producer"
import { inspect } from "util"
import { BufferSizeSettings } from "../dist/requests/request"

const logger = createLogger({
level: "info",
Expand All @@ -18,6 +19,8 @@ const logger = createLogger({
transports: new transports.Console(),
})

const connLogger = undefined

function parseArgs(args) {
const zip = (a: string[], b: string[]): [string, number][] => {
const shorterArray = a.length < b.length ? a : b
Expand All @@ -36,23 +39,30 @@ function parseArgs(args) {
async function main() {
const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"
const bufferSizeSettings: BufferSizeSettings = { initialSize: 16384 }
const frameMax = 65536

const connection = await connect(
{
hostname: "localhost",
port: 5552,
username: rabbitUser,
password: rabbitPassword,
bufferSizeSettings: bufferSizeSettings,
vhost: "/",
frameMax,
},
logger
connLogger
)

const streamName = `my-stream-${randomUUID()}`
await connection.createStream({ stream: streamName, arguments: {} })
const publisherRef = `my-publisher-${randomUUID()}`
const passedArgs = parseArgs(argv.slice(2))
logger.info(
`Stream: ${streamName} - publisher ${publisherRef} - max messages ${passedArgs.maxMessages} - message size: ${passedArgs.messageSize} bytes`
`Stream: ${streamName} - publisher ${publisherRef} - max messages ${passedArgs.maxMessages} - message size: ${
passedArgs.messageSize
} bytes - write buffer settings: ${inspect(bufferSizeSettings)}`
)

const perfTestProducer = new PerfTestProducer(
Expand All @@ -67,8 +77,8 @@ async function main() {
}

main()
.then((_v) => {
logger.info(`Ending...`)
setTimeout(() => process.exit(0), 1000)
.then((_v) => setTimeout(() => process.exit(0), 1000))
.catch((res) => {
logger.error("ERROR ", res)
process.exit(400)
})
.catch((res) => logger.error("ERROR ", res))
12 changes: 10 additions & 2 deletions performance_test/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion performance_test/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"perftest": "ts-node ./index.ts"
"perftest": "ts-node ./index.ts",
"perftest-reset": "cd .. && npm run build && cd - && npm install --force && ts-node ./index.ts"
},
"author": "",
"license": "ISC",
Expand Down
17 changes: 10 additions & 7 deletions performance_test/perf_test_producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,39 @@ export class PerfTestProducer {
const publisher = await this.connection.declarePublisher(this.publisherParams)
publisher.on("publish_confirm", (err, confirmedIds) => {
if (err) {
console.log(err)
this.logger.error(err)
}
this.metrics.addCounter("confirmed", confirmedIds.length)
})

this.displayTimer = setInterval(() => {
this.displayMetrics()
this.metrics.setStart()
}, 1000)
}, 500)

await this.send(publisher)

return true
}

private displayMetrics(stop: boolean = false) {
const metrics = { ...this.metrics.getMetrics(), total: this.ctr }
this.logger.info(`${new Date().toISOString()} - ${inspect(metrics)}`)
this.logger.info(`${inspect(metrics)}`)
if (stop && this.displayTimer) {
clearInterval(this.displayTimer)
}
}

private async send(publisher: Producer) {
while (this.maxMessages === -1 || this.ctr < this.maxMessages) {
const nmsgs = this.maxMessages > 0 ? Math.min(this.maxChunkSize, this.maxMessages) : this.maxChunkSize
for (let index = 0; index < nmsgs; index++) {
const messageQuantity = this.maxMessages > 0 ? Math.min(this.maxChunkSize, this.maxMessages) : this.maxChunkSize
for (let index = 0; index < messageQuantity; index++) {
await publisher.send(this.payload, {})
}
this.ctr = this.ctr + nmsgs
this.metrics.addCounter("published", nmsgs)
this.ctr = this.ctr + messageQuantity
this.metrics.addCounter("published", messageQuantity)
}
this.displayMetrics(true)
}

public getDisplayTimer() {
Expand Down
Loading

0 comments on commit 3910c14

Please sign in to comment.