Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds filtering feature on publishing side #171

Merged
merged 8 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Compression, CompressionType, GzipCompression, NoneCompression } from "
import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer"
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
import { Logger, NullLogger } from "./logger"
import { Message, Publisher, StreamPublisher } from "./publisher"
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
import { ConsumerUpdateResponse } from "./requests/consumer_update_response"
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
Expand Down Expand Up @@ -41,6 +41,7 @@ import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request
import { lt, coerce } from "semver"
import { ConnectionInfo, Connection, errorMessageOf } from "./connection"
import { ConnectionPool } from "./connection_pool"
import { DeliverResponseV2 } from "./responses/deliver_response_v2"

export type ConnectionClosedListener = (hadError: boolean) => void

Expand Down Expand Up @@ -125,7 +126,7 @@ export class Client {
return res.streams
}

public async declarePublisher(params: DeclarePublisherParams): Promise<Publisher> {
public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise<Publisher> {
const { stream, publisherRef } = params
const publisherId = this.incPublisherId()

Expand All @@ -137,16 +138,23 @@ export class Client {
await connection.close()
throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
const publisher = new StreamPublisher({
connection: connection,
stream: params.stream,
publisherId: publisherId,
publisherRef: params.publisherRef,
boot: params.boot,
maxFrameSize: this.maxFrameSize,
maxChunkLength: params.maxChunkLength,
logger: this.logger,
})
if (filter && !connection.isFilteringEnabled) {
tarzacodes marked this conversation as resolved.
Show resolved Hide resolved
await connection.close()
throw new Error(`Broker does not support message filtering.`)
}
const publisher = new StreamPublisher(
{
connection: connection,
stream: params.stream,
publisherId: publisherId,
publisherRef: params.publisherRef,
boot: params.boot,
maxFrameSize: this.maxFrameSize,
maxChunkLength: params.maxChunkLength,
logger: this.logger,
},
filter
)
this.publishers.set(publisherId, { publisher: publisher, connection: connection })
this.logger.info(
`New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}`
Expand Down Expand Up @@ -410,14 +418,28 @@ export class Client {
return consumerId
}

private getDeliverCallback() {
private getDeliverV1Callback() {
return async (response: DeliverResponse) => {
const consumer = this.consumers.get(response.subscriptionId)
if (!consumer) {
this.logger.error(`On deliver no consumer found`)
this.logger.error(`On deliverV1 no consumer found`)
return
}
this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`)
this.logger.debug(`response.messages.length: ${response.messages.length}`)
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
response.messages.map((x) => consumer.handle(x))
}
}

private getDeliverV2Callback() {
return async (response: DeliverResponseV2) => {
const consumer = this.consumers.get(response.subscriptionId)
if (!consumer) {
this.logger.error(`On deliverV2 no consumer found`)
return
}
this.logger.debug(`on deliver -> ${consumer.consumerRef}`)
this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`)
this.logger.debug(`response.messages.length: ${response.messages.length}`)
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
response.messages.map((x) => consumer.handle(x))
Expand Down Expand Up @@ -493,7 +515,8 @@ export class Client {
const connectionListeners = {
...this.params.listeners,
connection_closed: connectionClosedListener,
deliver: this.getDeliverCallback(),
deliverV1: this.getDeliverV1Callback(),
deliverV2: this.getDeliverV2Callback(),
consumer_update_query: this.getConsumerUpdateCallback(),
}
return { ...this.params, listeners: connectionListeners, leader: leader, streamName: streamName }
Expand Down
35 changes: 28 additions & 7 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { TuneRequest } from "./requests/tune_request"
import {
ConsumerUpdateQueryListener,
DeliverListener,
DeliverV2Listener,
MetadataUpdateListener,
PublishConfirmListener,
PublishErrorListener,
Expand All @@ -29,7 +30,7 @@ import { Response } from "./responses/response"
import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response"
import { SaslHandshakeResponse } from "./responses/sasl_handshake_response"
import { TuneResponse } from "./responses/tune_response"
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, removeFrom } from "./util"
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, removeFrom } from "./util"
import { Version, checkServerDeclaredVersions, getClientSupportedVersions } from "./versions"
import { WaitingResponse } from "./waiting_response"
import { ClientListenersParams, ClientParams, ClosingParams, QueryOffsetParams, StoreOffsetParams } from "./client"
Expand All @@ -38,11 +39,13 @@ import { QueryPublisherRequest } from "./requests/query_publisher_request"
import { StoreOffsetRequest } from "./requests/store_offset_request"
import { QueryOffsetResponse } from "./responses/query_offset_response"
import { QueryOffsetRequest } from "./requests/query_offset_request"
import { coerce, lt } from "semver"

export type ConnectionClosedListener = (hadError: boolean) => void

export type ConnectionProxyListenersParams = ClientListenersParams & {
deliver?: DeliverListener
deliverV1?: DeliverListener
deliverV2?: DeliverV2Listener
consumer_update_query?: ConsumerUpdateQueryListener
}

Expand Down Expand Up @@ -82,6 +85,7 @@ export class Connection {
private serverEndpoint: { host: string; port: number } = { host: "", port: 5552 }
private readonly serverDeclaredVersions: Version[] = []
private refs: number = 0
private filteringEnabled: boolean = false

constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) {
this.hostname = params.hostname
Expand Down Expand Up @@ -125,6 +129,7 @@ export class Connection {
this.socket.on("connect", async () => {
this.logger.info(`Connected to RabbitMQ ${this.params.hostname}:${this.params.port}`)
this.peerProperties = (await this.exchangeProperties()).properties
this.filteringEnabled = lt(coerce(this.rabbitManagementVersion)!, REQUIRED_MANAGEMENT_VERSION) ? false : true
await this.auth({ username: this.params.username, password: this.params.password })
const { heartbeat } = await this.tune(this.params.heartbeat ?? 0)
await this.open({ virtualHost: this.params.vhost })
Expand All @@ -151,15 +156,23 @@ export class Connection {
public on(event: "metadata_update", listener: MetadataUpdateListener): void
public on(event: "publish_confirm", listener: PublishConfirmListener): void
public on(event: "publish_error", listener: PublishErrorListener): void
public on(event: "deliver", listener: DeliverListener): void
public on(event: "deliverV1", listener: DeliverListener): void
public on(event: "deliverV2", listener: DeliverV2Listener): void
public on(event: "consumer_update_query", listener: ConsumerUpdateQueryListener): void
public on(
event: "metadata_update" | "publish_confirm" | "publish_error" | "deliver" | "consumer_update_query",
event:
| "metadata_update"
| "publish_confirm"
| "publish_error"
| "deliverV1"
| "deliverV2"
| "consumer_update_query",
listener:
| MetadataUpdateListener
| PublishConfirmListener
| PublishErrorListener
| DeliverListener
| DeliverV2Listener
| ConsumerUpdateQueryListener
) {
switch (event) {
Expand All @@ -172,8 +185,11 @@ export class Connection {
case "publish_error":
this.decoder.on("publish_error", listener as PublishErrorListener)
break
case "deliver":
this.decoder.on("deliver", listener as DeliverListener)
case "deliverV1":
this.decoder.on("deliverV1", listener as DeliverListener)
break
case "deliverV2":
this.decoder.on("deliverV2", listener as DeliverV2Listener)
break
case "consumer_update_query":
this.decoder.on("consumer_update_query", listener as ConsumerUpdateQueryListener)
Expand All @@ -187,7 +203,8 @@ export class Connection {
if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
if (listeners?.deliver) this.decoder.on("deliver", listeners.deliver)
if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1)
if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2)
if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query)
}

Expand Down Expand Up @@ -332,6 +349,10 @@ export class Connection {
return this.peerProperties.version
}

public get isFilteringEnabled() {
return this.filteringEnabled
}

private async auth(params: { username: string; password: string }) {
this.logger.debug(`Start authentication process ...`)
this.logger.debug(`Start SASL handshake ...`)
Expand Down
50 changes: 34 additions & 16 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util"
import { MetadataUpdateListener } from "./response_decoder"
import { ConnectionInfo, Connection } from "./connection"
import { ConnectionPool } from "./connection_pool"
import { PublishRequestV2 } from "./requests/publish_request_v2"

export type MessageApplicationProperties = Record<string, string | number>

Expand Down Expand Up @@ -70,6 +71,7 @@ export interface Publisher {
readonly publisherId: number
}

export type FilterFunc = (msg: Message) => string | undefined
type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void
export class StreamPublisher implements Publisher {
private connection: Connection
Expand All @@ -85,16 +87,19 @@ export class StreamPublisher implements Publisher {
private maxChunkLength: number
private closed = false

constructor(params: {
connection: Connection
stream: string
publisherId: number
publisherRef?: string
boot?: boolean
maxFrameSize: number
maxChunkLength?: number
logger: Logger
}) {
constructor(
params: {
connection: Connection
stream: string
publisherId: number
publisherRef?: string
boot?: boolean
maxFrameSize: number
maxChunkLength?: number
logger: Logger
},
private readonly filter?: FilterFunc
) {
this.connection = params.connection
this.stream = params.stream
this.publisherId = params.publisherId
Expand Down Expand Up @@ -187,6 +192,12 @@ export class StreamPublisher implements Publisher {
}

private async enqueue(publishRequestMessage: PublishRequestMessage) {
if (this.filter) {
publishRequestMessage.filterValue = this.filter(publishRequestMessage.message)
}
if (!this.connection.isFilteringEnabled && this.filter) {
throw new Error(`Your rabbit server management version does not support filtering.`)
}
this.checkMessageSize(publishRequestMessage)
const sendCycleNeeded = this.add(publishRequestMessage)
let sent = false
Expand All @@ -211,12 +222,19 @@ export class StreamPublisher implements Publisher {
private async sendBuffer() {
const chunk = this.popChunk()
if (chunk.length > 0) {
await this.connection.send(
new PublishRequest({
publisherId: this.publisherId,
messages: chunk,
})
)
this.filter
? await this.connection.send(
new PublishRequestV2({
publisherId: this.publisherId,
messages: chunk,
})
)
: await this.connection.send(
new PublishRequest({
publisherId: this.publisherId,
messages: chunk,
})
)
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/requests/abstract_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ export class BufferDataWriter implements DataWriter {
this._offset = this.buffer.writeInt8(data, this._offset)
}

writeInt16(data: number) {
const bytes = 2
this.growIfNeeded(bytes)
this._offset = this.buffer.writeInt16BE(data, this._offset)
}

writeUInt8(data: number): void {
const bytes = 1
this.growIfNeeded(bytes)
Expand Down Expand Up @@ -113,7 +119,9 @@ export class BufferDataWriter implements DataWriter {
export abstract class AbstractRequest implements Request {
abstract get key(): number
abstract get responseKey(): number
readonly version = 1
get version() {
return 1
}

toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer {
const initialSize = bufferSizeParams?.initialSize ?? 65536
Expand Down
Loading
Loading