diff --git a/src/super_stream_publisher.ts b/src/super_stream_publisher.ts
index 51a7d60..7b717e5 100644
--- a/src/super_stream_publisher.ts
+++ b/src/super_stream_publisher.ts
@@ -1,6 +1,7 @@
 import { Client, RoutingStrategy } from "./client"
+import { CompressionType } from "./compression"
 import { murmur32 } from "./hash/murmur32"
-import { MessageOptions, Publisher } from "./publisher"
+import { Message, MessageOptions, Publisher } from "./publisher"
 import { bigIntMax } from "./util"
 
 export type MessageKeyExtractorFunction = (content: string, opts: MessageOptions) => string | undefined
@@ -21,6 +22,7 @@ export class SuperStreamPublisher {
   private publisherRef: string | undefined
   private keyExtractor: MessageKeyExtractorFunction
   private routingStrategy: RoutingStrategy
+  private routingCache: Map<string, string> = new Map()
 
   private constructor(params: SuperStreamPublisherParams) {
     this.locator = params.locator
@@ -57,6 +59,36 @@ export class SuperStreamPublisher {
     return publisher.basicSend(publishingId, message, opts)
   }
 
+  public async sendSubEntries(messages: Message[], compressionType: CompressionType = CompressionType.None) {
+    // route all messages
+    const messagesByPartition: Map<string, Message[]> = new Map()
+    await Promise.all(
+      messages.map(async (m) => {
+        const partition = await this.routeMessage(m.content, m)
+        let msgs = messagesByPartition.get(partition)
+        if (!msgs) {
+          msgs = []
+          messagesByPartition.set(partition, msgs)
+        }
+        msgs.push(m)
+      })
+    )
+
+    // init all publishers in sequence, to avoid instantiating two publishers for the same node
+    const partitions = [...messagesByPartition.keys()]
+    for (const p of partitions) {
+      await this.getPublisher(p)
+    }
+
+    // send all messages in parallel
+    await Promise.all(
+      partitions.map(async (p) => {
+        const pub = await this.getPublisher(p)
+        return pub.sendSubEntries(messagesByPartition.get(p) ?? [], compressionType)
+      })
+    )
+  }
+
   public async getLastPublishingId(): Promise<bigint> {
     return bigIntMax(await Promise.all([...this.publishers.values()].map((p) => p.getLastPublishingId()))) ?? 0n
   }
@@ -66,23 +98,27 @@ export class SuperStreamPublisher {
     if (!routingKey) {
       throw new Error(`Routing key is empty or undefined with the provided extractor`)
    }
-    if (this.routingStrategy === "hash") {
-      const hash = murmur32(routingKey)
-      const partitionIndex = hash % this.partitions.length
-      return this.partitions[partitionIndex]!
-    } else {
-      const targetPartitions = await this.locator.routeQuery({ routingKey, superStream: this.superStream })
-      if (!targetPartitions.length) {
-        throw new Error(`The server did not return any partition for routing key: ${routingKey}`)
-      }
-      for (const tp of targetPartitions) {
-        const foundPartition = this.partitions.find((p) => p === tp)
-        if (foundPartition) return foundPartition
+    let partition = this.routingCache.get(routingKey)
+    if (!partition) {
+      if (this.routingStrategy === "hash") {
+        const hash = murmur32(routingKey)
+        const partitionIndex = hash % this.partitions.length
+        partition = this.partitions[partitionIndex]!
+      } else {
+        const targetPartitions = await this.locator.routeQuery({ routingKey, superStream: this.superStream })
+        if (!targetPartitions.length) {
+          throw new Error(`The server did not return any partition for routing key: ${routingKey}`)
+        }
+        partition = targetPartitions.find((tp) => this.partitions.find((p) => p === tp))
+        if (!partition) {
+          throw new Error(
+            `Key routing strategy failed: server returned partitions ${targetPartitions} but no match was found`
+          )
+        }
       }
-      throw new Error(
-        `Key routing strategy failed: server returned partitions ${targetPartitions} but no match was found`
-      )
     }
+    this.routingCache.set(routingKey, partition)
+    return partition
   }
 
   private async getPublisher(partition: string): Promise<Publisher> {
diff --git a/test/e2e/superstream_publisher.test.ts b/test/e2e/superstream_publisher.test.ts
index d9ae54d..5d83e30 100644
--- a/test/e2e/superstream_publisher.test.ts
+++ b/test/e2e/superstream_publisher.test.ts
@@ -5,6 +5,8 @@ import { Message, MessageOptions } from "../../src/publisher"
 import { createClient, createStreamName } from "../support/fake_data"
 import { Rabbit } from "../support/rabbit"
 import { eventually, expectToThrowAsync, password, username, wait } from "../support/util"
+import { CompressionType } from "../../src/compression"
+import { range } from "../../src/util"
 
 describe("super stream publisher", () => {
   let superStreamName: string
@@ -246,4 +248,89 @@ describe("super stream publisher", () => {
       /The server did not return any partition for routing key/
     )
   })
+
+  it("publishing a batch of messages without compression - should not raise error", async () => {
+    const messageContents = range(5).map((_, i) => `Hello world ${i}`)
+    const messages = messageContents.map((m, i) => ({
+      content: Buffer.from(m),
+      messageProperties: { messageId: `${i}` },
+    }))
+    const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
+      return opts.messageProperties?.messageId ?? "-1"
+    })
+
+    await publisher.sendSubEntries(messages, CompressionType.None)
+  })
+
+  it("publishing a batch of messages without compression - receive the same number of messages", async () => {
+    const receivedMessages: Message[] = []
+    await client.declareSuperStreamConsumer({ superStream: superStreamName }, (m: Message) => receivedMessages.push(m))
+    const messageContents = range(5).map((_, i) => `Hello world ${i}`)
+    const messages = messageContents.map((m, i) => ({
+      content: Buffer.from(m),
+      messageProperties: { messageId: `${i}` },
+    }))
+    const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
+      return opts.messageProperties?.messageId ?? "-1"
+    })
+
+    await publisher.sendSubEntries(messages, CompressionType.None)
+
+    await eventually(async () => {
+      expect(receivedMessages.length).eql(messages.length)
+    }, 5000)
+  })
+
+  it("publishing a batch of messages with compression - should not raise error", async () => {
+    const messageContents = range(5).map((_, i) => `Hello world ${i}`)
+    const messages = messageContents.map((m, i) => ({
+      content: Buffer.from(m),
+      messageProperties: { messageId: `${i}` },
+    }))
+    const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
+      return opts.messageProperties?.messageId ?? "-1"
+    })
"-1" + }) + + await publisher.sendSubEntries(messages, CompressionType.Gzip) + }) + + it("publishing a batch of messages with compression - receive the same number of messages", async () => { + const receivedMessages: Message[] = [] + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (m: Message) => receivedMessages.push(m)) + const messageContents = range(5).map((_, i) => `Hello world ${i}`) + const messages = messageContents.map((m, i) => ({ + content: Buffer.from(m), + messageProperties: { messageId: `${i}` }, + })) + const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => { + return opts.messageProperties?.messageId ?? "-1" + }) + + await publisher.sendSubEntries(messages, CompressionType.Gzip) + + await eventually(async () => { + expect(receivedMessages.length).eql(messages.length) + }, 5000) + }) + + it("publishing a batch of messages with compression - content is readable", async () => { + const receivedMessages: Message[] = [] + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (m: Message) => receivedMessages.push(m)) + const messageContents = range(5).map((_, i) => `Hello world ${i}`) + const messages = messageContents.map((m, i) => ({ + content: Buffer.from(m), + messageProperties: { messageId: `${i}` }, + })) + const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => { + return opts.messageProperties?.messageId ?? "-1" + }) + + await publisher.sendSubEntries(messages, CompressionType.Gzip) + + await eventually(async () => { + for (const rm of receivedMessages) { + expect(rm.content.toString()).to.match(/Hello world \d/) + } + }, 5000) + }) })