From 3108dcf878e662ddbd9cca85f8292ddead6b1feb Mon Sep 17 00:00:00 2001 From: magne Date: Tue, 23 Jan 2024 16:15:14 +0100 Subject: [PATCH 1/8] Adds filtering feature with publish request v2 and deliver v2 --- src/client.ts | 44 ++++--- src/publisher.ts | 53 +++++--- src/requests/abstract_request.ts | 112 +---------------- src/requests/buffer_data_writer.ts | 118 ++++++++++++++++++ src/requests/close_request.ts | 12 +- src/requests/consumer_update_response.ts | 12 +- src/requests/create_stream_request.ts | 12 +- src/requests/create_super_stream_request.ts | 12 +- src/requests/credit_request.ts | 12 +- src/requests/data_writer.ts | 1 + src/requests/declare_publisher_request.ts | 12 +- src/requests/delete_publisher_request.ts | 12 +- src/requests/delete_stream_request.ts | 12 +- src/requests/delete_super_stream_request.ts | 12 +- .../exchange_command_versions_request.ts | 12 +- src/requests/heartbeat_request.ts | 12 +- src/requests/metadata_request.ts | 12 +- src/requests/metadata_update_request.ts | 12 +- src/requests/open_request.ts | 12 +- src/requests/partitions_query.ts | 12 +- src/requests/peer_properties_request.ts | 12 +- src/requests/publish_request.ts | 13 +- src/requests/publish_request_v2.ts | 38 ++++++ src/requests/query_offset_request.ts | 12 +- src/requests/query_publisher_request.ts | 12 +- src/requests/request.ts | 2 +- src/requests/requests.ts | 1 + src/requests/route_query.ts | 12 +- src/requests/sasl_authenticate_request.ts | 12 +- src/requests/sasl_handshake_request.ts | 12 +- src/requests/store_offset_request.ts | 12 +- src/requests/stream_stats_request.ts | 12 +- .../sub_entry_batch_publish_request.ts | 15 ++- src/requests/subscribe_request.ts | 12 +- src/requests/tune_request.ts | 12 +- src/requests/unsubscribe_request.ts | 12 +- src/response_decoder.ts | 55 ++++++-- src/responses/consumer_update_query.ts | 2 +- src/responses/credit_response.ts | 2 +- src/responses/deliver_response.ts | 2 +- src/responses/deliver_response_v2.ts | 55 ++++++++ src/responses/metadata_update_response.ts | 2 +- src/responses/publish_confirm_response.ts | 2 +- src/responses/publish_error_response.ts | 2 +- src/responses/raw_response.ts | 9 ++ src/responses/responses.ts | 1 + src/responses/tune_response.ts | 2 +- src/versions.ts | 12 +- test/e2e/declare_consumer.test.ts | 2 +- test/e2e/filtering.test.ts | 50 ++++++++ test/support/fake_data.ts | 2 +- test/unit/buffer_data_writer.test.ts | 2 +- test/unit/response_decoder.test.ts | 2 +- test/unit/versions.test.ts | 4 +- 54 files changed, 689 insertions(+), 228 deletions(-) create mode 100644 src/requests/buffer_data_writer.ts create mode 100644 src/requests/publish_request_v2.ts create mode 100755 src/responses/deliver_response_v2.ts create mode 100644 test/e2e/filtering.test.ts diff --git a/src/client.ts b/src/client.ts index 4805d676..072f3aed 100644 --- a/src/client.ts +++ b/src/client.ts @@ -41,6 +41,7 @@ import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request import { lt, coerce } from "semver" import { ConnectionInfo, Connection, errorMessageOf } from "./connection" import { ConnectionPool } from "./connection_pool" +import { DeliverResponseV2 } from "./responses/deliver_response_v2" export type ConnectionClosedListener = (hadError: boolean) => void @@ -125,7 +126,7 @@ export class Client { return res.streams } - public async declarePublisher(params: DeclarePublisherParams): Promise { + public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise { const { stream, publisherRef } = params const publisherId = this.incPublisherId() @@ -137,16 +138,23 @@ export class Client { await connection.close() throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - const publisher = new StreamPublisher({ - connection: connection, - stream: params.stream, - publisherId: publisherId, - publisherRef: params.publisherRef, - boot: params.boot, - maxFrameSize: this.maxFrameSize, - maxChunkLength: params.maxChunkLength, - logger: this.logger, - }) + if (filter && !connection.filteringEnabled) { + await connection.close() + throw new Error(`Broker does not support message filtering.`) + } + const publisher = new StreamPublisher( + { + connection: connection, + stream: params.stream, + publisherId: publisherId, + publisherRef: params.publisherRef, + boot: params.boot, + maxFrameSize: this.maxFrameSize, + maxChunkLength: params.maxChunkLength, + logger: this.logger, + }, + filter + ) this.publishers.set(publisherId, { publisher: publisher, connection: connection }) this.logger.info( `New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}` @@ -411,16 +419,16 @@ export class Client { } private getDeliverCallback() { - return async (response: DeliverResponse) => { - const consumer = this.consumers.get(response.subscriptionId) + return async (deliverVersion: "deliverV1" | "deliverV2", deliverResponse: DeliverResponse | DeliverResponseV2) => { + const consumer = this.consumers.get(deliverResponse.subscriptionId) if (!consumer) { - this.logger.error(`On deliver no consumer found`) + this.logger.error(`On ${deliverVersion} no consumer found`) return } - this.logger.debug(`on deliver -> ${consumer.consumerRef}`) - this.logger.debug(`response.messages.length: ${response.messages.length}`) - await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) - response.messages.map((x) => consumer.handle(x)) + this.logger.debug(`on ${deliverVersion} -> ${consumer.consumerRef}`) + this.logger.debug(`deliverResponse.messages.length: ${deliverResponse.messages.length}`) + await this.askForCredit({ credit: 1, subscriptionId: deliverResponse.subscriptionId }) + deliverResponse.messages.map((x) => consumer.handle(x)) } } diff --git a/src/publisher.ts b/src/publisher.ts index fbad5e19..3f4b387d 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -7,10 +7,12 @@ import { PublishRequest, PublishRequestMessage } from "./requests/publish_reques import { SubEntryBatchPublishRequest } from "./requests/sub_entry_batch_publish_request" import { PublishConfirmResponse } from "./responses/publish_confirm_response" import { PublishErrorResponse } from "./responses/publish_error_response" -import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util" +import { DEFAULT_UNLIMITED_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION } from "./util" import { MetadataUpdateListener } from "./response_decoder" import { ConnectionInfo, Connection } from "./connection" import { ConnectionPool } from "./connection_pool" +import { coerce, lt } from "semver" +import { PublishRequestV2 } from "./requests/publish_request_v2" export type MessageApplicationProperties = Record @@ -70,6 +72,7 @@ export interface Publisher { readonly publisherId: number } +export type FilterFunc = (msg: Message) => string | undefined type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void export class StreamPublisher implements Publisher { private connection: Connection @@ -85,16 +88,19 @@ export class StreamPublisher implements Publisher { private maxChunkLength: number private closed = false - constructor(params: { - connection: Connection - stream: string - publisherId: number - publisherRef?: string - boot?: boolean - maxFrameSize: number - maxChunkLength?: number - logger: Logger - }) { + constructor( + params: { + connection: Connection + stream: string + publisherId: number + publisherRef?: string + boot?: boolean + maxFrameSize: number + maxChunkLength?: number + logger: Logger + }, + private readonly filter?: FilterFunc + ) { this.connection = params.connection this.stream = params.stream this.publisherId = params.publisherId @@ -187,6 +193,12 @@ export class StreamPublisher implements Publisher { } private async enqueue(publishRequestMessage: PublishRequestMessage) { + if (this.filter) { + publishRequestMessage.filterValue = this.filter(publishRequestMessage.message) + } + if (!this.connection.isFilteringEnabled && this.filter) { + throw new Error(`Your rabbit server management version does not support filtering.`) + } this.checkMessageSize(publishRequestMessage) const sendCycleNeeded = this.add(publishRequestMessage) let sent = false @@ -211,12 +223,19 @@ export class StreamPublisher implements Publisher { private async sendBuffer() { const chunk = this.popChunk() if (chunk.length > 0) { - await this.connection.send( - new PublishRequest({ - publisherId: this.publisherId, - messages: chunk, - }) - ) + this.filter + ? await this.connection.send( + new PublishRequestV2({ + publisherId: this.publisherId, + messages: chunk, + }) + ) + : await this.connection.send( + new PublishRequest({ + publisherId: this.publisherId, + messages: chunk, + }) + ) } } diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index ae248352..5448a020 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -1,119 +1,11 @@ -import { DEFAULT_UNLIMITED_FRAME_MAX } from "../util" +import { BufferDataWriter } from "./buffer_data_writer" import { DataWriter } from "./data_writer" import { BufferSizeParams, Request } from "./request" -export class BufferDataWriter implements DataWriter { - private _offset = 0 - private readonly maxBufferSize: number - private readonly growthTriggerRatio: number - private readonly sizeMultiplier: number - - constructor(private buffer: Buffer, startFrom: number, bufferSizeParameters?: BufferSizeParams) { - this._offset = startFrom - this.maxBufferSize = bufferSizeParameters?.maxSize ?? 1048576 - this.growthTriggerRatio = bufferSizeParameters?.maxRatio ?? 0.9 - this.sizeMultiplier = bufferSizeParameters?.multiplier ?? 2 - } - - get offset() { - return this._offset - } - - writePrefixSize() { - this.buffer.writeUInt32BE(this._offset - 4, 0) - } - - writeData(data: string | Buffer): void { - this.growIfNeeded(Buffer.byteLength(data, "utf-8")) - if (Buffer.isBuffer(data)) { - this._offset += data.copy(this.buffer, this._offset) - return - } - this._offset += this.buffer.write(data, this._offset) - } - - writeByte(data: number): void { - const bytes = 1 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeUInt8(data, this._offset) - } - - writeInt8(data: number) { - const bytes = 1 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeInt8(data, this._offset) - } - - writeUInt8(data: number): void { - const bytes = 1 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeUInt8(data, this._offset) - } - - writeUInt16(data: number) { - const bytes = 2 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeUInt16BE(data, this._offset) - } - - writeUInt32(data: number): void { - const bytes = 4 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeUInt32BE(data, this._offset) - } - - writeInt32(data: number): void { - const bytes = 4 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeInt32BE(data, this._offset) - } - - writeUInt64(data: bigint): void { - const bytes = 8 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeBigUInt64BE(data, this._offset) - } - - writeInt64(data: bigint): void { - const bytes = 8 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeBigInt64BE(data, this._offset) - } - - writeString(data: string): void { - const bytes = 2 - this.growIfNeeded(bytes) - this._offset = this.buffer.writeInt16BE(data.length, this._offset) - this.writeData(data) - } - - toBuffer(): Buffer { - return this.buffer.slice(0, this._offset) - } - - private growIfNeeded(additionalBytes: number) { - if ((this._offset + additionalBytes) / this.buffer.length > this.growthTriggerRatio) { - this.growBuffer(additionalBytes) - } - } - - private growBuffer(requiredBytes: number) { - const newSize = this.getNewSize(requiredBytes) - const data = Buffer.from(this.buffer) - this.buffer = Buffer.alloc(newSize) - data.copy(this.buffer, 0) - } - - private getNewSize(requiredBytes: number) { - const requiredNewSize = this.buffer.length * this.sizeMultiplier + this._offset + requiredBytes - if (this.maxBufferSize === DEFAULT_UNLIMITED_FRAME_MAX) return requiredNewSize - return Math.min(requiredNewSize, this.maxBufferSize) - } -} export abstract class AbstractRequest implements Request { abstract get key(): number abstract get responseKey(): number - readonly version = 1 + abstract get version(): number toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer { const initialSize = bufferSizeParams?.initialSize ?? 65536 diff --git a/src/requests/buffer_data_writer.ts b/src/requests/buffer_data_writer.ts new file mode 100644 index 00000000..556871f8 --- /dev/null +++ b/src/requests/buffer_data_writer.ts @@ -0,0 +1,118 @@ +import { DEFAULT_UNLIMITED_FRAME_MAX } from "../util" +import { DataWriter } from "./data_writer" +import { BufferSizeParams } from "./request" + +export class BufferDataWriter implements DataWriter { + private _offset = 0 + private readonly maxBufferSize: number + private readonly growthTriggerRatio: number + private readonly sizeMultiplier: number + + constructor(private buffer: Buffer, startFrom: number, bufferSizeParameters?: BufferSizeParams) { + this._offset = startFrom + this.maxBufferSize = bufferSizeParameters?.maxSize ?? 1048576 + this.growthTriggerRatio = bufferSizeParameters?.maxRatio ?? 0.9 + this.sizeMultiplier = bufferSizeParameters?.multiplier ?? 2 + } + + get offset() { + return this._offset + } + + writePrefixSize() { + this.buffer.writeUInt32BE(this._offset - 4, 0) + } + + writeData(data: string | Buffer): void { + this.growIfNeeded(Buffer.byteLength(data, "utf-8")) + if (Buffer.isBuffer(data)) { + this._offset += data.copy(this.buffer, this._offset) + return + } + this._offset += this.buffer.write(data, this._offset) + } + + writeByte(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt8(data, this._offset) + } + + writeInt8(data: number) { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt8(data, this._offset) + } + + writeUInt8(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt8(data, this._offset) + } + + writeInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt16BE(data, this._offset) + } + + writeUInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt16BE(data, this._offset) + } + + writeUInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt32BE(data, this._offset) + } + + writeInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt32BE(data, this._offset) + } + + writeUInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeBigUInt64BE(data, this._offset) + } + + writeInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeBigInt64BE(data, this._offset) + } + + writeString(data: string): void { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt16BE(data.length, this._offset) + this.writeData(data) + } + + toBuffer(): Buffer { + return this.buffer.slice(0, this._offset) + } + + private growIfNeeded(additionalBytes: number) { + if ((this._offset + additionalBytes) / this.buffer.length > this.growthTriggerRatio) { + this.growBuffer(additionalBytes) + } + } + + private growBuffer(requiredBytes: number) { + const newSize = this.getNewSize(requiredBytes) + const data = Buffer.from(this.buffer) + this.buffer = Buffer.alloc(newSize) + data.copy(this.buffer, 0) + } + + private getNewSize(requiredBytes: number) { + const requiredNewSize = this.buffer.length * this.sizeMultiplier + this._offset + requiredBytes + if (this.maxBufferSize === DEFAULT_UNLIMITED_FRAME_MAX) return requiredNewSize + return Math.min(requiredNewSize, this.maxBufferSize) + } +} diff --git a/src/requests/close_request.ts b/src/requests/close_request.ts index cadbbed8..f39f468f 100644 --- a/src/requests/close_request.ts +++ b/src/requests/close_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class CloseRequest extends AbstractRequest { - readonly responseKey = CloseResponse.key static readonly Key = 0x0016 static readonly Version = 1 - readonly key = CloseRequest.Key constructor(private params: { closingCode: number; closingReason: string }) { super() @@ -16,4 +14,14 @@ export class CloseRequest extends AbstractRequest { writer.writeUInt16(this.params.closingCode) writer.writeString(this.params.closingReason) } + + get key(): number { + return CloseRequest.Key + } + get responseKey(): number { + return CloseResponse.key + } + get version(): number { + return CloseRequest.Version + } } diff --git a/src/requests/consumer_update_response.ts b/src/requests/consumer_update_response.ts index 1f4e17d3..57767c98 100644 --- a/src/requests/consumer_update_response.ts +++ b/src/requests/consumer_update_response.ts @@ -4,10 +4,8 @@ import { DataWriter } from "./data_writer" import { Offset } from "./subscribe_request" export class ConsumerUpdateResponse extends AbstractRequest { - readonly responseKey = ConsumerUpdateQuery.key static readonly Key = 0x801a static readonly Version = 1 - readonly key = ConsumerUpdateResponse.Key constructor(private params: { correlationId: number; responseCode: number; offset: Offset }) { super() @@ -18,4 +16,14 @@ export class ConsumerUpdateResponse extends AbstractRequest { b.writeUInt16(this.params.responseCode) this.params.offset.write(b) } + + get key(): number { + return ConsumerUpdateResponse.Key + } + get responseKey(): number { + return ConsumerUpdateQuery.key + } + get version(): number { + return ConsumerUpdateResponse.Version + } } diff --git a/src/requests/create_stream_request.ts b/src/requests/create_stream_request.ts index 85b8b221..a1931d22 100644 --- a/src/requests/create_stream_request.ts +++ b/src/requests/create_stream_request.ts @@ -11,10 +11,8 @@ export interface CreateStreamArguments { } export class CreateStreamRequest extends AbstractRequest { - readonly responseKey = CreateStreamResponse.key static readonly Key = 0x000d static readonly Version = 1 - readonly key = CreateStreamRequest.Key private readonly _arguments: { key: keyof CreateStreamArguments; value: string | number }[] = [] private readonly stream: string @@ -40,4 +38,14 @@ export class CreateStreamRequest extends AbstractRequest { writer.writeString(value.toString()) }) } + + get key(): number { + return CreateStreamRequest.Key + } + get responseKey(): number { + return CreateStreamResponse.key + } + get version(): number { + return CreateStreamRequest.Version + } } diff --git a/src/requests/create_super_stream_request.ts b/src/requests/create_super_stream_request.ts index df300387..e52c1a15 100644 --- a/src/requests/create_super_stream_request.ts +++ b/src/requests/create_super_stream_request.ts @@ -11,10 +11,8 @@ export interface CreateSuperStreamParams { } export class CreateSuperStreamRequest extends AbstractRequest { - readonly responseKey = CreateSuperStreamResponse.key static readonly Key = 0x001d static readonly Version = 1 - readonly key = CreateSuperStreamRequest.Key private readonly _arguments: { key: keyof CreateStreamArguments; value: string | number }[] = [] private readonly streamName: string private readonly partitions: string[] @@ -47,4 +45,14 @@ export class CreateSuperStreamRequest extends AbstractRequest { writer.writeString(value.toString()) }) } + + get key(): number { + return CreateSuperStreamRequest.Key + } + get responseKey(): number { + return CreateSuperStreamResponse.key + } + get version(): number { + return CreateSuperStreamRequest.Version + } } diff --git a/src/requests/credit_request.ts b/src/requests/credit_request.ts index 65e72883..ce3f97a9 100644 --- a/src/requests/credit_request.ts +++ b/src/requests/credit_request.ts @@ -8,9 +8,7 @@ export type CreditRequestParams = { export class CreditRequest extends AbstractRequest { static readonly Key = 0x09 - readonly key = CreditRequest.Key static readonly Version = 1 - readonly responseKey = -1 constructor(private params: CreditRequestParams) { super() @@ -20,4 +18,14 @@ export class CreditRequest extends AbstractRequest { writer.writeUInt8(this.params.subscriptionId) writer.writeUInt16(this.params.credit) } + + get key(): number { + return CreditRequest.Key + } + get responseKey(): number { + return -1 + } + get version(): number { + return CreditRequest.Version + } } diff --git a/src/requests/data_writer.ts b/src/requests/data_writer.ts index 22474fc4..a7a46afb 100644 --- a/src/requests/data_writer.ts +++ b/src/requests/data_writer.ts @@ -1,6 +1,7 @@ export interface DataWriter { writeByte(Described: number): void writeInt8(data: number): void + writeInt16(data: number): void writeUInt8(data: number): void writeUInt16(data: number): void writeUInt32(data: number): void diff --git a/src/requests/declare_publisher_request.ts b/src/requests/declare_publisher_request.ts index 190c1f52..78f78564 100644 --- a/src/requests/declare_publisher_request.ts +++ b/src/requests/declare_publisher_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class DeclarePublisherRequest extends AbstractRequest { - readonly responseKey = DeclarePublisherResponse.key static readonly Key = 0x0001 static readonly Version = 1 - readonly key = DeclarePublisherRequest.Key constructor(private params: { stream: string; publisherId: number; publisherRef?: string }) { super() @@ -17,4 +15,14 @@ export class DeclarePublisherRequest extends AbstractRequest { writer.writeString(this.params.publisherRef || "") writer.writeString(this.params.stream) } + + get key(): number { + return DeclarePublisherRequest.Key + } + get responseKey(): number { + return DeclarePublisherResponse.key + } + get version(): number { + return DeclarePublisherRequest.Version + } } diff --git a/src/requests/delete_publisher_request.ts b/src/requests/delete_publisher_request.ts index 3b558b1d..39ad6768 100644 --- a/src/requests/delete_publisher_request.ts +++ b/src/requests/delete_publisher_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class DeletePublisherRequest extends AbstractRequest { - readonly responseKey = DeletePublisherResponse.key static readonly Key = 0x0006 static readonly Version = 1 - readonly key = DeletePublisherRequest.Key constructor(private publisherId: number) { super() @@ -15,4 +13,14 @@ export class DeletePublisherRequest extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeUInt8(this.publisherId) } + + get key(): number { + return DeletePublisherRequest.Key + } + get responseKey(): number { + return DeletePublisherResponse.key + } + get version(): number { + return DeletePublisherRequest.Version + } } diff --git a/src/requests/delete_stream_request.ts b/src/requests/delete_stream_request.ts index f22f3690..63705a76 100644 --- a/src/requests/delete_stream_request.ts +++ b/src/requests/delete_stream_request.ts @@ -4,9 +4,7 @@ import { DataWriter } from "./data_writer" export class DeleteStreamRequest extends AbstractRequest { static readonly Key = 0x000e - readonly key = DeleteStreamRequest.Key static readonly Version = 1 - readonly responseKey = DeleteStreamResponse.key private readonly stream: string constructor(stream: string) { @@ -17,4 +15,14 @@ export class DeleteStreamRequest extends AbstractRequest { protected writeContent(writer: DataWriter): void { writer.writeString(this.stream) } + + get key(): number { + return DeleteStreamRequest.Key + } + get responseKey(): number { + return DeleteStreamResponse.key + } + get version(): number { + return DeleteStreamRequest.Version + } } diff --git a/src/requests/delete_super_stream_request.ts b/src/requests/delete_super_stream_request.ts index e607c5bd..5b94212f 100644 --- a/src/requests/delete_super_stream_request.ts +++ b/src/requests/delete_super_stream_request.ts @@ -4,9 +4,7 @@ import { DataWriter } from "./data_writer" export class DeleteSuperStreamRequest extends AbstractRequest { static readonly Key = 0x001e - readonly key = DeleteSuperStreamRequest.Key static readonly Version = 1 - readonly responseKey = DeleteSuperStreamResponse.key private readonly streamName: string constructor(streamName: string) { @@ -17,4 +15,14 @@ export class DeleteSuperStreamRequest extends AbstractRequest { protected writeContent(writer: DataWriter): void { writer.writeString(this.streamName) } + + get key(): number { + return DeleteSuperStreamRequest.Key + } + get responseKey(): number { + return DeleteSuperStreamResponse.key + } + get version(): number { + return DeleteSuperStreamRequest.Version + } } diff --git a/src/requests/exchange_command_versions_request.ts b/src/requests/exchange_command_versions_request.ts index abbf7288..34f004c3 100644 --- a/src/requests/exchange_command_versions_request.ts +++ b/src/requests/exchange_command_versions_request.ts @@ -5,9 +5,7 @@ import { DataWriter } from "./data_writer" export class ExchangeCommandVersionsRequest extends AbstractRequest { static readonly Key = 0x001b - readonly key = ExchangeCommandVersionsRequest.Key static readonly Version = 1 - readonly responseKey = ExchangeCommandVersionsResponse.key constructor(readonly versions: Version[]) { super() } @@ -20,4 +18,14 @@ export class ExchangeCommandVersionsRequest extends AbstractRequest { writer.writeUInt16(entry.maxVersion) }) } + + get key(): number { + return ExchangeCommandVersionsRequest.Key + } + get responseKey(): number { + return ExchangeCommandVersionsResponse.key + } + get version(): number { + return ExchangeCommandVersionsRequest.Version + } } diff --git a/src/requests/heartbeat_request.ts b/src/requests/heartbeat_request.ts index 3d5bec93..50b7cf3a 100644 --- a/src/requests/heartbeat_request.ts +++ b/src/requests/heartbeat_request.ts @@ -3,12 +3,20 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class HeartbeatRequest extends AbstractRequest { - readonly responseKey = HeartbeatResponse.key static readonly Key = 0x0017 static readonly Version = 1 - readonly key = HeartbeatRequest.Key writeContent(_b: DataWriter) { return } + + get key(): number { + return HeartbeatRequest.Key + } + get responseKey(): number { + return HeartbeatResponse.key + } + get version(): number { + return HeartbeatRequest.Version + } } diff --git a/src/requests/metadata_request.ts b/src/requests/metadata_request.ts index d6311a18..4d81aa70 100644 --- a/src/requests/metadata_request.ts +++ b/src/requests/metadata_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class MetadataRequest extends AbstractRequest { - readonly responseKey = MetadataResponse.key static readonly Key = 0x000f static readonly Version = 1 - readonly key = MetadataRequest.Key constructor(private params: { streams: string[] }) { super() @@ -18,4 +16,14 @@ export class MetadataRequest extends AbstractRequest { writer.writeString(s) }) } + + get key(): number { + return MetadataRequest.Key + } + get responseKey(): number { + return MetadataResponse.key + } + get version(): number { + return MetadataRequest.Version + } } diff --git a/src/requests/metadata_update_request.ts b/src/requests/metadata_update_request.ts index ba158aa2..f3a3d567 100644 --- a/src/requests/metadata_update_request.ts +++ b/src/requests/metadata_update_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class MetadataUpdateRequest extends AbstractRequest { - readonly responseKey = -1 static readonly Key = 0x0010 static readonly Version = 1 - readonly key = MetadataUpdateRequest.Key constructor(private params: { metadataInfo: MetadataInfo }) { super() @@ -16,4 +14,14 @@ export class MetadataUpdateRequest extends AbstractRequest { b.writeUInt16(this.params.metadataInfo.code) b.writeString(this.params.metadataInfo.stream) } + + get key(): number { + return MetadataUpdateRequest.Key + } + get responseKey(): number { + return -1 + } + get version(): number { + return MetadataUpdateRequest.Version + } } diff --git a/src/requests/open_request.ts b/src/requests/open_request.ts index f5edb4c9..d75aac5b 100644 --- a/src/requests/open_request.ts +++ b/src/requests/open_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class OpenRequest extends AbstractRequest { - readonly responseKey = OpenResponse.key static readonly Key = 0x0015 static readonly Version = 1 - readonly key = OpenRequest.Key constructor(private params: { virtualHost: string }) { super() @@ -15,4 +13,14 @@ export class OpenRequest extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeString(this.params.virtualHost) } + + get key(): number { + return OpenRequest.Key + } + get responseKey(): number { + return OpenResponse.key + } + get version(): number { + return OpenRequest.Version + } } diff --git a/src/requests/partitions_query.ts b/src/requests/partitions_query.ts index 826c2480..3c60bee5 100644 --- a/src/requests/partitions_query.ts +++ b/src/requests/partitions_query.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class PartitionsQuery extends AbstractRequest { - readonly responseKey = PartitionsResponse.key static readonly Key = 0x0019 static readonly Version = 1 - readonly key = PartitionsQuery.Key constructor(private params: { superStream: string }) { super() @@ -15,4 +13,14 @@ export class PartitionsQuery extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeString(this.params.superStream) } + + get key(): number { + return PartitionsQuery.Key + } + get responseKey(): number { + return PartitionsResponse.key + } + get version(): number { + return PartitionsQuery.Version + } } diff --git a/src/requests/peer_properties_request.ts b/src/requests/peer_properties_request.ts index 135a3b89..91a3a6bc 100644 --- a/src/requests/peer_properties_request.ts +++ b/src/requests/peer_properties_request.ts @@ -16,8 +16,6 @@ export const PROPERTIES = { export class PeerPropertiesRequest extends AbstractRequest { static readonly Key = 0x11 static readonly Version = 1 - readonly key = PeerPropertiesRequest.Key - readonly responseKey = PeerPropertiesResponse.key private readonly _properties: { key: string; value: string }[] = [] constructor(properties: Record = PROPERTIES) { @@ -32,4 +30,14 @@ export class PeerPropertiesRequest extends AbstractRequest { writer.writeString(value) }) } + + get key(): number { + return PeerPropertiesRequest.Key + } + get responseKey(): number { + return PeerPropertiesResponse.key + } + get version(): number { + return PeerPropertiesRequest.Version + } } diff --git a/src/requests/publish_request.ts b/src/requests/publish_request.ts index da39f44d..564cb895 100644 --- a/src/requests/publish_request.ts +++ b/src/requests/publish_request.ts @@ -5,6 +5,7 @@ import { DataWriter } from "./data_writer" export type PublishRequestMessage = { publishingId: bigint + filterValue?: string message: Message } @@ -16,8 +17,6 @@ interface PublishRequestParams { export class PublishRequest extends AbstractRequest { static readonly Key = 0x02 static readonly Version = 1 - readonly key = PublishRequest.Key - readonly responseKey = -1 constructor(private params: PublishRequestParams) { super() @@ -31,4 +30,14 @@ export class PublishRequest extends AbstractRequest { amqpEncode(writer, message) }) } + + get key(): number { + return PublishRequest.Key + } + get responseKey(): number { + return -1 + } + get version(): number { + return PublishRequest.Version + } } diff --git a/src/requests/publish_request_v2.ts b/src/requests/publish_request_v2.ts new file mode 100644 index 00000000..456dc876 --- /dev/null +++ b/src/requests/publish_request_v2.ts @@ -0,0 +1,38 @@ +import { amqpEncode } from "../amqp10/encoder" +import { AbstractRequest } from "./abstract_request" +import { DataWriter } from "./data_writer" +import { PublishRequestMessage } from "./publish_request" + +interface PublishRequestParams { + publisherId: number + messages: Array +} + +export class PublishRequestV2 extends AbstractRequest { + static readonly Key = 0x02 + static readonly Version = 2 + + constructor(private params: PublishRequestParams) { + super() + } + + protected writeContent(writer: DataWriter): void { + writer.writeUInt8(this.params.publisherId) + writer.writeUInt32(this.params.messages.length) + this.params.messages.forEach(({ publishingId, filterValue, message }) => { + writer.writeUInt64(publishingId) + filterValue ? writer.writeString(filterValue) : writer.writeInt16(-1) + amqpEncode(writer, message) + }) + } + + get key(): number { + return PublishRequestV2.Key + } + get responseKey(): number { + return -1 + } + get version(): number { + return PublishRequestV2.Version + } +} diff --git a/src/requests/query_offset_request.ts b/src/requests/query_offset_request.ts index d64c69a9..78668d22 100644 --- a/src/requests/query_offset_request.ts +++ b/src/requests/query_offset_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class QueryOffsetRequest extends AbstractRequest { - readonly responseKey = QueryOffsetResponse.key static readonly Key = 0x000b static readonly Version = 1 - readonly key = QueryOffsetRequest.Key private readonly reference: string private readonly stream: string @@ -20,4 +18,14 @@ export class QueryOffsetRequest extends AbstractRequest { writer.writeString(this.reference) writer.writeString(this.stream) } + + get key(): number { + return QueryOffsetRequest.Key + } + get responseKey(): number { + return QueryOffsetResponse.key + } + get version(): number { + return QueryOffsetRequest.Version + } } diff --git a/src/requests/query_publisher_request.ts b/src/requests/query_publisher_request.ts index d6491c02..1d5ba79f 100644 --- a/src/requests/query_publisher_request.ts +++ b/src/requests/query_publisher_request.ts @@ -5,8 +5,6 @@ import { DataWriter } from "./data_writer" export class QueryPublisherRequest extends AbstractRequest { static readonly Key = 0x0005 static readonly Version = 1 - readonly key = QueryPublisherRequest.Key - readonly responseKey = QueryPublisherResponse.key constructor(private params: { stream: string; publisherRef: string }) { super() @@ -16,4 +14,14 @@ export class QueryPublisherRequest extends AbstractRequest { writer.writeString(this.params.publisherRef) writer.writeString(this.params.stream) } + + get key(): number { + return QueryPublisherRequest.Key + } + get responseKey(): number { + return QueryPublisherResponse.key + } + get version(): number { + return QueryPublisherRequest.Version + } } diff --git a/src/requests/request.ts b/src/requests/request.ts index 3b8ebbd1..93f8f447 100644 --- a/src/requests/request.ts +++ b/src/requests/request.ts @@ -10,5 +10,5 @@ export interface Request { toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer readonly responseKey: number readonly key: number - readonly version: 1 + readonly version: number } diff --git a/src/requests/requests.ts b/src/requests/requests.ts index bd0fa27f..1596bab4 100644 --- a/src/requests/requests.ts +++ b/src/requests/requests.ts @@ -13,6 +13,7 @@ export { MetadataUpdateRequest } from "./metadata_update_request" export { OpenRequest } from "./open_request" export { PeerPropertiesRequest } from "./peer_properties_request" export { PublishRequest } from "./publish_request" +export { PublishRequestV2 } from "./publish_request_v2" export { QueryOffsetRequest } from "./query_offset_request" export { QueryPublisherRequest } from "./query_publisher_request" export { SaslAuthenticateRequest } from "./sasl_authenticate_request" diff --git a/src/requests/route_query.ts b/src/requests/route_query.ts index 0c4a1aac..9783b13b 100644 --- a/src/requests/route_query.ts +++ b/src/requests/route_query.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class RouteQuery extends AbstractRequest { - readonly responseKey = RouteResponse.key static readonly Key = 0x0018 static readonly Version = 1 - readonly key = RouteQuery.Key constructor(private params: { routingKey: string; superStream: string }) { super() @@ -16,4 +14,14 @@ export class RouteQuery extends AbstractRequest { writer.writeString(this.params.routingKey) writer.writeString(this.params.superStream) } + + get key(): number { + return RouteQuery.Key + } + get responseKey(): number { + return RouteResponse.key + } + get version(): number { + return RouteQuery.Version + } } diff --git a/src/requests/sasl_authenticate_request.ts b/src/requests/sasl_authenticate_request.ts index 96a0f910..5929d8f0 100644 --- a/src/requests/sasl_authenticate_request.ts +++ b/src/requests/sasl_authenticate_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class SaslAuthenticateRequest extends AbstractRequest { - readonly responseKey = SaslAuthenticateResponse.key static readonly Key = 0x0013 static readonly Version = 1 - readonly key = SaslAuthenticateRequest.Key constructor(private params: { mechanism: string; username: string; password: string }) { super() @@ -20,4 +18,14 @@ export class SaslAuthenticateRequest extends AbstractRequest { writer.writeUInt8(0) writer.writeData(this.params.password) } + + get key(): number { + return SaslAuthenticateRequest.Key + } + get responseKey(): number { + return SaslAuthenticateResponse.key + } + get version(): number { + return SaslAuthenticateRequest.Version + } } diff --git a/src/requests/sasl_handshake_request.ts b/src/requests/sasl_handshake_request.ts index 6d7d6b08..e80c104a 100644 --- a/src/requests/sasl_handshake_request.ts +++ b/src/requests/sasl_handshake_request.ts @@ -3,12 +3,20 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class SaslHandshakeRequest extends AbstractRequest { - readonly responseKey = SaslHandshakeResponse.key static readonly Key = 0x0012 static readonly Version = 1 - readonly key = SaslHandshakeRequest.Key protected writeContent(_dw: DataWriter) { // do nothing } + + get key(): number { + return SaslHandshakeRequest.Key + } + get responseKey(): number { + return SaslHandshakeResponse.key + } + get version(): number { + return SaslHandshakeRequest.Version + } } diff --git a/src/requests/store_offset_request.ts b/src/requests/store_offset_request.ts index 9851f653..5e13891b 100644 --- a/src/requests/store_offset_request.ts +++ b/src/requests/store_offset_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class StoreOffsetRequest extends AbstractRequest { - readonly responseKey = StoreOffsetResponse.key static readonly Key = 0x000a static readonly Version = 1 - readonly key = StoreOffsetRequest.Key private readonly reference: string private readonly stream: string private readonly offsetValue: bigint @@ -23,4 +21,14 @@ export class StoreOffsetRequest extends AbstractRequest { writer.writeString(this.stream) writer.writeUInt64(this.offsetValue) } + + get key(): number { + return StoreOffsetRequest.Key + } + get responseKey(): number { + return StoreOffsetResponse.key + } + get version(): number { + return StoreOffsetRequest.Version + } } diff --git a/src/requests/stream_stats_request.ts b/src/requests/stream_stats_request.ts index 9d8f8e49..c0786df0 100644 --- a/src/requests/stream_stats_request.ts +++ b/src/requests/stream_stats_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class StreamStatsRequest extends AbstractRequest { - readonly responseKey = StreamStatsResponse.key static readonly Key = 0x001c static readonly Version = 1 - readonly key = StreamStatsRequest.Key constructor(private streamName: string) { super() @@ -15,4 +13,14 @@ export class StreamStatsRequest extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeString(this.streamName) } + + get key(): number { + return StreamStatsRequest.Key + } + get responseKey(): number { + return StreamStatsResponse.key + } + get version(): number { + return StreamStatsRequest.Version + } } diff --git a/src/requests/sub_entry_batch_publish_request.ts b/src/requests/sub_entry_batch_publish_request.ts index b584cb33..d61f8862 100644 --- a/src/requests/sub_entry_batch_publish_request.ts +++ b/src/requests/sub_entry_batch_publish_request.ts @@ -1,7 +1,8 @@ import { amqpEncode, messageSize } from "../amqp10/encoder" import { Compression, CompressionType } from "../compression" import { Message } from "../publisher" -import { AbstractRequest, BufferDataWriter } from "./abstract_request" +import { AbstractRequest } from "./abstract_request" +import { BufferDataWriter } from "./buffer_data_writer" import { DataWriter } from "./data_writer" interface SubEntryBatchPublishRequestParams { @@ -15,8 +16,6 @@ interface SubEntryBatchPublishRequestParams { export class SubEntryBatchPublishRequest extends AbstractRequest { static readonly Key = 0x02 static readonly Version = 1 - readonly key = SubEntryBatchPublishRequest.Key - readonly responseKey = -1 private readonly maxFrameSize: number constructor(private params: SubEntryBatchPublishRequestParams) { @@ -48,4 +47,14 @@ export class SubEntryBatchPublishRequest extends AbstractRequest { private encodeCompressionType(compressionType: CompressionType) { return 0x80 | (compressionType << 4) } + + get key(): number { + return SubEntryBatchPublishRequest.Key + } + get responseKey(): number { + return -1 + } + get version(): number { + return SubEntryBatchPublishRequest.Version + } } diff --git a/src/requests/subscribe_request.ts b/src/requests/subscribe_request.ts index 418958e9..f4e95de7 100755 --- a/src/requests/subscribe_request.ts +++ b/src/requests/subscribe_request.ts @@ -45,8 +45,6 @@ export class Offset { export class SubscribeRequest extends AbstractRequest { static readonly Key = 0x0007 static readonly Version = 1 - readonly key = SubscribeRequest.Key - readonly responseKey = SubscribeResponse.key private readonly _properties: { key: string; value: string }[] = [] constructor( @@ -74,4 +72,14 @@ export class SubscribeRequest extends AbstractRequest { writer.writeString(value) }) } + + get key(): number { + return SubscribeRequest.Key + } + get responseKey(): number { + return SubscribeResponse.key + } + get version(): number { + return SubscribeRequest.Version + } } diff --git a/src/requests/tune_request.ts b/src/requests/tune_request.ts index cba98ec6..e9353f82 100644 --- a/src/requests/tune_request.ts +++ b/src/requests/tune_request.ts @@ -3,10 +3,8 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class TuneRequest extends AbstractRequest { - readonly responseKey = TuneResponse.key static readonly Key = 0x0014 static readonly Version = 1 - readonly key = TuneRequest.Key constructor(private params: { frameMax: number; heartbeat: number }) { super() @@ -16,4 +14,14 @@ export class TuneRequest extends AbstractRequest { b.writeUInt32(this.params.frameMax) b.writeUInt32(this.params.heartbeat) } + + get key(): number { + return TuneRequest.Key + } + get responseKey(): number { + return TuneResponse.key + } + get version(): number { + return TuneRequest.Version + } } diff --git a/src/requests/unsubscribe_request.ts b/src/requests/unsubscribe_request.ts index ba42da99..edc86de5 100644 --- a/src/requests/unsubscribe_request.ts +++ b/src/requests/unsubscribe_request.ts @@ -5,8 +5,6 @@ import { DataWriter } from "./data_writer" export class UnsubscribeRequest extends AbstractRequest { static readonly Key = 0x000c static readonly Version = 1 - readonly key = UnsubscribeRequest.Key - readonly responseKey = UnsubscribeResponse.key constructor(private subscriptionId: number) { super() @@ -15,4 +13,14 @@ export class UnsubscribeRequest extends AbstractRequest { protected writeContent(writer: DataWriter): void { writer.writeUInt8(this.subscriptionId) } + + get key(): number { + return UnsubscribeRequest.Key + } + get responseKey(): number { + return UnsubscribeResponse.key + } + get version(): number { + return UnsubscribeRequest.Version + } } diff --git a/src/response_decoder.ts b/src/response_decoder.ts index 20429b9c..802c6975 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -36,6 +36,7 @@ import { RawConsumerUpdateQueryResponse as RawConsumerUpdateQuery, RawCreditResponse, RawDeliverResponse, + RawDeliverResponseV2, RawHeartbeatResponse, RawMetadataUpdateResponse, RawPublishConfirmResponse, @@ -57,6 +58,7 @@ import { PartitionsResponse } from "./responses/partitions_response" import { ConsumerUpdateQuery } from "./responses/consumer_update_query" import { CreateSuperStreamResponse } from "./responses/create_super_stream_response" import { DeleteSuperStreamResponse } from "./responses/delete_super_stream_response" +import { DeliverResponseV2 } from "./responses/deliver_response_v2" // Frame => Size (Request | Response | Command) // Size => uint32 (size without the 4 bytes of the size element) @@ -71,12 +73,14 @@ const UINT32_SIZE = 4 export type MetadataUpdateListener = (metadata: MetadataUpdateResponse) => void export type CreditListener = (creditResponse: CreditResponse) => void export type DeliverListener = (response: DeliverResponse) => void +export type DeliverV2Listener = (response: DeliverResponseV2) => void export type PublishConfirmListener = (confirm: PublishConfirmResponse) => void export type PublishErrorListener = (confirm: PublishErrorResponse) => void export type ConsumerUpdateQueryListener = (metadata: ConsumerUpdateQuery) => void type DeliveryResponseDecoded = { subscriptionId: number + committedChunkId?: bigint messages: Message[] } @@ -86,6 +90,7 @@ type PossibleRawResponses = | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse + | RawDeliverResponseV2 | RawCreditResponse | RawPublishConfirmResponse | RawPublishErrorResponse @@ -115,7 +120,7 @@ function decodeResponse( ): PossibleRawResponses { const key = dataResponse.readUInt16() const version = dataResponse.readUInt16() - if (key === DeliverResponse.key) { + if (key === DeliverResponse.key && version === 1) { const { subscriptionId, messages } = decodeDeliverResponse(dataResponse, getCompressionBy, logger) const response: RawDeliverResponse = { size, @@ -126,6 +131,23 @@ function decodeResponse( } return response } + if (key === DeliverResponseV2.key && version === 2) { + const { subscriptionId, committedChunkId, messages } = decodeDeliverResponse( + dataResponse, + getCompressionBy, + logger, + version + ) + const response: RawDeliverResponseV2 = { + size, + key: key as DeliverResponse["key"], + version, + subscriptionId, + committedChunkId: committedChunkId!, + messages, + } + return response + } if (key === TuneResponse.key) { const frameMax = dataResponse.readUInt32() const heartbeat = dataResponse.readUInt32() @@ -193,9 +215,11 @@ function decodeResponse( function decodeDeliverResponse( dataResponse: DataReader, getCompressionBy: (type: CompressionType) => Compression, - logger: Logger + logger: Logger, + version: 1 | 2 = 1 ): DeliveryResponseDecoded { const subscriptionId = dataResponse.readUInt8() + const committedChunkId = version === 2 ? dataResponse.readUInt64() : undefined const magicVersion = dataResponse.readInt8() const chunkType = dataResponse.readInt8() const numEntries = dataResponse.readUInt16() @@ -211,6 +235,7 @@ function decodeDeliverResponse( const messages: Message[] = [] const data = { + committedChunkId, magicVersion, chunkType, numEntries, @@ -237,7 +262,7 @@ function decodeDeliverResponse( messages.push(...decodeSubEntries(dataResponse, compression, logger)) } - return { subscriptionId, messages } + return { subscriptionId, committedChunkId, messages } } const EmptyBuffer = Buffer.from("") @@ -581,8 +606,12 @@ function isMetadataUpdateResponse(params: PossibleRawResponses): params is RawMe return params.key === MetadataUpdateResponse.key } -function isDeliverResponse(params: PossibleRawResponses): params is RawDeliverResponse { - return params.key === DeliverResponse.key +function isDeliverResponseV1(params: PossibleRawResponses): params is RawDeliverResponse { + return params.key === DeliverResponse.key && params.version === DeliverResponse.Version +} + +function isDeliverResponseV2(params: PossibleRawResponses): params is RawDeliverResponseV2 { + return params.key === DeliverResponseV2.key && params.version === DeliverResponseV2.Version } function isCreditResponse(params: PossibleRawResponses): params is RawCreditResponse { @@ -649,9 +678,12 @@ export class ResponseDecoder { } else if (isMetadataUpdateResponse(response)) { this.emitter.emit("metadata_update", new MetadataUpdateResponse(response)) this.logger.debug(`metadata update received from the server: ${inspect(response)}`) - } else if (isDeliverResponse(response)) { - this.emitter.emit("deliver", new DeliverResponse(response)) - this.logger.debug(`deliver received from the server: ${inspect(response)}`) + } else if (isDeliverResponseV1(response)) { + this.emitter.emit("deliverV1", new DeliverResponse(response)) + this.logger.debug(`deliverV1 received from the server: ${inspect(response)}`) + } else if (isDeliverResponseV2(response)) { + this.emitter.emit("deliverV2", new DeliverResponseV2(response)) + this.logger.debug(`deliverV2 received from the server: ${inspect(response)}`) } else if (isCreditResponse(response)) { this.logger.debug(`credit received from the server: ${inspect(response)}`) this.emitter.emit("credit_response", new CreditResponse(response)) @@ -672,18 +704,21 @@ export class ResponseDecoder { public on(event: "credit_response", listener: CreditListener): void public on(event: "publish_confirm", listener: PublishConfirmListener): void public on(event: "publish_error", listener: PublishErrorListener): void - public on(event: "deliver", listener: DeliverListener): void + public on(event: "deliverV1", listener: DeliverListener): void + public on(event: "deliverV2", listener: DeliverV2Listener): void public on( event: | "metadata_update" | "credit_response" | "publish_confirm" | "publish_error" - | "deliver" + | "deliverV1" + | "deliverV2" | "consumer_update_query", listener: | MetadataUpdateListener | DeliverListener + | DeliverV2Listener | CreditListener | PublishConfirmListener | PublishErrorListener diff --git a/src/responses/consumer_update_query.ts b/src/responses/consumer_update_query.ts index 9d64770e..80e8cc0b 100644 --- a/src/responses/consumer_update_query.ts +++ b/src/responses/consumer_update_query.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawConsumerUpdateQueryResponse as RawConsumerUpdateQuery } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/credit_response.ts b/src/responses/credit_response.ts index 444457d4..28ce1adf 100644 --- a/src/responses/credit_response.ts +++ b/src/responses/credit_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawCreditResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/deliver_response.ts b/src/responses/deliver_response.ts index 6b7840e5..234370f6 100755 --- a/src/responses/deliver_response.ts +++ b/src/responses/deliver_response.ts @@ -1,5 +1,5 @@ import { Message } from "../publisher" -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawDeliverResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/deliver_response_v2.ts b/src/responses/deliver_response_v2.ts new file mode 100755 index 00000000..1afc41bd --- /dev/null +++ b/src/responses/deliver_response_v2.ts @@ -0,0 +1,55 @@ +import { Message } from "../publisher" +import { BufferDataWriter } from "../requests/buffer_data_writer" +import { RawDeliverResponseV2 } from "./raw_response" +import { Response } from "./response" + +export class DeliverResponseV2 implements Response { + static key = 0x0008 + static readonly Version = 2 + + constructor(private response: RawDeliverResponseV2) { + if (this.response.key !== DeliverResponseV2.key) { + throw new Error(`Unable to create ${DeliverResponseV2.name} from data of type ${this.response.key}`) + } + } + + toBuffer(): Buffer { + const bufferSize = 1024 + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) + dw.writeUInt16(DeliverResponseV2.key) + dw.writeUInt16(2) + dw.writeUInt8(this.response.subscriptionId) + dw.writeUInt64(this.response.committedChunkId) + dw.writePrefixSize() + return dw.toBuffer() + } + + get key() { + return this.response.key + } + + get correlationId(): number { + return -1 + } + + get code(): number { + return -1 + } + + get ok(): boolean { + return true + } + + get subscriptionId(): number { + return this.response.subscriptionId + } + + get committedChunkId(): bigint { + return this.response.committedChunkId + } + + get messages(): Message[] { + return this.response.messages + } +} diff --git a/src/responses/metadata_update_response.ts b/src/responses/metadata_update_response.ts index 9a005bbd..a122ea1f 100644 --- a/src/responses/metadata_update_response.ts +++ b/src/responses/metadata_update_response.ts @@ -1,5 +1,5 @@ +import { BufferDataWriter } from "../requests/buffer_data_writer" import { MetadataInfo, RawMetadataUpdateResponse } from "./raw_response" -import { BufferDataWriter } from "../requests/abstract_request" import { Response } from "./response" export class MetadataUpdateResponse implements Response { diff --git a/src/responses/publish_confirm_response.ts b/src/responses/publish_confirm_response.ts index a1f25364..a5164db5 100644 --- a/src/responses/publish_confirm_response.ts +++ b/src/responses/publish_confirm_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawPublishConfirmResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/publish_error_response.ts b/src/responses/publish_error_response.ts index fd196963..56928feb 100644 --- a/src/responses/publish_error_response.ts +++ b/src/responses/publish_error_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawPublishErrorResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/raw_response.ts b/src/responses/raw_response.ts index 5d13a632..ca6c1f73 100644 --- a/src/responses/raw_response.ts +++ b/src/responses/raw_response.ts @@ -55,6 +55,15 @@ export interface RawDeliverResponse { messages: Message[] } +export interface RawDeliverResponseV2 { + size: number + key: 0x0008 + version: number + subscriptionId: number + committedChunkId: bigint + messages: Message[] +} + export interface RawMetadataUpdateResponse { size: number key: 0x0010 diff --git a/src/responses/responses.ts b/src/responses/responses.ts index 2efefa79..36f001f2 100644 --- a/src/responses/responses.ts +++ b/src/responses/responses.ts @@ -7,6 +7,7 @@ export { DeletePublisherResponse } from "./delete_publisher_response" export { DeleteStreamResponse } from "./delete_stream_response" export { DeleteSuperStreamResponse } from "./delete_super_stream_response" export { DeliverResponse } from "./deliver_response" +export { DeliverResponseV2 } from "./deliver_response_v2" export { ExchangeCommandVersionsResponse } from "./exchange_command_versions_response" export { HeartbeatResponse } from "./heartbeat_response" export { MetadataResponse } from "./metadata_response" diff --git a/src/responses/tune_response.ts b/src/responses/tune_response.ts index b544415d..9342ff00 100644 --- a/src/responses/tune_response.ts +++ b/src/responses/tune_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawTuneResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/versions.ts b/src/versions.ts index 689f95fe..a499a81b 100644 --- a/src/versions.ts +++ b/src/versions.ts @@ -25,6 +25,7 @@ const supportedRequests = [ requests.OpenRequest, requests.PeerPropertiesRequest, requests.PublishRequest, + requests.PublishRequestV2, requests.QueryOffsetRequest, requests.QueryPublisherRequest, requests.SaslAuthenticateRequest, @@ -40,6 +41,7 @@ const supportedRequests = [ const supportedResponses = [ responses.DeliverResponse, + responses.DeliverResponseV2, responses.PublishConfirmResponse, responses.PublishErrorResponse, responses.ConsumerUpdateQuery, @@ -76,9 +78,15 @@ export function getClientSupportedVersions(serverVersion?: string) { } if (serverVersion && lt(coerce(serverVersion)!, REQUIRED_MANAGEMENT_VERSION)) { - return result.filter( - (r) => r.key !== requests.CreateSuperStreamRequest.Key && r.key !== requests.DeleteSuperStreamRequest.Key + const filteredResult = result.filter( + (r) => ![requests.CreateSuperStreamRequest.Key, requests.DeleteSuperStreamRequest.Key].includes(r.key) ) + return filteredResult.map((r) => { + if (r.key === requests.PublishRequest.Key || r.key === responses.DeliverResponse.key) { + return { key: r.key, minVersion: r.minVersion, maxVersion: 1 } + } + return r + }) } return result diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 9c2613f7..cd5f4485 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -76,7 +76,7 @@ describe("declare consumer", () => { ) await eventually(() => expect(messages).eql([Buffer.from("hello")])) - }).timeout(10000) + }).timeout(15000) it("declaring multiple active consumers on an existing stream - only one consumer should handle the message", async () => { const messages: Buffer[] = [] diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts new file mode 100644 index 00000000..4547aa50 --- /dev/null +++ b/test/e2e/filtering.test.ts @@ -0,0 +1,50 @@ +import { expect } from "chai" +import { randomUUID } from "crypto" +import { Client } from "../../src" +import { createClient, createStreamName } from "../support/fake_data" +import { Rabbit } from "../support/rabbit" +import { eventually, username, password } from "../support/util" +import { coerce, lt } from "semver" + +describe("filtering", () => { + const rabbit = new Rabbit(username, password) + let client: Client + let streamName: string + + beforeEach(async function () { + client = await createClient(username, password) + // eslint-disable-next-line no-invalid-this + if (lt(coerce(client.rabbitManagementVersion)!, "3.13.0")) this.skip() + streamName = createStreamName() + await client.createStream({ stream: streamName, arguments: {} }) + }) + + afterEach(async () => { + try { + await client.close() + await client.deleteStream({ stream: streamName }) + await rabbit.closeAllConnections() + await rabbit.deleteAllQueues({ match: /my-stream-/ }) + } catch (e) {} + }) + + it("is seen by rabbit and filtered", async () => { + const publisher = await client.declarePublisher( + { stream: streamName, publisherRef: `my-publisher-${randomUUID()}` }, + (msg) => msg.applicationProperties!["test"].toString() + ) + const message1 = "test1" + const message2 = "test2" + const message3 = "test3" + const applicationProperties1 = { test: "A" } + const applicationProperties2 = { test: "B" } + + await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 }) + await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties1 }) + await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties2 }) + + await eventually(async () => { + expect((await rabbit.getQueueInfo(streamName)).messages).eql(3) + }, 5000) + }).timeout(10000) +}) diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index 7872ce43..a26a80cd 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -4,7 +4,7 @@ import { MessageProperties } from "../../src/publisher" import { BufferSizeSettings } from "../../src/requests/request" import { Offset } from "../../src/requests/subscribe_request" import { Consumer, Publisher } from "../../src" -import { getTestNodesFromEnv } from "./util" +import { createConsoleLog, getTestNodesFromEnv } from "./util" export function createProperties(): MessageProperties { return { diff --git a/test/unit/buffer_data_writer.test.ts b/test/unit/buffer_data_writer.test.ts index d958e943..37e6e34b 100644 --- a/test/unit/buffer_data_writer.test.ts +++ b/test/unit/buffer_data_writer.test.ts @@ -1,6 +1,6 @@ import { expect } from "chai" -import { BufferDataWriter } from "../../src/requests/abstract_request" import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX } from "../../src/util" +import { BufferDataWriter } from "../../src/requests/buffer_data_writer" describe("Buffer Data Writer functionalities", () => { const bufferMaxSize = 1024 const bufferInitialSize = 1 diff --git a/test/unit/response_decoder.test.ts b/test/unit/response_decoder.test.ts index 99f3d1b1..a916382b 100644 --- a/test/unit/response_decoder.test.ts +++ b/test/unit/response_decoder.test.ts @@ -1,11 +1,11 @@ import { expect } from "chai" import { NoneCompression } from "../../src/compression" import { DecoderListenerFunc } from "../../src/decoder_listener" -import { BufferDataWriter } from "../../src/requests/abstract_request" import { ResponseDecoder } from "../../src/response_decoder" import { PeerPropertiesResponse } from "../../src/responses/peer_properties_response" import { Response } from "../../src/responses/response" import { createConsoleLog } from "../support/util" +import { BufferDataWriter } from "../../src/requests/buffer_data_writer" class MockDecoderListener { readonly responses: Response[] = [] diff --git a/test/unit/versions.test.ts b/test/unit/versions.test.ts index 4277385b..d65ea7a9 100644 --- a/test/unit/versions.test.ts +++ b/test/unit/versions.test.ts @@ -21,7 +21,7 @@ describe("Versions", () => { { key: 16, maxVersion: 1, minVersion: 1 }, { key: 21, maxVersion: 1, minVersion: 1 }, { key: 17, maxVersion: 1, minVersion: 1 }, - { key: 2, maxVersion: 1, minVersion: 1 }, + { key: 2, maxVersion: 2, minVersion: 1 }, { key: 11, maxVersion: 1, minVersion: 1 }, { key: 5, maxVersion: 1, minVersion: 1 }, { key: 19, maxVersion: 1, minVersion: 1 }, @@ -33,7 +33,7 @@ describe("Versions", () => { { key: 12, maxVersion: 1, minVersion: 1 }, { key: 24, maxVersion: 1, minVersion: 1 }, { key: 25, maxVersion: 1, minVersion: 1 }, - { key: 8, maxVersion: 1, minVersion: 1 }, + { key: 8, maxVersion: 2, minVersion: 1 }, { key: 3, maxVersion: 1, minVersion: 1 }, { key: 4, maxVersion: 1, minVersion: 1 }, { key: 26, maxVersion: 1, minVersion: 1 }, From 02b067b93da59dfe502c651211d0898fb77e3d29 Mon Sep 17 00:00:00 2001 From: magne Date: Thu, 25 Jan 2024 16:37:08 +0100 Subject: [PATCH 2/8] Npm check fix --- src/publisher.ts | 2 +- test/support/fake_data.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index 3f4b387d..6c211972 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -7,7 +7,7 @@ import { PublishRequest, PublishRequestMessage } from "./requests/publish_reques import { SubEntryBatchPublishRequest } from "./requests/sub_entry_batch_publish_request" import { PublishConfirmResponse } from "./responses/publish_confirm_response" import { PublishErrorResponse } from "./responses/publish_error_response" -import { DEFAULT_UNLIMITED_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION } from "./util" +import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util" import { MetadataUpdateListener } from "./response_decoder" import { ConnectionInfo, Connection } from "./connection" import { ConnectionPool } from "./connection_pool" diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index a26a80cd..7872ce43 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -4,7 +4,7 @@ import { MessageProperties } from "../../src/publisher" import { BufferSizeSettings } from "../../src/requests/request" import { Offset } from "../../src/requests/subscribe_request" import { Consumer, Publisher } from "../../src" -import { createConsoleLog, getTestNodesFromEnv } from "./util" +import { getTestNodesFromEnv } from "./util" export function createProperties(): MessageProperties { return { From 5c90105138d41ecaec0306867d6c80f98e8b849b Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 29 Jan 2024 14:20:41 +0100 Subject: [PATCH 3/8] Rebase from refactor --- src/client.ts | 37 ++++++++++++++++++++++++++----------- src/connection.ts | 35 ++++++++++++++++++++++++++++------- src/publisher.ts | 1 - 3 files changed, 54 insertions(+), 19 deletions(-) diff --git a/src/client.ts b/src/client.ts index 072f3aed..ecd34b85 100644 --- a/src/client.ts +++ b/src/client.ts @@ -4,7 +4,7 @@ import { Compression, CompressionType, GzipCompression, NoneCompression } from " import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer" import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes" import { Logger, NullLogger } from "./logger" -import { Message, Publisher, StreamPublisher } from "./publisher" +import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher" import { ConsumerUpdateResponse } from "./requests/consumer_update_response" import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request" import { CreditRequest, CreditRequestParams } from "./requests/credit_request" @@ -138,7 +138,7 @@ export class Client { await connection.close() throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - if (filter && !connection.filteringEnabled) { + if (filter && !connection.isFilteringEnabled) { await connection.close() throw new Error(`Broker does not support message filtering.`) } @@ -418,17 +418,31 @@ export class Client { return consumerId } - private getDeliverCallback() { - return async (deliverVersion: "deliverV1" | "deliverV2", deliverResponse: DeliverResponse | DeliverResponseV2) => { - const consumer = this.consumers.get(deliverResponse.subscriptionId) + private getDeliverV1Callback() { + return async (response: DeliverResponse) => { + const consumer = this.consumers.get(response.subscriptionId) + if (!consumer) { + this.logger.error(`On deliverV1 no consumer found`) + return + } + this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`) + this.logger.debug(`response.messages.length: ${response.messages.length}`) + await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) + response.messages.map((x) => consumer.handle(x)) + } + } + + private getDeliverV2Callback() { + return async (response: DeliverResponseV2) => { + const consumer = this.consumers.get(response.subscriptionId) if (!consumer) { - this.logger.error(`On ${deliverVersion} no consumer found`) + this.logger.error(`On deliverV2 no consumer found`) return } - this.logger.debug(`on ${deliverVersion} -> ${consumer.consumerRef}`) - this.logger.debug(`deliverResponse.messages.length: ${deliverResponse.messages.length}`) - await this.askForCredit({ credit: 1, subscriptionId: deliverResponse.subscriptionId }) - deliverResponse.messages.map((x) => consumer.handle(x)) + this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`) + this.logger.debug(`response.messages.length: ${response.messages.length}`) + await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) + response.messages.map((x) => consumer.handle(x)) } } @@ -501,7 +515,8 @@ export class Client { const connectionListeners = { ...this.params.listeners, connection_closed: connectionClosedListener, - deliver: this.getDeliverCallback(), + deliverV1: this.getDeliverV1Callback(), + deliverV2: this.getDeliverV2Callback(), consumer_update_query: this.getConsumerUpdateCallback(), } return { ...this.params, listeners: connectionListeners, leader: leader, streamName: streamName } diff --git a/src/connection.ts b/src/connection.ts index ab4299c6..343061ef 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -16,6 +16,7 @@ import { TuneRequest } from "./requests/tune_request" import { ConsumerUpdateQueryListener, DeliverListener, + DeliverV2Listener, MetadataUpdateListener, PublishConfirmListener, PublishErrorListener, @@ -29,7 +30,7 @@ import { Response } from "./responses/response" import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response" import { SaslHandshakeResponse } from "./responses/sasl_handshake_response" import { TuneResponse } from "./responses/tune_response" -import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, removeFrom } from "./util" +import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, removeFrom } from "./util" import { Version, checkServerDeclaredVersions, getClientSupportedVersions } from "./versions" import { WaitingResponse } from "./waiting_response" import { ClientListenersParams, ClientParams, ClosingParams, QueryOffsetParams, StoreOffsetParams } from "./client" @@ -38,11 +39,13 @@ import { QueryPublisherRequest } from "./requests/query_publisher_request" import { StoreOffsetRequest } from "./requests/store_offset_request" import { QueryOffsetResponse } from "./responses/query_offset_response" import { QueryOffsetRequest } from "./requests/query_offset_request" +import { coerce, lt } from "semver" export type ConnectionClosedListener = (hadError: boolean) => void export type ConnectionProxyListenersParams = ClientListenersParams & { - deliver?: DeliverListener + deliverV1?: DeliverListener + deliverV2?: DeliverV2Listener consumer_update_query?: ConsumerUpdateQueryListener } @@ -82,6 +85,7 @@ export class Connection { private serverEndpoint: { host: string; port: number } = { host: "", port: 5552 } private readonly serverDeclaredVersions: Version[] = [] private refs: number = 0 + private filteringEnabled: boolean = false constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) { this.hostname = params.hostname @@ -125,6 +129,7 @@ export class Connection { this.socket.on("connect", async () => { this.logger.info(`Connected to RabbitMQ ${this.params.hostname}:${this.params.port}`) this.peerProperties = (await this.exchangeProperties()).properties + this.filteringEnabled = lt(coerce(this.rabbitManagementVersion)!, REQUIRED_MANAGEMENT_VERSION) ? false : true await this.auth({ username: this.params.username, password: this.params.password }) const { heartbeat } = await this.tune(this.params.heartbeat ?? 0) await this.open({ virtualHost: this.params.vhost }) @@ -151,15 +156,23 @@ export class Connection { public on(event: "metadata_update", listener: MetadataUpdateListener): void public on(event: "publish_confirm", listener: PublishConfirmListener): void public on(event: "publish_error", listener: PublishErrorListener): void - public on(event: "deliver", listener: DeliverListener): void + public on(event: "deliverV1", listener: DeliverListener): void + public on(event: "deliverV2", listener: DeliverV2Listener): void public on(event: "consumer_update_query", listener: ConsumerUpdateQueryListener): void public on( - event: "metadata_update" | "publish_confirm" | "publish_error" | "deliver" | "consumer_update_query", + event: + | "metadata_update" + | "publish_confirm" + | "publish_error" + | "deliverV1" + | "deliverV2" + | "consumer_update_query", listener: | MetadataUpdateListener | PublishConfirmListener | PublishErrorListener | DeliverListener + | DeliverV2Listener | ConsumerUpdateQueryListener ) { switch (event) { @@ -172,8 +185,11 @@ export class Connection { case "publish_error": this.decoder.on("publish_error", listener as PublishErrorListener) break - case "deliver": - this.decoder.on("deliver", listener as DeliverListener) + case "deliverV1": + this.decoder.on("deliverV1", listener as DeliverListener) + break + case "deliverV2": + this.decoder.on("deliverV2", listener as DeliverV2Listener) break case "consumer_update_query": this.decoder.on("consumer_update_query", listener as ConsumerUpdateQueryListener) @@ -187,7 +203,8 @@ export class Connection { if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update) if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm) if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error) - if (listeners?.deliver) this.decoder.on("deliver", listeners.deliver) + if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1) + if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2) if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query) } @@ -332,6 +349,10 @@ export class Connection { return this.peerProperties.version } + public get isFilteringEnabled() { + return this.filteringEnabled + } + private async auth(params: { username: string; password: string }) { this.logger.debug(`Start authentication process ...`) this.logger.debug(`Start SASL handshake ...`) diff --git a/src/publisher.ts b/src/publisher.ts index 6c211972..b6f08355 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -11,7 +11,6 @@ import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util" import { MetadataUpdateListener } from "./response_decoder" import { ConnectionInfo, Connection } from "./connection" import { ConnectionPool } from "./connection_pool" -import { coerce, lt } from "semver" import { PublishRequestV2 } from "./requests/publish_request_v2" export type MessageApplicationProperties = Record From 106928572c71f0b151cc1d5d0a166f82586ed495 Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 29 Jan 2024 15:43:00 +0100 Subject: [PATCH 4/8] Revert of request interfaces --- src/requests/abstract_request.ts | 120 +++++++++++++++++- src/requests/close_request.ts | 12 +- src/requests/consumer_update_response.ts | 12 +- src/requests/create_stream_request.ts | 12 +- src/requests/create_super_stream_request.ts | 12 +- src/requests/credit_request.ts | 12 +- src/requests/declare_publisher_request.ts | 12 +- src/requests/delete_publisher_request.ts | 12 +- src/requests/delete_stream_request.ts | 12 +- src/requests/delete_super_stream_request.ts | 12 +- .../exchange_command_versions_request.ts | 12 +- src/requests/heartbeat_request.ts | 12 +- src/requests/metadata_request.ts | 12 +- src/requests/metadata_update_request.ts | 12 +- src/requests/open_request.ts | 12 +- src/requests/partitions_query.ts | 12 +- src/requests/peer_properties_request.ts | 12 +- src/requests/publish_request.ts | 12 +- src/requests/publish_request_v2.ts | 8 +- src/requests/query_offset_request.ts | 12 +- src/requests/query_publisher_request.ts | 12 +- src/requests/route_query.ts | 12 +- src/requests/sasl_authenticate_request.ts | 12 +- src/requests/sasl_handshake_request.ts | 12 +- src/requests/store_offset_request.ts | 12 +- src/requests/stream_stats_request.ts | 12 +- .../sub_entry_batch_publish_request.ts | 15 +-- src/requests/subscribe_request.ts | 12 +- src/requests/tune_request.ts | 12 +- src/requests/unsubscribe_request.ts | 12 +- 30 files changed, 177 insertions(+), 290 deletions(-) diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index 5448a020..b2adb430 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -1,11 +1,127 @@ -import { BufferDataWriter } from "./buffer_data_writer" +import { DEFAULT_UNLIMITED_FRAME_MAX } from "../util" import { DataWriter } from "./data_writer" import { BufferSizeParams, Request } from "./request" +export class BufferDataWriter implements DataWriter { + private _offset = 0 + private readonly maxBufferSize: number + private readonly growthTriggerRatio: number + private readonly sizeMultiplier: number + + constructor(private buffer: Buffer, startFrom: number, bufferSizeParameters?: BufferSizeParams) { + this._offset = startFrom + this.maxBufferSize = bufferSizeParameters?.maxSize ?? 1048576 + this.growthTriggerRatio = bufferSizeParameters?.maxRatio ?? 0.9 + this.sizeMultiplier = bufferSizeParameters?.multiplier ?? 2 + } + + get offset() { + return this._offset + } + + writePrefixSize() { + this.buffer.writeUInt32BE(this._offset - 4, 0) + } + + writeData(data: string | Buffer): void { + this.growIfNeeded(Buffer.byteLength(data, "utf-8")) + if (Buffer.isBuffer(data)) { + this._offset += data.copy(this.buffer, this._offset) + return + } + this._offset += this.buffer.write(data, this._offset) + } + + writeByte(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt8(data, this._offset) + } + + writeInt8(data: number) { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt8(data, this._offset) + } + + writeInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt16BE(data, this._offset) + } + + writeUInt8(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt8(data, this._offset) + } + + writeUInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt16BE(data, this._offset) + } + + writeUInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt32BE(data, this._offset) + } + + writeInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt32BE(data, this._offset) + } + + writeUInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeBigUInt64BE(data, this._offset) + } + + writeInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeBigInt64BE(data, this._offset) + } + + writeString(data: string): void { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt16BE(data.length, this._offset) + this.writeData(data) + } + + toBuffer(): Buffer { + return this.buffer.slice(0, this._offset) + } + + private growIfNeeded(additionalBytes: number) { + if ((this._offset + additionalBytes) / this.buffer.length > this.growthTriggerRatio) { + this.growBuffer(additionalBytes) + } + } + + private growBuffer(requiredBytes: number) { + const newSize = this.getNewSize(requiredBytes) + const data = Buffer.from(this.buffer) + this.buffer = Buffer.alloc(newSize) + data.copy(this.buffer, 0) + } + + private getNewSize(requiredBytes: number) { + const requiredNewSize = this.buffer.length * this.sizeMultiplier + this._offset + requiredBytes + if (this.maxBufferSize === DEFAULT_UNLIMITED_FRAME_MAX) return requiredNewSize + return Math.min(requiredNewSize, this.maxBufferSize) + } +} export abstract class AbstractRequest implements Request { abstract get key(): number abstract get responseKey(): number - abstract get version(): number + get version() { + return 1 + } toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer { const initialSize = bufferSizeParams?.initialSize ?? 65536 diff --git a/src/requests/close_request.ts b/src/requests/close_request.ts index f39f468f..cadbbed8 100644 --- a/src/requests/close_request.ts +++ b/src/requests/close_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class CloseRequest extends AbstractRequest { + readonly responseKey = CloseResponse.key static readonly Key = 0x0016 static readonly Version = 1 + readonly key = CloseRequest.Key constructor(private params: { closingCode: number; closingReason: string }) { super() @@ -14,14 +16,4 @@ export class CloseRequest extends AbstractRequest { writer.writeUInt16(this.params.closingCode) writer.writeString(this.params.closingReason) } - - get key(): number { - return CloseRequest.Key - } - get responseKey(): number { - return CloseResponse.key - } - get version(): number { - return CloseRequest.Version - } } diff --git a/src/requests/consumer_update_response.ts b/src/requests/consumer_update_response.ts index 57767c98..1f4e17d3 100644 --- a/src/requests/consumer_update_response.ts +++ b/src/requests/consumer_update_response.ts @@ -4,8 +4,10 @@ import { DataWriter } from "./data_writer" import { Offset } from "./subscribe_request" export class ConsumerUpdateResponse extends AbstractRequest { + readonly responseKey = ConsumerUpdateQuery.key static readonly Key = 0x801a static readonly Version = 1 + readonly key = ConsumerUpdateResponse.Key constructor(private params: { correlationId: number; responseCode: number; offset: Offset }) { super() @@ -16,14 +18,4 @@ export class ConsumerUpdateResponse extends AbstractRequest { b.writeUInt16(this.params.responseCode) this.params.offset.write(b) } - - get key(): number { - return ConsumerUpdateResponse.Key - } - get responseKey(): number { - return ConsumerUpdateQuery.key - } - get version(): number { - return ConsumerUpdateResponse.Version - } } diff --git a/src/requests/create_stream_request.ts b/src/requests/create_stream_request.ts index a1931d22..85b8b221 100644 --- a/src/requests/create_stream_request.ts +++ b/src/requests/create_stream_request.ts @@ -11,8 +11,10 @@ export interface CreateStreamArguments { } export class CreateStreamRequest extends AbstractRequest { + readonly responseKey = CreateStreamResponse.key static readonly Key = 0x000d static readonly Version = 1 + readonly key = CreateStreamRequest.Key private readonly _arguments: { key: keyof CreateStreamArguments; value: string | number }[] = [] private readonly stream: string @@ -38,14 +40,4 @@ export class CreateStreamRequest extends AbstractRequest { writer.writeString(value.toString()) }) } - - get key(): number { - return CreateStreamRequest.Key - } - get responseKey(): number { - return CreateStreamResponse.key - } - get version(): number { - return CreateStreamRequest.Version - } } diff --git a/src/requests/create_super_stream_request.ts b/src/requests/create_super_stream_request.ts index e52c1a15..df300387 100644 --- a/src/requests/create_super_stream_request.ts +++ b/src/requests/create_super_stream_request.ts @@ -11,8 +11,10 @@ export interface CreateSuperStreamParams { } export class CreateSuperStreamRequest extends AbstractRequest { + readonly responseKey = CreateSuperStreamResponse.key static readonly Key = 0x001d static readonly Version = 1 + readonly key = CreateSuperStreamRequest.Key private readonly _arguments: { key: keyof CreateStreamArguments; value: string | number }[] = [] private readonly streamName: string private readonly partitions: string[] @@ -45,14 +47,4 @@ export class CreateSuperStreamRequest extends AbstractRequest { writer.writeString(value.toString()) }) } - - get key(): number { - return CreateSuperStreamRequest.Key - } - get responseKey(): number { - return CreateSuperStreamResponse.key - } - get version(): number { - return CreateSuperStreamRequest.Version - } } diff --git a/src/requests/credit_request.ts b/src/requests/credit_request.ts index ce3f97a9..65e72883 100644 --- a/src/requests/credit_request.ts +++ b/src/requests/credit_request.ts @@ -8,7 +8,9 @@ export type CreditRequestParams = { export class CreditRequest extends AbstractRequest { static readonly Key = 0x09 + readonly key = CreditRequest.Key static readonly Version = 1 + readonly responseKey = -1 constructor(private params: CreditRequestParams) { super() @@ -18,14 +20,4 @@ export class CreditRequest extends AbstractRequest { writer.writeUInt8(this.params.subscriptionId) writer.writeUInt16(this.params.credit) } - - get key(): number { - return CreditRequest.Key - } - get responseKey(): number { - return -1 - } - get version(): number { - return CreditRequest.Version - } } diff --git a/src/requests/declare_publisher_request.ts b/src/requests/declare_publisher_request.ts index 78f78564..190c1f52 100644 --- a/src/requests/declare_publisher_request.ts +++ b/src/requests/declare_publisher_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class DeclarePublisherRequest extends AbstractRequest { + readonly responseKey = DeclarePublisherResponse.key static readonly Key = 0x0001 static readonly Version = 1 + readonly key = DeclarePublisherRequest.Key constructor(private params: { stream: string; publisherId: number; publisherRef?: string }) { super() @@ -15,14 +17,4 @@ export class DeclarePublisherRequest extends AbstractRequest { writer.writeString(this.params.publisherRef || "") writer.writeString(this.params.stream) } - - get key(): number { - return DeclarePublisherRequest.Key - } - get responseKey(): number { - return DeclarePublisherResponse.key - } - get version(): number { - return DeclarePublisherRequest.Version - } } diff --git a/src/requests/delete_publisher_request.ts b/src/requests/delete_publisher_request.ts index 39ad6768..3b558b1d 100644 --- a/src/requests/delete_publisher_request.ts +++ b/src/requests/delete_publisher_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class DeletePublisherRequest extends AbstractRequest { + readonly responseKey = DeletePublisherResponse.key static readonly Key = 0x0006 static readonly Version = 1 + readonly key = DeletePublisherRequest.Key constructor(private publisherId: number) { super() @@ -13,14 +15,4 @@ export class DeletePublisherRequest extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeUInt8(this.publisherId) } - - get key(): number { - return DeletePublisherRequest.Key - } - get responseKey(): number { - return DeletePublisherResponse.key - } - get version(): number { - return DeletePublisherRequest.Version - } } diff --git a/src/requests/delete_stream_request.ts b/src/requests/delete_stream_request.ts index 63705a76..f22f3690 100644 --- a/src/requests/delete_stream_request.ts +++ b/src/requests/delete_stream_request.ts @@ -4,7 +4,9 @@ import { DataWriter } from "./data_writer" export class DeleteStreamRequest extends AbstractRequest { static readonly Key = 0x000e + readonly key = DeleteStreamRequest.Key static readonly Version = 1 + readonly responseKey = DeleteStreamResponse.key private readonly stream: string constructor(stream: string) { @@ -15,14 +17,4 @@ export class DeleteStreamRequest extends AbstractRequest { protected writeContent(writer: DataWriter): void { writer.writeString(this.stream) } - - get key(): number { - return DeleteStreamRequest.Key - } - get responseKey(): number { - return DeleteStreamResponse.key - } - get version(): number { - return DeleteStreamRequest.Version - } } diff --git a/src/requests/delete_super_stream_request.ts b/src/requests/delete_super_stream_request.ts index 5b94212f..e607c5bd 100644 --- a/src/requests/delete_super_stream_request.ts +++ b/src/requests/delete_super_stream_request.ts @@ -4,7 +4,9 @@ import { DataWriter } from "./data_writer" export class DeleteSuperStreamRequest extends AbstractRequest { static readonly Key = 0x001e + readonly key = DeleteSuperStreamRequest.Key static readonly Version = 1 + readonly responseKey = DeleteSuperStreamResponse.key private readonly streamName: string constructor(streamName: string) { @@ -15,14 +17,4 @@ export class DeleteSuperStreamRequest extends AbstractRequest { protected writeContent(writer: DataWriter): void { writer.writeString(this.streamName) } - - get key(): number { - return DeleteSuperStreamRequest.Key - } - get responseKey(): number { - return DeleteSuperStreamResponse.key - } - get version(): number { - return DeleteSuperStreamRequest.Version - } } diff --git a/src/requests/exchange_command_versions_request.ts b/src/requests/exchange_command_versions_request.ts index 34f004c3..abbf7288 100644 --- a/src/requests/exchange_command_versions_request.ts +++ b/src/requests/exchange_command_versions_request.ts @@ -5,7 +5,9 @@ import { DataWriter } from "./data_writer" export class ExchangeCommandVersionsRequest extends AbstractRequest { static readonly Key = 0x001b + readonly key = ExchangeCommandVersionsRequest.Key static readonly Version = 1 + readonly responseKey = ExchangeCommandVersionsResponse.key constructor(readonly versions: Version[]) { super() } @@ -18,14 +20,4 @@ export class ExchangeCommandVersionsRequest extends AbstractRequest { writer.writeUInt16(entry.maxVersion) }) } - - get key(): number { - return ExchangeCommandVersionsRequest.Key - } - get responseKey(): number { - return ExchangeCommandVersionsResponse.key - } - get version(): number { - return ExchangeCommandVersionsRequest.Version - } } diff --git a/src/requests/heartbeat_request.ts b/src/requests/heartbeat_request.ts index 50b7cf3a..3d5bec93 100644 --- a/src/requests/heartbeat_request.ts +++ b/src/requests/heartbeat_request.ts @@ -3,20 +3,12 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class HeartbeatRequest extends AbstractRequest { + readonly responseKey = HeartbeatResponse.key static readonly Key = 0x0017 static readonly Version = 1 + readonly key = HeartbeatRequest.Key writeContent(_b: DataWriter) { return } - - get key(): number { - return HeartbeatRequest.Key - } - get responseKey(): number { - return HeartbeatResponse.key - } - get version(): number { - return HeartbeatRequest.Version - } } diff --git a/src/requests/metadata_request.ts b/src/requests/metadata_request.ts index 4d81aa70..d6311a18 100644 --- a/src/requests/metadata_request.ts +++ b/src/requests/metadata_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class MetadataRequest extends AbstractRequest { + readonly responseKey = MetadataResponse.key static readonly Key = 0x000f static readonly Version = 1 + readonly key = MetadataRequest.Key constructor(private params: { streams: string[] }) { super() @@ -16,14 +18,4 @@ export class MetadataRequest extends AbstractRequest { writer.writeString(s) }) } - - get key(): number { - return MetadataRequest.Key - } - get responseKey(): number { - return MetadataResponse.key - } - get version(): number { - return MetadataRequest.Version - } } diff --git a/src/requests/metadata_update_request.ts b/src/requests/metadata_update_request.ts index f3a3d567..ba158aa2 100644 --- a/src/requests/metadata_update_request.ts +++ b/src/requests/metadata_update_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class MetadataUpdateRequest extends AbstractRequest { + readonly responseKey = -1 static readonly Key = 0x0010 static readonly Version = 1 + readonly key = MetadataUpdateRequest.Key constructor(private params: { metadataInfo: MetadataInfo }) { super() @@ -14,14 +16,4 @@ export class MetadataUpdateRequest extends AbstractRequest { b.writeUInt16(this.params.metadataInfo.code) b.writeString(this.params.metadataInfo.stream) } - - get key(): number { - return MetadataUpdateRequest.Key - } - get responseKey(): number { - return -1 - } - get version(): number { - return MetadataUpdateRequest.Version - } } diff --git a/src/requests/open_request.ts b/src/requests/open_request.ts index d75aac5b..f5edb4c9 100644 --- a/src/requests/open_request.ts +++ b/src/requests/open_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class OpenRequest extends AbstractRequest { + readonly responseKey = OpenResponse.key static readonly Key = 0x0015 static readonly Version = 1 + readonly key = OpenRequest.Key constructor(private params: { virtualHost: string }) { super() @@ -13,14 +15,4 @@ export class OpenRequest extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeString(this.params.virtualHost) } - - get key(): number { - return OpenRequest.Key - } - get responseKey(): number { - return OpenResponse.key - } - get version(): number { - return OpenRequest.Version - } } diff --git a/src/requests/partitions_query.ts b/src/requests/partitions_query.ts index 3c60bee5..826c2480 100644 --- a/src/requests/partitions_query.ts +++ b/src/requests/partitions_query.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class PartitionsQuery extends AbstractRequest { + readonly responseKey = PartitionsResponse.key static readonly Key = 0x0019 static readonly Version = 1 + readonly key = PartitionsQuery.Key constructor(private params: { superStream: string }) { super() @@ -13,14 +15,4 @@ export class PartitionsQuery extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeString(this.params.superStream) } - - get key(): number { - return PartitionsQuery.Key - } - get responseKey(): number { - return PartitionsResponse.key - } - get version(): number { - return PartitionsQuery.Version - } } diff --git a/src/requests/peer_properties_request.ts b/src/requests/peer_properties_request.ts index 91a3a6bc..135a3b89 100644 --- a/src/requests/peer_properties_request.ts +++ b/src/requests/peer_properties_request.ts @@ -16,6 +16,8 @@ export const PROPERTIES = { export class PeerPropertiesRequest extends AbstractRequest { static readonly Key = 0x11 static readonly Version = 1 + readonly key = PeerPropertiesRequest.Key + readonly responseKey = PeerPropertiesResponse.key private readonly _properties: { key: string; value: string }[] = [] constructor(properties: Record = PROPERTIES) { @@ -30,14 +32,4 @@ export class PeerPropertiesRequest extends AbstractRequest { writer.writeString(value) }) } - - get key(): number { - return PeerPropertiesRequest.Key - } - get responseKey(): number { - return PeerPropertiesResponse.key - } - get version(): number { - return PeerPropertiesRequest.Version - } } diff --git a/src/requests/publish_request.ts b/src/requests/publish_request.ts index 564cb895..8bfe0f4b 100644 --- a/src/requests/publish_request.ts +++ b/src/requests/publish_request.ts @@ -17,6 +17,8 @@ interface PublishRequestParams { export class PublishRequest extends AbstractRequest { static readonly Key = 0x02 static readonly Version = 1 + readonly key = PublishRequest.Key + readonly responseKey = -1 constructor(private params: PublishRequestParams) { super() @@ -30,14 +32,4 @@ export class PublishRequest extends AbstractRequest { amqpEncode(writer, message) }) } - - get key(): number { - return PublishRequest.Key - } - get responseKey(): number { - return -1 - } - get version(): number { - return PublishRequest.Version - } } diff --git a/src/requests/publish_request_v2.ts b/src/requests/publish_request_v2.ts index 456dc876..b6158de5 100644 --- a/src/requests/publish_request_v2.ts +++ b/src/requests/publish_request_v2.ts @@ -11,6 +11,8 @@ interface PublishRequestParams { export class PublishRequestV2 extends AbstractRequest { static readonly Key = 0x02 static readonly Version = 2 + readonly key = PublishRequestV2.Key + readonly responseKey = -1 constructor(private params: PublishRequestParams) { super() @@ -26,12 +28,6 @@ export class PublishRequestV2 extends AbstractRequest { }) } - get key(): number { - return PublishRequestV2.Key - } - get responseKey(): number { - return -1 - } get version(): number { return PublishRequestV2.Version } diff --git a/src/requests/query_offset_request.ts b/src/requests/query_offset_request.ts index 78668d22..d64c69a9 100644 --- a/src/requests/query_offset_request.ts +++ b/src/requests/query_offset_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class QueryOffsetRequest extends AbstractRequest { + readonly responseKey = QueryOffsetResponse.key static readonly Key = 0x000b static readonly Version = 1 + readonly key = QueryOffsetRequest.Key private readonly reference: string private readonly stream: string @@ -18,14 +20,4 @@ export class QueryOffsetRequest extends AbstractRequest { writer.writeString(this.reference) writer.writeString(this.stream) } - - get key(): number { - return QueryOffsetRequest.Key - } - get responseKey(): number { - return QueryOffsetResponse.key - } - get version(): number { - return QueryOffsetRequest.Version - } } diff --git a/src/requests/query_publisher_request.ts b/src/requests/query_publisher_request.ts index 1d5ba79f..d6491c02 100644 --- a/src/requests/query_publisher_request.ts +++ b/src/requests/query_publisher_request.ts @@ -5,6 +5,8 @@ import { DataWriter } from "./data_writer" export class QueryPublisherRequest extends AbstractRequest { static readonly Key = 0x0005 static readonly Version = 1 + readonly key = QueryPublisherRequest.Key + readonly responseKey = QueryPublisherResponse.key constructor(private params: { stream: string; publisherRef: string }) { super() @@ -14,14 +16,4 @@ export class QueryPublisherRequest extends AbstractRequest { writer.writeString(this.params.publisherRef) writer.writeString(this.params.stream) } - - get key(): number { - return QueryPublisherRequest.Key - } - get responseKey(): number { - return QueryPublisherResponse.key - } - get version(): number { - return QueryPublisherRequest.Version - } } diff --git a/src/requests/route_query.ts b/src/requests/route_query.ts index 9783b13b..0c4a1aac 100644 --- a/src/requests/route_query.ts +++ b/src/requests/route_query.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class RouteQuery extends AbstractRequest { + readonly responseKey = RouteResponse.key static readonly Key = 0x0018 static readonly Version = 1 + readonly key = RouteQuery.Key constructor(private params: { routingKey: string; superStream: string }) { super() @@ -14,14 +16,4 @@ export class RouteQuery extends AbstractRequest { writer.writeString(this.params.routingKey) writer.writeString(this.params.superStream) } - - get key(): number { - return RouteQuery.Key - } - get responseKey(): number { - return RouteResponse.key - } - get version(): number { - return RouteQuery.Version - } } diff --git a/src/requests/sasl_authenticate_request.ts b/src/requests/sasl_authenticate_request.ts index 5929d8f0..96a0f910 100644 --- a/src/requests/sasl_authenticate_request.ts +++ b/src/requests/sasl_authenticate_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class SaslAuthenticateRequest extends AbstractRequest { + readonly responseKey = SaslAuthenticateResponse.key static readonly Key = 0x0013 static readonly Version = 1 + readonly key = SaslAuthenticateRequest.Key constructor(private params: { mechanism: string; username: string; password: string }) { super() @@ -18,14 +20,4 @@ export class SaslAuthenticateRequest extends AbstractRequest { writer.writeUInt8(0) writer.writeData(this.params.password) } - - get key(): number { - return SaslAuthenticateRequest.Key - } - get responseKey(): number { - return SaslAuthenticateResponse.key - } - get version(): number { - return SaslAuthenticateRequest.Version - } } diff --git a/src/requests/sasl_handshake_request.ts b/src/requests/sasl_handshake_request.ts index e80c104a..6d7d6b08 100644 --- a/src/requests/sasl_handshake_request.ts +++ b/src/requests/sasl_handshake_request.ts @@ -3,20 +3,12 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class SaslHandshakeRequest extends AbstractRequest { + readonly responseKey = SaslHandshakeResponse.key static readonly Key = 0x0012 static readonly Version = 1 + readonly key = SaslHandshakeRequest.Key protected writeContent(_dw: DataWriter) { // do nothing } - - get key(): number { - return SaslHandshakeRequest.Key - } - get responseKey(): number { - return SaslHandshakeResponse.key - } - get version(): number { - return SaslHandshakeRequest.Version - } } diff --git a/src/requests/store_offset_request.ts b/src/requests/store_offset_request.ts index 5e13891b..9851f653 100644 --- a/src/requests/store_offset_request.ts +++ b/src/requests/store_offset_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class StoreOffsetRequest extends AbstractRequest { + readonly responseKey = StoreOffsetResponse.key static readonly Key = 0x000a static readonly Version = 1 + readonly key = StoreOffsetRequest.Key private readonly reference: string private readonly stream: string private readonly offsetValue: bigint @@ -21,14 +23,4 @@ export class StoreOffsetRequest extends AbstractRequest { writer.writeString(this.stream) writer.writeUInt64(this.offsetValue) } - - get key(): number { - return StoreOffsetRequest.Key - } - get responseKey(): number { - return StoreOffsetResponse.key - } - get version(): number { - return StoreOffsetRequest.Version - } } diff --git a/src/requests/stream_stats_request.ts b/src/requests/stream_stats_request.ts index c0786df0..9d8f8e49 100644 --- a/src/requests/stream_stats_request.ts +++ b/src/requests/stream_stats_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class StreamStatsRequest extends AbstractRequest { + readonly responseKey = StreamStatsResponse.key static readonly Key = 0x001c static readonly Version = 1 + readonly key = StreamStatsRequest.Key constructor(private streamName: string) { super() @@ -13,14 +15,4 @@ export class StreamStatsRequest extends AbstractRequest { writeContent(writer: DataWriter) { writer.writeString(this.streamName) } - - get key(): number { - return StreamStatsRequest.Key - } - get responseKey(): number { - return StreamStatsResponse.key - } - get version(): number { - return StreamStatsRequest.Version - } } diff --git a/src/requests/sub_entry_batch_publish_request.ts b/src/requests/sub_entry_batch_publish_request.ts index d61f8862..b584cb33 100644 --- a/src/requests/sub_entry_batch_publish_request.ts +++ b/src/requests/sub_entry_batch_publish_request.ts @@ -1,8 +1,7 @@ import { amqpEncode, messageSize } from "../amqp10/encoder" import { Compression, CompressionType } from "../compression" import { Message } from "../publisher" -import { AbstractRequest } from "./abstract_request" -import { BufferDataWriter } from "./buffer_data_writer" +import { AbstractRequest, BufferDataWriter } from "./abstract_request" import { DataWriter } from "./data_writer" interface SubEntryBatchPublishRequestParams { @@ -16,6 +15,8 @@ interface SubEntryBatchPublishRequestParams { export class SubEntryBatchPublishRequest extends AbstractRequest { static readonly Key = 0x02 static readonly Version = 1 + readonly key = SubEntryBatchPublishRequest.Key + readonly responseKey = -1 private readonly maxFrameSize: number constructor(private params: SubEntryBatchPublishRequestParams) { @@ -47,14 +48,4 @@ export class SubEntryBatchPublishRequest extends AbstractRequest { private encodeCompressionType(compressionType: CompressionType) { return 0x80 | (compressionType << 4) } - - get key(): number { - return SubEntryBatchPublishRequest.Key - } - get responseKey(): number { - return -1 - } - get version(): number { - return SubEntryBatchPublishRequest.Version - } } diff --git a/src/requests/subscribe_request.ts b/src/requests/subscribe_request.ts index f4e95de7..418958e9 100755 --- a/src/requests/subscribe_request.ts +++ b/src/requests/subscribe_request.ts @@ -45,6 +45,8 @@ export class Offset { export class SubscribeRequest extends AbstractRequest { static readonly Key = 0x0007 static readonly Version = 1 + readonly key = SubscribeRequest.Key + readonly responseKey = SubscribeResponse.key private readonly _properties: { key: string; value: string }[] = [] constructor( @@ -72,14 +74,4 @@ export class SubscribeRequest extends AbstractRequest { writer.writeString(value) }) } - - get key(): number { - return SubscribeRequest.Key - } - get responseKey(): number { - return SubscribeResponse.key - } - get version(): number { - return SubscribeRequest.Version - } } diff --git a/src/requests/tune_request.ts b/src/requests/tune_request.ts index e9353f82..cba98ec6 100644 --- a/src/requests/tune_request.ts +++ b/src/requests/tune_request.ts @@ -3,8 +3,10 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export class TuneRequest extends AbstractRequest { + readonly responseKey = TuneResponse.key static readonly Key = 0x0014 static readonly Version = 1 + readonly key = TuneRequest.Key constructor(private params: { frameMax: number; heartbeat: number }) { super() @@ -14,14 +16,4 @@ export class TuneRequest extends AbstractRequest { b.writeUInt32(this.params.frameMax) b.writeUInt32(this.params.heartbeat) } - - get key(): number { - return TuneRequest.Key - } - get responseKey(): number { - return TuneResponse.key - } - get version(): number { - return TuneRequest.Version - } } diff --git a/src/requests/unsubscribe_request.ts b/src/requests/unsubscribe_request.ts index edc86de5..ba42da99 100644 --- a/src/requests/unsubscribe_request.ts +++ b/src/requests/unsubscribe_request.ts @@ -5,6 +5,8 @@ import { DataWriter } from "./data_writer" export class UnsubscribeRequest extends AbstractRequest { static readonly Key = 0x000c static readonly Version = 1 + readonly key = UnsubscribeRequest.Key + readonly responseKey = UnsubscribeResponse.key constructor(private subscriptionId: number) { super() @@ -13,14 +15,4 @@ export class UnsubscribeRequest extends AbstractRequest { protected writeContent(writer: DataWriter): void { writer.writeUInt8(this.subscriptionId) } - - get key(): number { - return UnsubscribeRequest.Key - } - get responseKey(): number { - return UnsubscribeResponse.key - } - get version(): number { - return UnsubscribeRequest.Version - } } From 40f7aa2c4c43054d6f90b08c84d53f7439e8befd Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 29 Jan 2024 15:45:37 +0100 Subject: [PATCH 5/8] Revert to standard time out for test --- test/e2e/declare_consumer.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index cd5f4485..9c2613f7 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -76,7 +76,7 @@ describe("declare consumer", () => { ) await eventually(() => expect(messages).eql([Buffer.from("hello")])) - }).timeout(15000) + }).timeout(10000) it("declaring multiple active consumers on an existing stream - only one consumer should handle the message", async () => { const messages: Buffer[] = [] From 34c7802159dac1cca83de98224f9a961b8ab8fc1 Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 29 Jan 2024 16:04:04 +0100 Subject: [PATCH 6/8] PR review fixes --- src/client.ts | 1 - src/response_decoder.ts | 20 ++++---------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/src/client.ts b/src/client.ts index ecd34b85..2197b7f7 100644 --- a/src/client.ts +++ b/src/client.ts @@ -139,7 +139,6 @@ export class Client { throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } if (filter && !connection.isFilteringEnabled) { - await connection.close() throw new Error(`Broker does not support message filtering.`) } const publisher = new StreamPublisher( diff --git a/src/response_decoder.ts b/src/response_decoder.ts index 802c6975..b438b5c4 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -120,33 +120,21 @@ function decodeResponse( ): PossibleRawResponses { const key = dataResponse.readUInt16() const version = dataResponse.readUInt16() - if (key === DeliverResponse.key && version === 1) { - const { subscriptionId, messages } = decodeDeliverResponse(dataResponse, getCompressionBy, logger) - const response: RawDeliverResponse = { - size, - key: key as DeliverResponse["key"], - version, - subscriptionId, - messages, - } - return response - } - if (key === DeliverResponseV2.key && version === 2) { + if (key === DeliverResponse.key) { const { subscriptionId, committedChunkId, messages } = decodeDeliverResponse( dataResponse, getCompressionBy, logger, version ) - const response: RawDeliverResponseV2 = { + return { size, key: key as DeliverResponse["key"], version, subscriptionId, - committedChunkId: committedChunkId!, + committedChunkId, messages, } - return response } if (key === TuneResponse.key) { const frameMax = dataResponse.readUInt32() @@ -216,7 +204,7 @@ function decodeDeliverResponse( dataResponse: DataReader, getCompressionBy: (type: CompressionType) => Compression, logger: Logger, - version: 1 | 2 = 1 + version = 1 ): DeliveryResponseDecoded { const subscriptionId = dataResponse.readUInt8() const committedChunkId = version === 2 ? dataResponse.readUInt64() : undefined From 620338c4e3d95cd64db58947548cb4e6d3e7bf8d Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 29 Jan 2024 16:09:32 +0100 Subject: [PATCH 7/8] Rename of test --- test/e2e/filtering.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts index 4547aa50..7010398f 100644 --- a/test/e2e/filtering.test.ts +++ b/test/e2e/filtering.test.ts @@ -28,7 +28,7 @@ describe("filtering", () => { } catch (e) {} }) - it("is seen by rabbit and filtered", async () => { + it("can publish with filter value", async () => { const publisher = await client.declarePublisher( { stream: streamName, publisherRef: `my-publisher-${randomUUID()}` }, (msg) => msg.applicationProperties!["test"].toString() From f6f3cfd5f8955c4621c8f98f4102c48ff0d0b65d Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 29 Jan 2024 16:31:46 +0100 Subject: [PATCH 8/8] Increase timeout for cluster --- test/e2e/filtering.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts index 7010398f..8d02b15f 100644 --- a/test/e2e/filtering.test.ts +++ b/test/e2e/filtering.test.ts @@ -45,6 +45,6 @@ describe("filtering", () => { await eventually(async () => { expect((await rabbit.getQueueInfo(streamName)).messages).eql(3) - }, 5000) + }, 10000) }).timeout(10000) })