Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working with a load balancer #135

Merged
merged 9 commits into from
Jan 8, 2024
1 change: 1 addition & 0 deletions cluster/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RABBITMQ_PASSWORD="rabbit"
RABBIT_MQ_MANAGEMENT_PORT=15673
RABBIT_MQ_AMQP_PORT=5555
RABBIT_MQ_TEST_NODES="node0:5562;node1:5572;node2:5582"
RABBIT_MQ_TEST_ADDRESS_BALANCER="localhost:5553"
```

then run the following
Expand Down
63 changes: 56 additions & 7 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { DeclarePublisherResponse } from "./responses/declare_publisher_response
import { DeletePublisherResponse } from "./responses/delete_publisher_response"
import { DeleteStreamResponse } from "./responses/delete_stream_response"
import { DeliverResponse } from "./responses/deliver_response"
import { MetadataResponse, StreamMetadata } from "./responses/metadata_response"
import { Broker, MetadataResponse, StreamMetadata } from "./responses/metadata_response"
import { OpenResponse } from "./responses/open_response"
import { PeerPropertiesResponse } from "./responses/peer_properties_response"
import { QueryOffsetResponse } from "./responses/query_offset_response"
Expand Down Expand Up @@ -72,6 +72,7 @@ export class Client {
private frameMax: number = DEFAULT_FRAME_MAX
private connectionId: string
private connectionClosedListener: ConnectionClosedListener | undefined
private serverEndpoint: { host: string; port: number } = { host: "", port: 5552 }

private constructor(private readonly logger: Logger, private readonly params: ConnectionParams) {
if (params.frameMax) this.frameMax = params.frameMax
Expand Down Expand Up @@ -386,7 +387,7 @@ export class Client {
}

public getConnectionInfo(): { host: string; port: number; id: string } {
return { host: this.params.hostname, port: this.params.port, id: this.connectionId }
return { host: this.serverEndpoint.host, port: this.serverEndpoint.port, id: this.connectionId }
}

private responseReceived<T extends Response>(response: T) {
Expand Down Expand Up @@ -469,6 +470,9 @@ export class Client {
this.logger.debug(`Open ...`)
const res = await this.sendAndWait<OpenResponse>(new OpenRequest(params))
this.logger.debug(`Open response: ${res.ok} - '${inspect(res.properties)}'`)
const advertisedHost = res.properties["advertised_host"] ?? ""
const advertisedPort = parseInt(res.properties["advertised_port"] ?? "5552")
this.serverEndpoint = { host: advertisedHost, port: advertisedPort }
return res
}

Expand Down Expand Up @@ -565,18 +569,43 @@ export class Client {
connectionClosedListener?: ConnectionClosedListener
): Promise<Client> {
const [metadata] = await this.queryMetadata({ streams: [streamName] })
const chosenNode = leader ? metadata.leader : sample([metadata.leader, ...(metadata.replicas ?? [])])
const chosenNode = chooseNode(metadata, leader)
if (!chosenNode) {
throw new Error(`Stream was not found on any node`)
}
const listeners = { ...this.params.listeners, connection_closed: connectionClosedListener }
const connectionParams = { ...this.params, listeners: listeners }
const newClient = await connect(
{ ...connectionParams, hostname: chosenNode.host, port: chosenNode.port },
this.logger
)
const newClient = await this.getConnectionOnChosenNode(chosenNode, connectionParams, metadata)
return newClient
}

private async getConnectionOnChosenNode(
chosenNode: { host: string; port: number },
connectionParams: ConnectionParams,
metadata: StreamMetadata
): Promise<Client> {
if (this.params.addressResolver && this.params.addressResolver.enabled) {
const maxAttempts = computeMaxAttempts(metadata)
const resolver = this.params.addressResolver
let currentAttempt = 0
while (currentAttempt < maxAttempts) {
this.logger.debug(`Attempting to connect using the address resolver - attempt ${currentAttempt + 1}`)
const client = await connect(
{ ...connectionParams, hostname: resolver.endpoint.host, port: resolver.endpoint.port },
this.logger
)
if (client.serverEndpoint.host === chosenNode.host && client.serverEndpoint.port === chosenNode.port) {
this.logger.debug(`Correct connection was found!`)
return client
}
this.logger.debug(`The node found was not the right one - closing the connection`)
await client.close()
currentAttempt++
}
throw new Error(`Could not find broker (${chosenNode.host}:${chosenNode.port}) after ${maxAttempts} attempts`)
}
return connect({ ...connectionParams, hostname: chosenNode.host, port: chosenNode.port }, this.logger)
}
}

export type ListenersParams = {
Expand All @@ -592,6 +621,13 @@ export interface SSLConnectionParams {
ca?: string
}

export type AddressResolverParams =
| {
enabled: true
endpoint: { host: string; port: number }
}
| { enabled: false }

export interface ConnectionParams {
hostname: string
port: number
Expand All @@ -604,6 +640,7 @@ export interface ConnectionParams {
ssl?: SSLConnectionParams
bufferSizeSettings?: BufferSizeSettings
socketTimeout?: number
addressResolver?: AddressResolverParams
}

export interface DeclarePublisherParams {
Expand Down Expand Up @@ -676,3 +713,15 @@ const addOffsetFilterToHandle = (handle: ConsumerFunc, offset: Offset): Consumer
}
return handle
}

const chooseNode = (metadata: { leader?: Broker; replicas?: Broker[] }, leader: boolean): Broker | undefined => {
if (leader) {
return metadata.leader
}
const chosenNode = metadata.replicas?.length ? sample(metadata.replicas) : metadata.leader
return chosenNode
}

const computeMaxAttempts = (metadata: { leader?: Broker; replicas?: Broker[] }): number => {
return (2 + (metadata.leader ? 1 : 0) + (metadata.replicas?.length ?? 0)) ^ 2
}
78 changes: 78 additions & 0 deletions test/e2e/address_resolver.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { expect } from "chai"
import { Client, connect } from "../../src"
import { Message } from "../../src/producer"
import { Offset } from "../../src/requests/subscribe_request"
import { getAddressResolverFromEnv } from "../../src/util"
import { createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { getTestNodesFromEnv, password, username, wait } from "../support/util"

describe("address resolver", () => {
let streamName: string
const rabbit = new Rabbit(username, password)
let client: Client

beforeEach(async () => {
const [firstNode] = getTestNodesFromEnv()
const resolver = getAddressResolverFromEnv()
client = await connect({
hostname: firstNode.host,
port: firstNode.port,
username,
password,
vhost: "/",
frameMax: 0,
heartbeat: 0,
addressResolver: { enabled: true, endpoint: resolver },
})
streamName = createStreamName()
await rabbit.createStream(streamName)
// wait for replicas to be created
await wait(200)
})

afterEach(async () => {
try {
await client.close()
await rabbit.deleteStream(streamName)
await rabbit.closeAllConnections()
await rabbit.deleteAllQueues({ match: /my-stream-/ })
} catch (e) {}
})

it("declaring a consumer - should not throw", async () => {
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (_: Message) => {
console.log("Message received")
})
})

it("declaring a consumer - if multiple nodes are present the consumer should be connected to a replica", async () => {
const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (_: Message) => {
console.log("Message received")
})

const connectionInfo = consumer.getConnectionInfo()
const queueInfo = await rabbit.getQueueInfo(streamName)
const nodes = await rabbit.getNodes()
if (nodes.length > 1) {
expect(extractNodeName(queueInfo.node)).not.to.be.eql(connectionInfo.host)
}
})

it("declaring a producer - should not throw", async () => {
await client.declarePublisher({ stream: streamName })
})

it("declaring a producer - the producer should be connected to the leader", async () => {
const producer = await client.declarePublisher({ stream: streamName })

const connectionInfo = producer.getConnectionInfo()
const queueInfo = await rabbit.getQueueInfo(streamName)
expect(extractNodeName(queueInfo.node)).to.be.eql(connectionInfo.host)
})
})

const extractNodeName = (node: string): string => {
const [_, name] = node.split("@")
return name
}
10 changes: 10 additions & 0 deletions test/support/rabbit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ interface MessageInfoResponse {
messages_ready: number
messages_unacknowledged: number
types: "stream" | "quorum" | "classic"
node: string
}

interface RabbitPublishersResponse {
Expand Down Expand Up @@ -224,4 +225,13 @@ export class Rabbit {
const l = await this.getQueues()
await Promise.all(l.filter((q) => match && q.name.match(match)).map((q) => this.deleteQueue("%2F", q.name)))
}

async getNodes(): Promise<string[]> {
const ret = await got.get<{ name: string }[]>(`http://${this.firstNode.host}:${this.port}/api/nodes`, {
username: this.username,
password: this.password,
responseType: "json",
})
return ret.body.map((n) => n.name)
}
}
7 changes: 0 additions & 7 deletions test/unit/blah.test.ts

This file was deleted.