From 35dac8bcc4ddf025e183139da6e7ea94c31650bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Barril=C3=A1?= Date: Tue, 12 Dec 2023 15:45:22 +0100 Subject: [PATCH] adds tls support --- src/connection.ts | 46 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 6c93299c..a5b211c9 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -49,9 +49,10 @@ import { QueryOffsetRequest } from "./requests/query_offset_request" import { StoreOffsetRequest } from "./requests/store_offset_request" import { Logger, NullLogger } from "./logger" import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression" +import tls from "node:tls" export class Connection { - private readonly socket = new Socket() + private socket: Socket private correlationId = 100 private decoder: ResponseDecoder private receivedResponses: Response[] = [] @@ -61,8 +62,17 @@ export class Connection { private consumerId = 0 private consumers = new Map() private compressions = new Map() + private frameMax: number = DEFAULT_FRAME_MAX + + private constructor(private readonly logger: Logger, private readonly params: ConnectionParams) { + if (params.frameMax) this.frameMax = params.frameMax + if (params.ssl) { + this.socket = tls.connect(params.port, params.hostname, { ...params.ssl, rejectUnauthorized: false }) + } else { + this.socket = new Socket() + this.socket.connect(this.params.port, this.params.hostname) + } - constructor(private readonly logger: Logger, private frameMax: number = DEFAULT_FRAME_MAX) { this.heartbeat = new Heartbeat(this, this.logger) this.compressions.set(CompressionType.None, NoneCompression.create()) this.compressions.set(CompressionType.Gzip, GzipCompression.create()) @@ -89,29 +99,31 @@ export class Connection { } static connect(params: ConnectionParams, logger?: Logger): Promise { - return new Connection(logger ?? new NullLogger(), params.frameMax).start(params) + return new Connection(logger ?? new NullLogger(), params).start() } - public start(params: ConnectionParams): Promise { - this.registerListeners(params.listeners) + public start(): Promise { + this.registerListeners(this.params.listeners) this.registerDelivers() return new Promise((res, rej) => { this.socket.on("error", (err) => { - this.logger.warn(`Error on connection ${params.hostname}:${params.port} vhost:${params.vhost} err: ${err}`) + this.logger.warn( + `Error on connection ${this.params.hostname}:${this.params.port} vhost:${this.params.vhost} err: ${err}` + ) return rej(err) }) this.socket.on("connect", async () => { - this.logger.info(`Connected to RabbitMQ ${params.hostname}:${params.port}`) + this.logger.info(`Connected to RabbitMQ ${this.params.hostname}:${this.params.port}`) await this.exchangeProperties() - await this.auth({ username: params.username, password: params.password }) - const { heartbeat } = await this.tune(params.heartbeat ?? 0) - await this.open({ virtualHost: params.vhost }) + await this.auth({ username: this.params.username, password: this.params.password }) + const { heartbeat } = await this.tune(this.params.heartbeat ?? 0) + await this.open({ virtualHost: this.params.vhost }) this.heartbeat.start(heartbeat) return res(this) }) - this.socket.on("drain", () => this.logger.warn(`Draining ${params.hostname}:${params.port}`)) + this.socket.on("drain", () => this.logger.warn(`Draining ${this.params.hostname}:${this.params.port}`)) this.socket.on("timeout", () => { - this.logger.error(`Timeout ${params.hostname}:${params.port}`) + this.logger.error(`Timeout ${this.params.hostname}:${this.params.port}`) return rej() }) this.socket.on("data", (data) => { @@ -121,7 +133,6 @@ export class Connection { this.socket.on("close", (had_error) => { this.logger.info(`Close event on socket, close cloud had_error? ${had_error}`) }) - this.socket.connect(params.port, params.hostname) }) } public on(event: "metadata_update", listener: MetadataUpdateListener): void @@ -154,7 +165,7 @@ export class Connection { this.logger.debug(`Close ...`) const closeResponse = await this.sendAndWait(new CloseRequest(params)) this.logger.debug(`Close response: ${closeResponse.ok} - '${inspect(closeResponse)}'`) - return new Promise((res, _rej) => this.socket.end(() => res())) + this.socket.end() } public async declarePublisher(params: DeclarePublisherParams): Promise { @@ -482,6 +493,12 @@ export type ListenersParams = { publish_error?: PublishErrorListener } +export interface SSLConnectionParams { + key: string + cert: string + ca?: string +} + export interface ConnectionParams { hostname: string port: number @@ -491,6 +508,7 @@ export interface ConnectionParams { frameMax?: number heartbeat?: number listeners?: ListenersParams + ssl?: SSLConnectionParams } export interface DeclarePublisherParams {