diff --git a/src/connection.ts b/src/connection.ts index a5b211c9..b30fd03d 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -50,6 +50,8 @@ import { StoreOffsetRequest } from "./requests/store_offset_request" import { Logger, NullLogger } from "./logger" import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression" import tls from "node:tls" +import { MetadataResponse, StreamMetadata } from "./responses/metadata_response" +import { MetadataRequest } from "./requests/metadata_request" export class Connection { private socket: Socket @@ -168,6 +170,18 @@ export class Connection { this.socket.end() } + public async queryMetadata(params: QueryMetadataParams): Promise { + const { streams } = params + const res = await this.sendAndWait(new MetadataRequest({ streams })) + if (!res.ok) { + throw new Error(`Query Metadata command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) + } + this.logger.info(`Returned stream metadata for streams with names ${params.streams.join(",")}`) + const { streamInfos } = res + + return streamInfos + } + public async declarePublisher(params: DeclarePublisherParams): Promise { const { stream, publisherRef } = params const publisherId = this.incPublisherId() @@ -541,6 +555,10 @@ export interface QueryOffsetParams { stream: string } +export interface QueryMetadataParams { + streams: string[] +} + export function connect(params: ConnectionParams, logger?: Logger): Promise { return Connection.connect(params, logger) } diff --git a/src/requests/metadata_request.ts b/src/requests/metadata_request.ts new file mode 100644 index 00000000..1b532028 --- /dev/null +++ b/src/requests/metadata_request.ts @@ -0,0 +1,19 @@ +import { MetadataResponse } from "../responses/metadata_response" +import { AbstractRequest } from "./abstract_request" +import { DataWriter } from "./data_writer" + +export class MetadataRequest extends AbstractRequest { + readonly responseKey = MetadataResponse.key + readonly key = 0x000f + + constructor(private params: { streams: string[] }) { + super() + } + + writeContent(writer: DataWriter) { + writer.writeInt32(this.params.streams.length) + this.params.streams.forEach((s) => { + writer.writeString(s) + }) + } +} diff --git a/src/response_decoder.ts b/src/response_decoder.ts index c46fa1a3..563f62fd 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -43,6 +43,7 @@ import { StreamStatsResponse } from "./responses/stream_stats_response" import { SubscribeResponse } from "./responses/subscribe_response" import { TuneResponse } from "./responses/tune_response" import { UnsubscribeResponse } from "./responses/unsubscribe_response" +import { MetadataResponse } from "./responses/metadata_response" // Frame => Size (Request | Response | Command) // Size => uint32 (size without the 4 bytes of the size element) @@ -158,6 +159,10 @@ function decodeResponse( } const correlationId = dataResponse.readUInt32() const code = dataResponse.readUInt16() + if (key === MetadataResponse.key) { + // metadata response doesn't contain a code + dataResponse.rewind(2) + } const payload = dataResponse.readToEnd() return { size, key, version, correlationId, code, payload } } @@ -589,6 +594,7 @@ export class ResponseDecoder { this.addFactoryFor(StreamStatsResponse) this.addFactoryFor(StoreOffsetResponse) this.addFactoryFor(QueryOffsetResponse) + this.addFactoryFor(MetadataResponse) } add(data: Buffer, getCompressionBy: (type: CompressionType) => Compression) { diff --git a/src/responses/metadata_response.ts b/src/responses/metadata_response.ts new file mode 100644 index 00000000..999484a9 --- /dev/null +++ b/src/responses/metadata_response.ts @@ -0,0 +1,67 @@ +import { AbstractResponse } from "./abstract_response" +import { DataReader, RawResponse } from "./raw_response" + +export interface Broker { + reference: number + host: string + port: number +} + +export interface StreamMetadata { + streamName: string + responseCode: number + leader?: Broker + replicas?: Broker[] +} + +export class MetadataResponse extends AbstractResponse { + static key = 0x800f as const + readonly streamInfos: StreamMetadata[] = [] + + constructor(response: RawResponse) { + super(response) + this.verifyKey(MetadataResponse) + + const payload = response.payload + + const brokers: Broker[] = [] + + const noOfBrokers = payload.readInt32() + for (let i = 0; i < noOfBrokers; i++) { + brokers.push({ + reference: payload.readUInt16(), + host: payload.readString(), + port: payload.readUInt32(), + }) + } + + const noOfStreamInfos = payload.readInt32() + for (let i = 0; i < noOfStreamInfos; i++) { + const streamName = payload.readString() + const streamInfo = { + streamName, + responseCode: payload.readUInt16(), + } + const leaderReference = payload.readUInt16() + const replicasReferences = this.readReplicasReferencesFrom(response.payload) + const leader = brokers?.find((b) => b.reference === leaderReference) + const replicas = brokers?.filter((b) => replicasReferences.includes(b.reference)) + this.streamInfos.push({ ...streamInfo, leader, replicas }) + } + } + + private readReplicasReferencesFrom(payload: DataReader) { + const replicasReferences: number[] = [] + const howMany = payload.readInt32() + for (let index = 0; index < howMany; index++) { + const reference = payload.readUInt16() + replicasReferences.push(reference) + } + + return replicasReferences + } + + get ok(): boolean { + return true + } +} diff --git a/test/e2e/metadata_query.test.ts b/test/e2e/metadata_query.test.ts new file mode 100644 index 00000000..ccb63d18 --- /dev/null +++ b/test/e2e/metadata_query.test.ts @@ -0,0 +1,92 @@ +import { expect } from "chai" +import { Connection } from "../../src" +import { createConnection, createStreamName } from "../support/fake_data" +import { Rabbit } from "../support/rabbit" +import { username, password } from "../support/util" +import { Broker } from "../../src/responses/metadata_response" + +describe("query metadata", () => { + let streamName: string + let nonExistingStreamName: string + const rabbit = new Rabbit(username, password) + let connection: Connection + const RABBIT_TESTING_NODES: Broker[] = [ + { + host: "localhost", + port: 5552, + reference: 0, + }, + { + host: "rabbitmq", + port: 5552, + reference: 0, + }, + ] + + beforeEach(async () => { + connection = await createConnection(username, password) + streamName = createStreamName() + nonExistingStreamName = createStreamName() + await rabbit.createStream(streamName) + }) + + afterEach(async () => { + try { + await connection.close() + await rabbit.deleteStream(streamName) + await rabbit.closeAllConnections() + await rabbit.deleteAllQueues({ match: /my-stream-/ }) + } catch (e) { + console.error("Error on metadata query test teardown", e) + } + }) + + it("query the metadata - the response gets parsed correctly and no exception is thrown", async () => { + await connection.queryMetadata({ streams: [streamName] }) + }) + + it("query the metadata - the server should return streamMetaData", async () => { + const [streamInfo] = await connection.queryMetadata({ streams: [streamName] }) + + expect(streamInfo).to.exist + expect(streamInfo.streamName).to.eql(streamName) + }) + + it("query the metadata - on a non-existing stream the leader or replicas should not be defined", async () => { + const [streamInfo] = await connection.queryMetadata({ streams: [nonExistingStreamName] }) + + expect(streamInfo.streamName).to.eql(nonExistingStreamName) + expect(streamInfo.leader).not.to.exist + expect(streamInfo.replicas).to.have.lengthOf(0) + }) + + it("querying the metadata - on an existing stream on a single node", async () => { + const [streamInfo] = await connection.queryMetadata({ streams: [streamName] }) + + expect(streamInfo.streamName).to.eql(streamName) + expect(streamInfo.responseCode).to.eql(1) + expect(streamInfo.leader).to.be.deep.oneOf(RABBIT_TESTING_NODES) + expect(streamInfo.replicas).to.have.lengthOf(0) + }) + + it("querying the metadata - query for multiple streams", async () => { + const secondStreamName = createStreamName() + await rabbit.createStream(secondStreamName) + + const res = await connection.queryMetadata({ streams: [streamName, secondStreamName] }) + await rabbit.deleteStream(secondStreamName) + + const firstStreamInfo = res.find((i) => i.streamName === streamName) + const secondStreamInfo = res.find((i) => i.streamName === secondStreamName) + expect(firstStreamInfo).to.exist + expect(firstStreamInfo!.streamName).to.eql(streamName) + expect(firstStreamInfo!.responseCode).to.eql(1) + expect(firstStreamInfo!.leader).to.be.deep.oneOf(RABBIT_TESTING_NODES) + expect(firstStreamInfo!.replicas).to.have.lengthOf(0) + expect(secondStreamInfo).to.exist + expect(secondStreamInfo!.streamName).to.eql(secondStreamName) + expect(secondStreamInfo!.responseCode).to.eql(1) + expect(secondStreamInfo!.leader).to.be.deep.oneOf(RABBIT_TESTING_NODES) + expect(secondStreamInfo!.replicas).to.have.lengthOf(0) + }) +})