Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Metadata query #124

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import { StoreOffsetRequest } from "./requests/store_offset_request"
import { Logger, NullLogger } from "./logger"
import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression"
import tls from "node:tls"
import { MetadataResponse, StreamMetadata } from "./responses/metadata_response"
import { MetadataRequest } from "./requests/metadata_request"

export class Connection {
private socket: Socket
Expand Down Expand Up @@ -168,6 +170,18 @@ export class Connection {
this.socket.end()
}

public async queryMetadata(params: QueryMetadataParams): Promise<StreamMetadata[]> {
const { streams } = params
const res = await this.sendAndWait<MetadataResponse>(new MetadataRequest({ streams }))
if (!res.ok) {
throw new Error(`Query Metadata command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
this.logger.info(`Returned stream metadata for streams with names ${params.streams.join(",")}`)
const { streamInfos } = res

return streamInfos
}

public async declarePublisher(params: DeclarePublisherParams): Promise<Producer> {
const { stream, publisherRef } = params
const publisherId = this.incPublisherId()
Expand Down Expand Up @@ -541,6 +555,10 @@ export interface QueryOffsetParams {
stream: string
}

export interface QueryMetadataParams {
streams: string[]
}

export function connect(params: ConnectionParams, logger?: Logger): Promise<Connection> {
return Connection.connect(params, logger)
}
Expand Down
19 changes: 19 additions & 0 deletions src/requests/metadata_request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { MetadataResponse } from "../responses/metadata_response"
import { AbstractRequest } from "./abstract_request"
import { DataWriter } from "./data_writer"

export class MetadataRequest extends AbstractRequest {
readonly responseKey = MetadataResponse.key
readonly key = 0x000f

constructor(private params: { streams: string[] }) {
super()
}

writeContent(writer: DataWriter) {
writer.writeInt32(this.params.streams.length)
this.params.streams.forEach((s) => {
writer.writeString(s)
})
}
}
6 changes: 6 additions & 0 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { StreamStatsResponse } from "./responses/stream_stats_response"
import { SubscribeResponse } from "./responses/subscribe_response"
import { TuneResponse } from "./responses/tune_response"
import { UnsubscribeResponse } from "./responses/unsubscribe_response"
import { MetadataResponse } from "./responses/metadata_response"

// Frame => Size (Request | Response | Command)
// Size => uint32 (size without the 4 bytes of the size element)
Expand Down Expand Up @@ -158,6 +159,10 @@ function decodeResponse(
}
const correlationId = dataResponse.readUInt32()
const code = dataResponse.readUInt16()
if (key === MetadataResponse.key) {
// metadata response doesn't contain a code
dataResponse.rewind(2)
}
const payload = dataResponse.readToEnd()
return { size, key, version, correlationId, code, payload }
}
Expand Down Expand Up @@ -589,6 +594,7 @@ export class ResponseDecoder {
this.addFactoryFor(StreamStatsResponse)
this.addFactoryFor(StoreOffsetResponse)
this.addFactoryFor(QueryOffsetResponse)
this.addFactoryFor(MetadataResponse)
}

add(data: Buffer, getCompressionBy: (type: CompressionType) => Compression) {
Expand Down
67 changes: 67 additions & 0 deletions src/responses/metadata_response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { AbstractResponse } from "./abstract_response"
import { DataReader, RawResponse } from "./raw_response"

export interface Broker {
reference: number
host: string
port: number
}

export interface StreamMetadata {
streamName: string
responseCode: number
leader?: Broker
replicas?: Broker[]
}

export class MetadataResponse extends AbstractResponse {
static key = 0x800f as const
readonly streamInfos: StreamMetadata[] = []

constructor(response: RawResponse) {
super(response)
this.verifyKey(MetadataResponse)

const payload = response.payload

const brokers: Broker[] = []

const noOfBrokers = payload.readInt32()
for (let i = 0; i < noOfBrokers; i++) {
brokers.push({
reference: payload.readUInt16(),
host: payload.readString(),
port: payload.readUInt32(),
})
}

const noOfStreamInfos = payload.readInt32()
for (let i = 0; i < noOfStreamInfos; i++) {
const streamName = payload.readString()
const streamInfo = {
streamName,
responseCode: payload.readUInt16(),
}
const leaderReference = payload.readUInt16()
const replicasReferences = this.readReplicasReferencesFrom(response.payload)
const leader = brokers?.find((b) => b.reference === leaderReference)
const replicas = brokers?.filter((b) => replicasReferences.includes(b.reference))
this.streamInfos.push({ ...streamInfo, leader, replicas })
}
}

private readReplicasReferencesFrom(payload: DataReader) {
const replicasReferences: number[] = []
const howMany = payload.readInt32()
for (let index = 0; index < howMany; index++) {
const reference = payload.readUInt16()
replicasReferences.push(reference)
}

return replicasReferences
}

get ok(): boolean {
return true
}
}
92 changes: 92 additions & 0 deletions test/e2e/metadata_query.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { expect } from "chai"
import { Connection } from "../../src"
import { createConnection, createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { username, password } from "../support/util"
import { Broker } from "../../src/responses/metadata_response"

describe("query metadata", () => {
let streamName: string
let nonExistingStreamName: string
const rabbit = new Rabbit(username, password)
let connection: Connection
const RABBIT_TESTING_NODES: Broker[] = [
{
host: "localhost",
port: 5552,
reference: 0,
},
{
host: "rabbitmq",
port: 5552,
reference: 0,
},
]

beforeEach(async () => {
connection = await createConnection(username, password)
streamName = createStreamName()
nonExistingStreamName = createStreamName()
await rabbit.createStream(streamName)
})

afterEach(async () => {
try {
await connection.close()
await rabbit.deleteStream(streamName)
await rabbit.closeAllConnections()
await rabbit.deleteAllQueues({ match: /my-stream-/ })
} catch (e) {
console.error("Error on metadata query test teardown", e)
}
})

it("query the metadata - the response gets parsed correctly and no exception is thrown", async () => {
await connection.queryMetadata({ streams: [streamName] })
})

it("query the metadata - the server should return streamMetaData", async () => {
const [streamInfo] = await connection.queryMetadata({ streams: [streamName] })

expect(streamInfo).to.exist
expect(streamInfo.streamName).to.eql(streamName)
})

it("query the metadata - on a non-existing stream the leader or replicas should not be defined", async () => {
const [streamInfo] = await connection.queryMetadata({ streams: [nonExistingStreamName] })

expect(streamInfo.streamName).to.eql(nonExistingStreamName)
expect(streamInfo.leader).not.to.exist
expect(streamInfo.replicas).to.have.lengthOf(0)
})

it("querying the metadata - on an existing stream on a single node", async () => {
const [streamInfo] = await connection.queryMetadata({ streams: [streamName] })

expect(streamInfo.streamName).to.eql(streamName)
expect(streamInfo.responseCode).to.eql(1)
expect(streamInfo.leader).to.be.deep.oneOf(RABBIT_TESTING_NODES)
expect(streamInfo.replicas).to.have.lengthOf(0)
})

it("querying the metadata - query for multiple streams", async () => {
const secondStreamName = createStreamName()
await rabbit.createStream(secondStreamName)

const res = await connection.queryMetadata({ streams: [streamName, secondStreamName] })
await rabbit.deleteStream(secondStreamName)

const firstStreamInfo = res.find((i) => i.streamName === streamName)
const secondStreamInfo = res.find((i) => i.streamName === secondStreamName)
expect(firstStreamInfo).to.exist
expect(firstStreamInfo!.streamName).to.eql(streamName)
expect(firstStreamInfo!.responseCode).to.eql(1)
expect(firstStreamInfo!.leader).to.be.deep.oneOf(RABBIT_TESTING_NODES)
expect(firstStreamInfo!.replicas).to.have.lengthOf(0)
expect(secondStreamInfo).to.exist
expect(secondStreamInfo!.streamName).to.eql(secondStreamName)
expect(secondStreamInfo!.responseCode).to.eql(1)
expect(secondStreamInfo!.leader).to.be.deep.oneOf(RABBIT_TESTING_NODES)
expect(secondStreamInfo!.replicas).to.have.lengthOf(0)
})
})