feat: publishing in batch and compression on superstream #168

Merged
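
For context, a minimal usage sketch of the batch API this PR adds, pieced together from the e2e tests further down. Only declareSuperStreamPublisher, sendSubEntries, and CompressionType come from this changeset; the connect helper, connection parameters, stream name, and close call are assumptions about the surrounding client setup.

import { connect, CompressionType } from "rabbitmq-stream-js-client" // assumed package entry point; inside this repo the types live under src/

async function publishBatch() {
  // Placeholder connection parameters.
  const client = await connect({
    hostname: "localhost",
    port: 5552,
    username: "guest",
    password: "guest",
    vhost: "/",
  })

  // Key extractor mirrors the e2e tests below: route by messageId.
  const publisher = await client.declareSuperStreamPublisher(
    { superStream: "my-super-stream" }, // placeholder super stream name
    (_content, opts) => opts.messageProperties?.messageId ?? "-1"
  )

  const messages = [...Array(5).keys()].map((i) => ({
    content: Buffer.from(`Hello world ${i}`),
    messageProperties: { messageId: `${i}` },
  }))

  // New in this PR: publish the whole batch as sub-entries, optionally compressed.
  await publisher.sendSubEntries(messages, CompressionType.Gzip)

  await client.close()
}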
68 changes: 52 additions & 16 deletions src/super_stream_publisher.ts
@@ -1,6 +1,7 @@
import { Client, RoutingStrategy } from "./client"
import { CompressionType } from "./compression"
import { murmur32 } from "./hash/murmur32"
import { MessageOptions, Publisher } from "./publisher"
import { Message, MessageOptions, Publisher } from "./publisher"
import { bigIntMax } from "./util"

export type MessageKeyExtractorFunction = (content: string, opts: MessageOptions) => string | undefined
@@ -21,6 +22,7 @@ export class SuperStreamPublisher
private publisherRef: string | undefined
private keyExtractor: MessageKeyExtractorFunction
private routingStrategy: RoutingStrategy
private routingCache: Map<string, string> = new Map()

private constructor(params: SuperStreamPublisherParams) {
this.locator = params.locator
@@ -57,6 +59,36 @@ export class SuperStreamPublisher {
return publisher.basicSend(publishingId, message, opts)
}

public async sendSubEntries(messages: Message[], compressionType: CompressionType = CompressionType.None) {
// route all messages
const messagesByPartition: Map<string, Message[]> = new Map()
await Promise.all(
messages.map(async (m) => {
const partition = await this.routeMessage(m.content, m)
let msgs = messagesByPartition.get(partition)
if (!msgs) {
msgs = []
messagesByPartition.set(partition, msgs)
}
msgs.push(m)
})
)

// init all publishers sequentially, to avoid instantiating two publishers for the same node
const partitions = [...messagesByPartition.keys()]
for (const p of partitions) {
await this.getPublisher(p)
}

// send all messages in parallel
await Promise.all(
partitions.map(async (p) => {
const pub = await this.getPublisher(p)
return pub.sendSubEntries(messagesByPartition.get(p) ?? [], compressionType)
})
)
}

public async getLastPublishingId(): Promise<bigint> {
return bigIntMax(await Promise.all([...this.publishers.values()].map((p) => p.getLastPublishingId()))) ?? 0n
}
@@ -66,23 +98,27 @@ export class SuperStreamPublisher {
if (!routingKey) {
throw new Error(`Routing key is empty or undefined with the provided extractor`)
}
if (this.routingStrategy === "hash") {
const hash = murmur32(routingKey)
const partitionIndex = hash % this.partitions.length
return this.partitions[partitionIndex]!
} else {
const targetPartitions = await this.locator.routeQuery({ routingKey, superStream: this.superStream })
if (!targetPartitions.length) {
throw new Error(`The server did not return any partition for routing key: ${routingKey}`)
}
for (const tp of targetPartitions) {
const foundPartition = this.partitions.find((p) => p === tp)
if (foundPartition) return foundPartition
let partition = this.routingCache.get(routingKey)
if (!partition) {
if (this.routingStrategy === "hash") {
const hash = murmur32(routingKey)
const partitionIndex = hash % this.partitions.length
partition = this.partitions[partitionIndex]!
} else {
const targetPartitions = await this.locator.routeQuery({ routingKey, superStream: this.superStream })
if (!targetPartitions.length) {
throw new Error(`The server did not return any partition for routing key: ${routingKey}`)
}
partition = targetPartitions.find((tp) => this.partitions.find((p) => p === tp))
if (!partition) {
throw new Error(
`Key routing strategy failed: server returned partitions ${targetPartitions} but no match was found`
)
}
}
throw new Error(
`Key routing strategy failed: server returned partitions ${targetPartitions} but no match was found`
)
}
this.routingCache.set(routingKey, partition)
return partition
}

private async getPublisher(partition: string): Promise<Publisher> {
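Besides the new batch entry point, the routing change above memoizes the partition chosen for each routing key, so repeated sends with the same key skip the murmur32 hash (or the routeQuery round trip when the "key" strategy is used). A simplified standalone sketch of the hash branch with the cache; the partition list is hypothetical, and murmur32 is the repo's own helper under src/hash:

import { murmur32 } from "./hash/murmur32" // repo-internal hash helper, path relative to src

// Hypothetical partition list and cache standing in for the publisher's fields.
const partitions = ["invoices-0", "invoices-1", "invoices-2"]
const routingCache: Map<string, string> = new Map()

function routeByHash(routingKey: string): string {
  const cached = routingCache.get(routingKey)
  if (cached) return cached
  const partition = partitions[murmur32(routingKey) % partitions.length]!
  routingCache.set(routingKey, partition)
  return partition
}

routeByHash("order-42") // computes the hash and caches the chosen partition
routeByHash("order-42") // second call for the same key is served from the cache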
87 changes: 87 additions & 0 deletions test/e2e/superstream_publisher.test.ts
@@ -5,6 +5,8 @@ import { Message, MessageOptions } from "../../src/publisher"
import { createClient, createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { eventually, expectToThrowAsync, password, username, wait } from "../support/util"
import { CompressionType } from "../../src/compression"
import { range } from "../../src/util"

describe("super stream publisher", () => {
let superStreamName: string
@@ -246,4 +248,89 @@ describe("super stream publisher", () => {
/The server did not return any partition for routing key/
)
})

it("publishing a batch of messages without compression - should not raise error", async () => {
const messageContents = range(5).map((_, i) => `Hello world ${i}`)
const messages = messageContents.map((m, i) => ({
content: Buffer.from(m),
messageProperties: { messageId: `${i}` },
}))
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
return opts.messageProperties?.messageId ?? "-1"
})

await publisher.sendSubEntries(messages, CompressionType.None)
})

it("publishing a batch of messages without compression - receive the same number of messages", async () => {
const receivedMessages: Message[] = []
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (m: Message) => receivedMessages.push(m))
const messageContents = range(5).map((_, i) => `Hello world ${i}`)
const messages = messageContents.map((m, i) => ({
content: Buffer.from(m),
messageProperties: { messageId: `${i}` },
}))
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
return opts.messageProperties?.messageId ?? "-1"
})

await publisher.sendSubEntries(messages, CompressionType.None)

await eventually(async () => {
expect(receivedMessages.length).eql(messages.length)
}, 5000)
})

it("publishing a batch of messages with compression - should not raise error", async () => {
const messageContents = range(5).map((_, i) => `Hello world ${i}`)
const messages = messageContents.map((m, i) => ({
content: Buffer.from(m),
messageProperties: { messageId: `${i}` },
}))
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
return opts.messageProperties?.messageId ?? "-1"
})

await publisher.sendSubEntries(messages, CompressionType.Gzip)
})

it("publishing a batch of messages with compression - receive the same number of messages", async () => {
const receivedMessages: Message[] = []
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (m: Message) => receivedMessages.push(m))
const messageContents = range(5).map((_, i) => `Hello world ${i}`)
const messages = messageContents.map((m, i) => ({
content: Buffer.from(m),
messageProperties: { messageId: `${i}` },
}))
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
return opts.messageProperties?.messageId ?? "-1"
})

await publisher.sendSubEntries(messages, CompressionType.Gzip)

await eventually(async () => {
expect(receivedMessages.length).eql(messages.length)
}, 5000)
})

it("publishing a batch of messages with compression - content is readable", async () => {
const receivedMessages: Message[] = []
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (m: Message) => receivedMessages.push(m))
const messageContents = range(5).map((_, i) => `Hello world ${i}`)
const messages = messageContents.map((m, i) => ({
content: Buffer.from(m),
messageProperties: { messageId: `${i}` },
}))
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
return opts.messageProperties?.messageId ?? "-1"
})

await publisher.sendSubEntries(messages, CompressionType.Gzip)

await eventually(async () => {
for (const rm of receivedMessages) {
expect(rm.content.toString()).to.match(/Hello world \d/)
}
}, 5000)
})
})
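
The assertions above go through a super stream consumer declared on the same client. Outside the test harness, the receiving side might look roughly like this sketch; the stream name and handler are placeholders, and the single-argument params object follows the tests above:

import { Client } from "./client" // types from this repo's src
import { Message } from "./publisher"

// Assumes an already-connected Client, e.g. from the sketch near the top of this page.
async function consumeBatch(client: Client) {
  await client.declareSuperStreamConsumer({ superStream: "my-super-stream" }, (message: Message) => {
    // As the tests show, sub-entry batches arrive as individual messages, already decompressed.
    console.log(message.content.toString())
  })
}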