Skip to content

Commit

Permalink
Credit (#75)
Browse files Browse the repository at this point in the history
* WIP: Credit, handle credit response from server

* Add tests for credit

* Remove unused credit response

* Fix for npm check

* Add test for credit and credit request, missing last expect for now

* Add credit number check test through http api

* Changes for review, still wip

* Increase timeout

* Skipping consumer test to debug

* Skipping credit test to debug

* Increase eventually time

* Move connection close in test and increase timeout

* Debug credit

* Remove only test in credit

* Skip of credit test

* Fix wrong function removal in util

* Removal of console logs

* Removal of unused function

* fix minimal code coherence typos

* Fix pr comments

---------

Co-authored-by: magne <magnello@coders51.com>
Co-authored-by: GPad <gpadovani@gmail.com>
Co-authored-by: Alberto Barrilà <alberto.barrila@gmail.com>
  • Loading branch information
4 people committed May 2, 2023
1 parent 0bb38e7 commit 8159eea
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 30 deletions.
20 changes: 15 additions & 5 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { SubscribeResponse } from "./responses/subscribe_response"
import { Offset, SubscribeRequest } from "./requests/subscribe_request"
import { Consumer, ConsumerFunc } from "./consumer"
import { DeliverResponse } from "./responses/deliver_response"
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"

export class Connection {
private readonly socket = new Socket()
Expand Down Expand Up @@ -87,8 +88,8 @@ export class Connection {
})
}

