Skip to content

Commit

Permalink
adds tls support
Browse files Browse the repository at this point in the history
  • Loading branch information
albertobarrila committed Dec 19, 2023
1 parent 033d3cd commit 35dac8b
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ import { QueryOffsetRequest } from "./requests/query_offset_request"
import { StoreOffsetRequest } from "./requests/store_offset_request"
import { Logger, NullLogger } from "./logger"
import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression"
import tls from "node:tls"

export class Connection {
private readonly socket = new Socket()
private socket: Socket
private correlationId = 100
private decoder: ResponseDecoder
private receivedResponses: Response[] = []
Expand All @@ -61,8 +62,17 @@ export class Connection {
private consumerId = 0
private consumers = new Map<number, Consumer>()
private compressions = new Map<CompressionType, Compression>()
private frameMax: number = DEFAULT_FRAME_MAX

private constructor(private readonly logger: Logger, private readonly params: ConnectionParams) {
if (params.frameMax) this.frameMax = params.frameMax
if (params.ssl) {
this.socket = tls.connect(params.port, params.hostname, { ...params.ssl, rejectUnauthorized: false })
} else {
this.socket = new Socket()
this.socket.connect(this.params.port, this.params.hostname)
}

constructor(private readonly logger: Logger, private frameMax: number = DEFAULT_FRAME_MAX) {
this.heartbeat = new Heartbeat(this, this.logger)
this.compressions.set(CompressionType.None, NoneCompression.create())
this.compressions.set(CompressionType.Gzip, GzipCompression.create())
Expand All @@ -89,29 +99,31 @@ export class Connection {
}

static connect(params: ConnectionParams, logger?: Logger): Promise<Connection> {
return new Connection(logger ?? new NullLogger(), params.frameMax).start(params)
return new Connection(logger ?? new NullLogger(), params).start()
}

public start(params: ConnectionParams): Promise<Connection> {
this.registerListeners(params.listeners)
public start(): Promise<Connection> {
this.registerListeners(this.params.listeners)
this.registerDelivers()
return new Promise((res, rej) => {
this.socket.on("error", (err) => {
this.logger.warn(`Error on connection ${params.hostname}:${params.port} vhost:${params.vhost} err: ${err}`)
this.logger.warn(
`Error on connection ${this.params.hostname}:${this.params.port} vhost:${this.params.vhost} err: ${err}`
)
return rej(err)
})
this.socket.on("connect", async () => {
this.logger.info(`Connected to RabbitMQ ${params.hostname}:${params.port}`)
this.logger.info(`Connected to RabbitMQ ${this.params.hostname}:${this.params.port}`)
await this.exchangeProperties()
await this.auth({ username: params.username, password: params.password })
const { heartbeat } = await this.tune(params.heartbeat ?? 0)
await this.open({ virtualHost: params.vhost })
await this.auth({ username: this.params.username, password: this.params.password })
const { heartbeat } = await this.tune(this.params.heartbeat ?? 0)
await this.open({ virtualHost: this.params.vhost })
this.heartbeat.start(heartbeat)
return res(this)
})
this.socket.on("drain", () => this.logger.warn(`Draining ${params.hostname}:${params.port}`))
this.socket.on("drain", () => this.logger.warn(`Draining ${this.params.hostname}:${this.params.port}`))
this.socket.on("timeout", () => {
this.logger.error(`Timeout ${params.hostname}:${params.port}`)
this.logger.error(`Timeout ${this.params.hostname}:${this.params.port}`)
return rej()
})
this.socket.on("data", (data) => {
Expand All @@ -121,7 +133,6 @@ export class Connection {
this.socket.on("close", (had_error) => {
this.logger.info(`Close event on socket, close cloud had_error? ${had_error}`)
})
this.socket.connect(params.port, params.hostname)
})
}
public on(event: "metadata_update", listener: MetadataUpdateListener): void
Expand Down Expand Up @@ -154,7 +165,7 @@ export class Connection {
this.logger.debug(`Close ...`)
const closeResponse = await this.sendAndWait<CloseResponse>(new CloseRequest(params))
this.logger.debug(`Close response: ${closeResponse.ok} - '${inspect(closeResponse)}'`)
return new Promise((res, _rej) => this.socket.end(() => res()))
this.socket.end()
}

public async declarePublisher(params: DeclarePublisherParams): Promise<Producer> {
Expand Down Expand Up @@ -482,6 +493,12 @@ export type ListenersParams = {
publish_error?: PublishErrorListener
}

export interface SSLConnectionParams {
key: string
cert: string
ca?: string
}

export interface ConnectionParams {
hostname: string
port: number
Expand All @@ -491,6 +508,7 @@ export interface ConnectionParams {
frameMax?: number
heartbeat?: number
listeners?: ListenersParams
ssl?: SSLConnectionParams
}

export interface DeclarePublisherParams {
Expand Down

0 comments on commit 35dac8b

Please sign in to comment.