diff --git a/src/connection.ts b/src/connection.ts index c8403811..94ba7a76 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -32,6 +32,7 @@ import { SubscribeResponse } from "./responses/subscribe_response" import { Offset, SubscribeRequest } from "./requests/subscribe_request" import { Consumer, ConsumerFunc } from "./consumer" import { DeliverResponse } from "./responses/deliver_response" +import { CreditRequest, CreditRequestParams } from "./requests/credit_request" export class Connection { private readonly socket = new Socket() @@ -87,8 +88,8 @@ export class Connection { }) } - public on(_event: "metadata_update", listener: MetadataUpdateListener) { - this.decoder.on("metadata_update", listener) + public on(event: "metadata_update", listener: MetadataUpdateListener) { + this.decoder.on(event, listener) } public async close( @@ -234,6 +235,10 @@ export class Connection { return res } + private askForCredit(params: CreditRequestParams): Promise { + return this.send(new CreditRequest({ ...params })) + } + private async exchangeProperties(): Promise { this.logger.debug(`Exchange peer properties ...`) const res = await this.sendAndWait(new PeerPropertiesRequest()) @@ -329,18 +334,23 @@ export class Connection { } private registerListeners(listeners?: ListenersParams) { - if (listeners) this.decoder.on("metadata_update", listeners.metadata_update) + if (listeners) { + this.on("metadata_update", listeners.metadata_update) + } } private registerDelivers() { - this.decoder.on("deliver", (response: DeliverResponse) => { + this.decoder.on("deliver", async (response: DeliverResponse) => { this.logger.debug(`on deliver -> ${inspect(response)} - consumers: ${this.consumers}`) + await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) response.messages.map((x) => this.consumers[response.subscriptionId].handle(x)) }) } } -type ListenersParams = Record<"metadata_update", MetadataUpdateListener> +type ListenersParams = { + metadata_update: MetadataUpdateListener +} export interface ConnectionParams { hostname: string diff --git a/src/requests/credit_request.ts b/src/requests/credit_request.ts new file mode 100644 index 00000000..1c733b80 --- /dev/null +++ b/src/requests/credit_request.ts @@ -0,0 +1,21 @@ +import { AbstractRequest } from "./abstract_request" +import { DataWriter } from "./data_writer" + +export type CreditRequestParams = { + subscriptionId: number + credit: number +} + +export class CreditRequest extends AbstractRequest { + readonly key = 0x09 + readonly responseKey = -1 + + constructor(private params: CreditRequestParams) { + super() + } + + protected writeContent(writer: DataWriter): void { + writer.writeUInt8(this.params.subscriptionId) + writer.writeUInt16(this.params.credit) + } +} diff --git a/src/response_decoder.ts b/src/response_decoder.ts index 49d67493..597fb56d 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -10,6 +10,7 @@ import { PeerPropertiesResponse } from "./responses/peer_properties_response" import { DataReader, RawDeliverResponse, + RawCreditResponse, RawHeartbeatResponse, RawMetadataUpdateResponse, RawResponse, @@ -26,6 +27,7 @@ import { EventEmitter } from "events" import { SubscribeResponse } from "./responses/subscribe_response" import { DeliverResponse } from "./responses/deliver_response" import { FormatCodeType, FormatCode } from "./amqp10/decoder" +import { CreditResponse } from "./responses/credit_response" // Frame => Size (Request | Response | Command) // Size => uint32 (size without the 4 bytes of the size element) @@ -37,25 +39,27 @@ import { FormatCodeType, FormatCode } from "./amqp10/decoder" // ResponseCode => uint16 export type MetadataUpdateListener = (metadata: MetadataUpdateResponse) => void +export type CreditListener = (creditResponse: CreditResponse) => void export type DeliverListener = (response: DeliverResponse) => void type MessageAndSubId = { subscriptionId: number messages: Buffer[] } -function decode( - data: DataReader, - logger: Logger -): RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse { +type PossibleRawResponses = + | RawResponse + | RawTuneResponse + | RawHeartbeatResponse + | RawMetadataUpdateResponse + | RawDeliverResponse + | RawCreditResponse + +function decode(data: DataReader, logger: Logger): PossibleRawResponses { const size = data.readUInt32() return decodeResponse(data.readTo(size), size, logger) } -function decodeResponse( - dataResponse: DataReader, - size: number, - logger: Logger -): RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse { +function decodeResponse(dataResponse: DataReader, size: number, logger: Logger): PossibleRawResponses { const key = dataResponse.readUInt16() const version = dataResponse.readUInt16() if (key === DeliverResponse.key) { @@ -84,10 +88,24 @@ function decodeResponse( } return { size, key, version, metadataInfo } as RawMetadataUpdateResponse } + + if (key === CreditResponse.key) { + const responseCode = dataResponse.readUInt16() + const subscriptionId = dataResponse.readUInt8() + const response: RawCreditResponse = { + size, + key, + version, + responseCode, + subscriptionId, + } + return response + } + const correlationId = dataResponse.readUInt32() - const responseCode = dataResponse.readUInt16() + const code = dataResponse.readUInt16() const payload = dataResponse.readToEnd() - return { size, key, version, correlationId, code: responseCode, payload } + return { size, key, version, correlationId, code, payload } } function decodeDeliverResponse(dataResponse: DataReader, logger: Logger): MessageAndSubId { @@ -232,30 +250,26 @@ export class BufferDataReader implements DataReader { } } -function isTuneResponse( - params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse -): params is RawTuneResponse { +function isTuneResponse(params: PossibleRawResponses): params is RawTuneResponse { return params.key === TuneResponse.key } -function isHeartbeatResponse( - params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse -): params is RawHeartbeatResponse { +function isHeartbeatResponse(params: PossibleRawResponses): params is RawHeartbeatResponse { return params.key === HeartbeatResponse.key } -function isMetadataUpdateResponse( - params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse -): params is RawMetadataUpdateResponse { +function isMetadataUpdateResponse(params: PossibleRawResponses): params is RawMetadataUpdateResponse { return params.key === MetadataUpdateResponse.key } -function isDeliverResponse( - params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse -): params is RawDeliverResponse { +function isDeliverResponse(params: PossibleRawResponses): params is RawDeliverResponse { return params.key === DeliverResponse.key } +function isCreditResponse(params: PossibleRawResponses): params is RawCreditResponse { + return params.key === CreditResponse.key +} + export class ResponseDecoder { private responseFactories = new Map() private emitter = new EventEmitter() @@ -288,13 +302,19 @@ export class ResponseDecoder { } else if (isDeliverResponse(response)) { this.emitter.emit("deliver", new DeliverResponse(response)) this.logger.debug(`deliver received from the server: ${inspect(response)}`) + } else if (isCreditResponse(response)) { + this.logger.debug(`credit received from the server: ${inspect(response)}`) + this.emitter.emit("credit_response", new CreditResponse(response)) } else { this.emitResponseReceived(response) } } } - public on(event: "metadata_update" | "deliver", listener: MetadataUpdateListener | DeliverListener) { + public on( + event: "metadata_update" | "credit_response" | "deliver", + listener: MetadataUpdateListener | CreditListener | DeliverListener + ) { this.emitter.on(event, listener) } diff --git a/src/responses/credit_response.ts b/src/responses/credit_response.ts new file mode 100644 index 00000000..963a70b8 --- /dev/null +++ b/src/responses/credit_response.ts @@ -0,0 +1,47 @@ +import { BufferDataWriter } from "../requests/abstract_request" +import { RawCreditResponse } from "./raw_response" +import { Response } from "./response" + +export class CreditResponse implements Response { + static key = 0x8009 as const + + constructor(private response: RawCreditResponse) { + if (this.response.key !== CreditResponse.key) { + throw new Error(`Unable to create ${CreditResponse.name} from data of type ${this.response.key}`) + } + } + + toBuffer(): Buffer { + const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + dw.writeUInt16(CreditResponse.key) + dw.writeUInt16(1) + dw.writeUInt16(this.response.responseCode) + dw.writeUInt8(this.response.subscriptionId) + dw.writePrefixSize() + return dw.toBuffer() + } + + get key() { + return this.response.key + } + + get correlationId(): number { + return -1 + } + + get code(): number { + return -1 + } + + get ok(): boolean { + return true + } + + get responseCode(): number { + return this.response.responseCode + } + + get subscriptionId(): number { + return this.response.subscriptionId + } +} diff --git a/src/responses/raw_response.ts b/src/responses/raw_response.ts index fd0f0487..00c68d0f 100644 --- a/src/responses/raw_response.ts +++ b/src/responses/raw_response.ts @@ -52,6 +52,14 @@ export interface MetadataInfo { stream: string } +export interface RawCreditResponse { + size: number + key: 0x8009 + version: number + responseCode: number + subscriptionId: number +} + export interface RawHeartbeatResponse { key: 0x0014 version: number diff --git a/test/e2e/credit.test.ts b/test/e2e/credit.test.ts new file mode 100644 index 00000000..6eebcb27 --- /dev/null +++ b/test/e2e/credit.test.ts @@ -0,0 +1,54 @@ +import { expect } from "chai" +import { Connection, connect } from "../../src" +import { Rabbit } from "../support/rabbit" +import { eventually } from "../support/util" +import { Offset } from "../../src/requests/subscribe_request" +import { Message } from "../../src/producer" + +describe("credit management", () => { + const rabbit = new Rabbit() + const streamName = "credit-test-stream" + let connection: Connection + + beforeEach(async () => { + connection = await connect({ + hostname: "localhost", + port: 5552, + username: "rabbit", + password: "rabbit", + vhost: "/", + frameMax: 0, // not used + heartbeat: 0, // not used + listeners: { + metadata_update: (_data) => console.info("Subscribe server error"), + }, + }) + await rabbit.createStream(streamName) + }) + + afterEach(async () => { + await connection.close() + await rabbit.deleteStream(streamName) + }) + + // This test can only run locally, the HTTP API gives different results in GitHub CI (https://coders51.slack.com/archives/C03E263HH38/p1681374600592829) + it.skip(`the number of credit remain stable after have consumed some messages`, async () => { + const receivedMessages: Buffer[] = [] + const howMany = 2 + const messages = Array.from(Array(howMany).keys()).map((_) => Buffer.from("hello")) + const publisher = await connection.declarePublisher({ stream: streamName }) + for (const m of messages) { + await publisher.send(m) + } + + await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) => + receivedMessages.push(message.content) + ) + + await eventually(async () => { + expect(receivedMessages).eql(messages) + const allConsumerCredits = await rabbit.returnConsumersCredits() + expect(allConsumerCredits[0].allCredits[0]).eql(10) + }, 5000) + }).timeout(20000) +}) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index c9370c40..30976eea 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -12,6 +12,9 @@ describe("declare consumer", () => { let connection: Connection beforeEach(async () => { + try { + await rabbit.deleteStream(testStreamName) + } catch (error) {} await rabbit.createStream(testStreamName) connection = await connect({ diff --git a/test/support/rabbit.ts b/test/support/rabbit.ts index 4de61e7b..fe74ff6a 100644 --- a/test/support/rabbit.ts +++ b/test/support/rabbit.ts @@ -4,6 +4,11 @@ interface RabbitConnectionResponse { name: string } +interface RabbitConsumerCredits { + connectionName: string + allCredits: number[] +} + // not completed interface MessageInfoResponse { messages: number @@ -22,10 +27,26 @@ interface RabbitConsumersResponseQueue { vhost: string } +interface RabbitChannelDetails { + connection_name: string + name: string + node: string + number: number + peer_host: string + peer_port: number + user: string +} + interface RabbitConsumersResponse { queue: RabbitConsumersResponseQueue consumer_tag: string + channel_details: RabbitChannelDetails +} + +interface RabbitConnectionDetails { + credits: number } + interface RabbitQueueResponse { arguments: Record auto_delete: boolean @@ -117,6 +138,29 @@ export class Rabbit { return resp.body.map((p) => p.consumer_tag) } + async returnConsumersCredits(): Promise { + const allConsumerCredits: RabbitConsumerCredits[] = [] + const allConsumersResp = await got.get(`http://localhost:15672/api/consumers`, { + username: "rabbit", + password: "rabbit", + responseType: "json", + }) + const consumerChannelDetails = allConsumersResp.body.map((d) => d.channel_details) + for (const consumerChannelDetail of consumerChannelDetails) { + const connectionName = consumerChannelDetail.connection_name + const resp = await got.get( + `http://localhost:15672/api/stream/connections/%2F/${connectionName}/consumers`, + { + username: "rabbit", + password: "rabbit", + responseType: "json", + } + ) + allConsumerCredits.push({ connectionName, allCredits: resp.body.map((rcd) => rcd.credits) }) + } + return allConsumerCredits + } + async getQueue(vhost: string = "%2F", name: string): Promise { const ret = await got.get(`http://localhost:15672/api/queues/${vhost}/${name}`, { username: "rabbit", diff --git a/test/support/util.ts b/test/support/util.ts index fde80728..2c77b467 100644 --- a/test/support/util.ts +++ b/test/support/util.ts @@ -13,7 +13,7 @@ export async function eventually(fn: Function, timeout = 1500) { } catch (error) { if (elapsedFrom(start) > timeout) { if (error instanceof AssertionError) throw error - expect.fail((error as Error).message) + expect.fail(error as string) } await wait(5) }