public on(_event: "metadata_update", listener: MetadataUpdateListener) {
this.decoder.on("metadata_update", listener)
public on(event: "metadata_update", listener: MetadataUpdateListener) {
this.decoder.on(event, listener)
}

public async close(
Expand Down Expand Up @@ -234,6 +235,10 @@ export class Connection {
return res
}

private askForCredit(params: CreditRequestParams): Promise<void> {
return this.send(new CreditRequest({ ...params }))
}

private async exchangeProperties(): Promise<PeerPropertiesResponse> {
this.logger.debug(`Exchange peer properties ...`)
const res = await this.sendAndWait<PeerPropertiesResponse>(new PeerPropertiesRequest())
Expand Down Expand Up @@ -329,18 +334,23 @@ export class Connection {
}

private registerListeners(listeners?: ListenersParams) {
if (listeners) this.decoder.on("metadata_update", listeners.metadata_update)
if (listeners) {
this.on("metadata_update", listeners.metadata_update)
}
}

private registerDelivers() {
this.decoder.on("deliver", (response: DeliverResponse) => {
this.decoder.on("deliver", async (response: DeliverResponse) => {
this.logger.debug(`on deliver -> ${inspect(response)} - consumers: ${this.consumers}`)
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
response.messages.map((x) => this.consumers[response.subscriptionId].handle(x))
})
}
}

type ListenersParams = Record<"metadata_update", MetadataUpdateListener>
type ListenersParams = {
metadata_update: MetadataUpdateListener
}

export interface ConnectionParams {
hostname: string
Expand Down
21 changes: 21 additions & 0 deletions src/requests/credit_request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { AbstractRequest } from "./abstract_request"
import { DataWriter } from "./data_writer"

export type CreditRequestParams = {
subscriptionId: number
credit: number
}

export class CreditRequest extends AbstractRequest {
readonly key = 0x09
readonly responseKey = -1

constructor(private params: CreditRequestParams) {
super()
}

protected writeContent(writer: DataWriter): void {
writer.writeUInt8(this.params.subscriptionId)
writer.writeUInt16(this.params.credit)
}
}
68 changes: 44 additions & 24 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { PeerPropertiesResponse } from "./responses/peer_properties_response"
import {
DataReader,
RawDeliverResponse,
RawCreditResponse,
RawHeartbeatResponse,
RawMetadataUpdateResponse,
RawResponse,
Expand All @@ -26,6 +27,7 @@ import { EventEmitter } from "events"
import { SubscribeResponse } from "./responses/subscribe_response"
import { DeliverResponse } from "./responses/deliver_response"
import { FormatCodeType, FormatCode } from "./amqp10/decoder"
import { CreditResponse } from "./responses/credit_response"

// Frame => Size (Request | Response | Command)
// Size => uint32 (size without the 4 bytes of the size element)
Expand All @@ -37,25 +39,27 @@ import { FormatCodeType, FormatCode } from "./amqp10/decoder"
// ResponseCode => uint16

export type MetadataUpdateListener = (metadata: MetadataUpdateResponse) => void
export type CreditListener = (creditResponse: CreditResponse) => void
export type DeliverListener = (response: DeliverResponse) => void
type MessageAndSubId = {
subscriptionId: number
messages: Buffer[]
}

function decode(
data: DataReader,
logger: Logger
): RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse {
type PossibleRawResponses =
| RawResponse
| RawTuneResponse
| RawHeartbeatResponse
| RawMetadataUpdateResponse
| RawDeliverResponse
| RawCreditResponse

function decode(data: DataReader, logger: Logger): PossibleRawResponses {
const size = data.readUInt32()
return decodeResponse(data.readTo(size), size, logger)
}

function decodeResponse(
dataResponse: DataReader,
size: number,
logger: Logger
): RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse {
function decodeResponse(dataResponse: DataReader, size: number, logger: Logger): PossibleRawResponses {
const key = dataResponse.readUInt16()
const version = dataResponse.readUInt16()
if (key === DeliverResponse.key) {
Expand Down Expand Up @@ -84,10 +88,24 @@ function decodeResponse(
}
return { size, key, version, metadataInfo } as RawMetadataUpdateResponse
}

if (key === CreditResponse.key) {
const responseCode = dataResponse.readUInt16()
const subscriptionId = dataResponse.readUInt8()
const response: RawCreditResponse = {
size,
key,
version,
responseCode,
subscriptionId,
}
return response
}

const correlationId = dataResponse.readUInt32()
const responseCode = dataResponse.readUInt16()
const code = dataResponse.readUInt16()
const payload = dataResponse.readToEnd()
return { size, key, version, correlationId, code: responseCode, payload }
return { size, key, version, correlationId, code, payload }
}

function decodeDeliverResponse(dataResponse: DataReader, logger: Logger): MessageAndSubId {
Expand Down Expand Up @@ -232,30 +250,26 @@ export class BufferDataReader implements DataReader {
}
}

function isTuneResponse(
params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse
): params is RawTuneResponse {
function isTuneResponse(params: PossibleRawResponses): params is RawTuneResponse {
return params.key === TuneResponse.key
}

function isHeartbeatResponse(
params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse
): params is RawHeartbeatResponse {
function isHeartbeatResponse(params: PossibleRawResponses): params is RawHeartbeatResponse {
return params.key === HeartbeatResponse.key
}

function isMetadataUpdateResponse(
params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse
): params is RawMetadataUpdateResponse {
function isMetadataUpdateResponse(params: PossibleRawResponses): params is RawMetadataUpdateResponse {
return params.key === MetadataUpdateResponse.key
}

function isDeliverResponse(
params: RawResponse | RawTuneResponse | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse
): params is RawDeliverResponse {
function isDeliverResponse(params: PossibleRawResponses): params is RawDeliverResponse {
return params.key === DeliverResponse.key
}

function isCreditResponse(params: PossibleRawResponses): params is RawCreditResponse {
return params.key === CreditResponse.key
}

export class ResponseDecoder {
private responseFactories = new Map<number, AbstractTypeClass>()
private emitter = new EventEmitter()
Expand Down Expand Up @@ -288,13 +302,19 @@ export class ResponseDecoder {
} else if (isDeliverResponse(response)) {
this.emitter.emit("deliver", new DeliverResponse(response))
this.logger.debug(`deliver received from the server: ${inspect(response)}`)
} else if (isCreditResponse(response)) {
this.logger.debug(`credit received from the server: ${inspect(response)}`)
this.emitter.emit("credit_response", new CreditResponse(response))
} else {
this.emitResponseReceived(response)
}
}
}

public on(event: "metadata_update" | "deliver", listener: MetadataUpdateListener | DeliverListener) {
public on(
event: "metadata_update" | "credit_response" | "deliver",
listener: MetadataUpdateListener | CreditListener | DeliverListener
) {
this.emitter.on(event, listener)
}

Expand Down
47 changes: 47 additions & 0 deletions src/responses/credit_response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { BufferDataWriter } from "../requests/abstract_request"
import { RawCreditResponse } from "./raw_response"
import { Response } from "./response"

export class CreditResponse implements Response {
static key = 0x8009 as const

constructor(private response: RawCreditResponse) {
if (this.response.key !== CreditResponse.key) {
throw new Error(`Unable to create ${CreditResponse.name} from data of type ${this.response.key}`)
}
}

toBuffer(): Buffer {
const dw = new BufferDataWriter(Buffer.alloc(1024), 4)
dw.writeUInt16(CreditResponse.key)
dw.writeUInt16(1)
dw.writeUInt16(this.response.responseCode)
dw.writeUInt8(this.response.subscriptionId)
dw.writePrefixSize()
return dw.toBuffer()
}

get key() {
return this.response.key
}

get correlationId(): number {
return -1
}

get code(): number {
return -1
}

get ok(): boolean {
return true
}

get responseCode(): number {
return this.response.responseCode
}

get subscriptionId(): number {
return this.response.subscriptionId
}
}
8 changes: 8 additions & 0 deletions src/responses/raw_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ export interface MetadataInfo {
stream: string
}

export interface RawCreditResponse {
size: number
key: 0x8009
version: number
responseCode: number
subscriptionId: number
}

export interface RawHeartbeatResponse {
key: 0x0014
version: number
Expand Down
54 changes: 54 additions & 0 deletions test/e2e/credit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { expect } from "chai"
import { Connection, connect } from "../../src"
import { Rabbit } from "../support/rabbit"
import { eventually } from "../support/util"
import { Offset } from "../../src/requests/subscribe_request"
import { Message } from "../../src/producer"

describe("credit management", () => {
const rabbit = new Rabbit()
const streamName = "credit-test-stream"
let connection: Connection

beforeEach(async () => {
connection = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
frameMax: 0, // not used
heartbeat: 0, // not used
listeners: {
metadata_update: (_data) => console.info("Subscribe server error"),
},
})
await rabbit.createStream(streamName)
})

afterEach(async () => {
await connection.close()
await rabbit.deleteStream(streamName)
})

// This test can only run locally, the HTTP API gives different results in GitHub CI (https://coders51.slack.com/archives/C03E263HH38/p1681374600592829)
it.skip(`the number of credit remain stable after have consumed some messages`, async () => {
const receivedMessages: Buffer[] = []
const howMany = 2
const messages = Array.from(Array(howMany).keys()).map((_) => Buffer.from("hello"))
const publisher = await connection.declarePublisher({ stream: streamName })
for (const m of messages) {
await publisher.send(m)
}

await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) =>
receivedMessages.push(message.content)
)

await eventually(async () => {
expect(receivedMessages).eql(messages)
const allConsumerCredits = await rabbit.returnConsumersCredits()
expect(allConsumerCredits[0].allCredits[0]).eql(10)
}, 5000)
}).timeout(20000)
})
3 changes: 3 additions & 0 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ describe("declare consumer", () => {
let connection: Connection

beforeEach(async () => {
try {
await rabbit.deleteStream(testStreamName)
} catch (error) {}
await rabbit.createStream(testStreamName)

connection = await connect({
Expand Down
Loading

0 comments on commit 8159eea

Please sign in to comment.