Skip to content

Commit

Permalink
feat: publishing in batch and compression on superstream (#168)
Browse files Browse the repository at this point in the history
* feat: publishing in batch and compression on superstream

---------

Co-authored-by: Luca <lmenghini@coders51.com>
  • Loading branch information
tarzacodes and tarzacodes committed Jan 24, 2024
1 parent da44340 commit 0e98d78
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 16 deletions.
68 changes: 52 additions & 16 deletions src/super_stream_publisher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Client, RoutingStrategy } from "./client"
import { CompressionType } from "./compression"
import { murmur32 } from "./hash/murmur32"
import { MessageOptions, Publisher } from "./publisher"
import { Message, MessageOptions, Publisher } from "./publisher"
import { bigIntMax } from "./util"

export type MessageKeyExtractorFunction = (content: string, opts: MessageOptions) => string | undefined
Expand All @@ -21,6 +22,7 @@ export class SuperStreamPublisher {
private publisherRef: string | undefined
private keyExtractor: MessageKeyExtractorFunction
private routingStrategy: RoutingStrategy
private routingCache: Map<string, string> = new Map()

private constructor(params: SuperStreamPublisherParams) {
this.locator = params.locator
Expand Down Expand Up @@ -57,6 +59,36 @@ export class SuperStreamPublisher {
return publisher.basicSend(publishingId, message, opts)
}

/**
 * Publishes a batch of messages as sub-entries across the super stream partitions.
 *
 * Every message is routed with `routeMessage`, the messages are grouped by target
 * partition, and each group is handed to that partition's publisher via its
 * `sendSubEntries`.
 *
 * Within each partition the messages keep the relative order they have in `messages`.
 *
 * @param messages - messages to publish; an empty array results in no publisher call
 * @param compressionType - forwarded to each partition publisher; defaults to `CompressionType.None`
 * @throws rejects with the first error raised while routing, obtaining a publisher, or sending
 */
public async sendSubEntries(
  messages: Message[],
  compressionType: CompressionType = CompressionType.None
): Promise<void> {
  // Route all messages concurrently. Promise.all keeps results aligned with the input,
  // so the grouping below does not depend on which route lookup completes first.
  const routed = await Promise.all(
    messages.map(async (message) => ({ message, partition: await this.routeMessage(message.content, message) }))
  )

  // Group in input order so each partition receives its messages in the order given by the caller.
  const messagesByPartition = new Map<string, Message[]>()
  for (const { message, partition } of routed) {
    const batch = messagesByPartition.get(partition)
    if (batch) {
      batch.push(message)
    } else {
      messagesByPartition.set(partition, [message])
    }
  }

  // init all publishers, in sequence in order to avoid instantiating two publishers for the same node
  const sends: Array<{ publisher: Publisher; batch: Message[] }> = []
  for (const [partition, batch] of messagesByPartition) {
    sends.push({ publisher: await this.getPublisher(partition), batch })
  }

  // send all batches in parallel, one call per partition
  await Promise.all(sends.map(({ publisher, batch }) => publisher.sendSubEntries(batch, compressionType)))
}

/**
 * Asks every publisher currently held in `this.publishers` for its last publishing id
 * and returns the result of `bigIntMax` over those ids, falling back to `0n` when
 * `bigIntMax` yields a nullish value.
 */
public async getLastPublishingId(): Promise<bigint> {
  // Query all publishers concurrently.
  const lastIds = await Promise.all(
    Array.from(this.publishers.values(), (publisher) => publisher.getLastPublishingId())
  )
  const highest = bigIntMax(lastIds)
  return highest ?? 0n
}
Expand All @@ -66,23 +98,27 @@ export class SuperStreamPublisher {
if (!routingKey) {
throw new Error(`Routing key is empty or undefined with the provided extractor`)
}
if (this.routingStrategy === "hash") {
const hash = murmur32(routingKey)
const partitionIndex = hash % this.partitions.length
return this.partitions[partitionIndex]!
} else {
const targetPartitions = await this.locator.routeQuery({ routingKey, superStream: this.superStream })
if (!targetPartitions.length) {
throw new Error(`The server did not return any partition for routing key: ${routingKey}`)
}
for (const tp of targetPartitions) {
const foundPartition = this.partitions.find((p) => p === tp)
if (foundPartition) return foundPartition
let partition = this.routingCache.get(routingKey)
if (!partition) {
if (this.routingStrategy === "hash") {
const hash = murmur32(routingKey)
const partitionIndex = hash % this.partitions.length
partition = this.partitions[partitionIndex]!
} else {
const targetPartitions = await this.locator.routeQuery({ routingKey, superStream: this.superStream })
if (!targetPartitions.length) {
throw new Error(`The server did not return any partition for routing key: ${routingKey}`)
}
partition = targetPartitions.find((tp) => this.partitions.find((p) => p === tp))
if (!partition) {
throw new Error(
`Key routing strategy failed: server returned partitions ${targetPartitions} but no match was found`
)
}
}
throw new Error(
`Key routing strategy failed: server returned partitions ${targetPartitions} but no match was found`
)
}
this.routingCache.set(routingKey, partition)
return partition
}

private async getPublisher(partition: string): Promise<Publisher> {
Expand Down
87 changes: 87 additions & 0 deletions test/e2e/superstream_publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { Message, MessageOptions } from "../../src/publisher"
import { createClient, createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { eventually, expectToThrowAsync, password, username, wait } from "../support/util"
import { CompressionType } from "../../src/compression"
import { range } from "../../src/util"

describe("super stream publisher", () => {
let superStreamName: string
Expand Down Expand Up @@ -246,4 +248,89 @@ describe("super stream publisher", () => {
/The server did not return any partition for routing key/
)
})

it("publishing a batch of messages without compression - should not raise error", async () => {
  // Five small messages, each carrying its index as messageId (used as routing key below).
  const batch = range(5).map((_, idx) => ({
    content: Buffer.from(`Hello world ${idx}`),
    messageProperties: { messageId: `${idx}` },
  }))
  const superStreamPublisher = await client.declareSuperStreamPublisher(
    { superStream: superStreamName },
    (_, opts) => opts.messageProperties?.messageId ?? "-1"
  )

  // The test passes as long as the uncompressed batch send does not reject.
  await superStreamPublisher.sendSubEntries(batch, CompressionType.None)
})

it("publishing a batch of messages without compression - receive the same number of messages", async () => {
  // Collect everything the super stream consumer delivers.
  const delivered: Message[] = []
  await client.declareSuperStreamConsumer({ superStream: superStreamName }, (msg: Message) => delivered.push(msg))
  // Five small messages, each carrying its index as messageId (used as routing key below).
  const batch = range(5).map((_, idx) => ({
    content: Buffer.from(`Hello world ${idx}`),
    messageProperties: { messageId: `${idx}` },
  }))
  const superStreamPublisher = await client.declareSuperStreamPublisher(
    { superStream: superStreamName },
    (_, opts) => opts.messageProperties?.messageId ?? "-1"
  )

  await superStreamPublisher.sendSubEntries(batch, CompressionType.None)

  // Poll until the consumer has seen exactly as many messages as were published.
  await eventually(async () => {
    expect(delivered.length).eql(batch.length)
  }, 5000)
})

it("publishing a batch of messages with compression - should not raise error", async () => {
  // Five small messages, each carrying its index as messageId (used as routing key below).
  const batch = range(5).map((_, idx) => ({
    content: Buffer.from(`Hello world ${idx}`),
    messageProperties: { messageId: `${idx}` },
  }))
  const superStreamPublisher = await client.declareSuperStreamPublisher(
    { superStream: superStreamName },
    (_, opts) => opts.messageProperties?.messageId ?? "-1"
  )

  // The test passes as long as the gzip-compressed batch send does not reject.
  await superStreamPublisher.sendSubEntries(batch, CompressionType.Gzip)
})

it("publishing a batch of messages with compression - receive the same number of messages", async () => {
  // Collect everything the super stream consumer delivers.
  const delivered: Message[] = []
  await client.declareSuperStreamConsumer({ superStream: superStreamName }, (msg: Message) => delivered.push(msg))
  // Five small messages, each carrying its index as messageId (used as routing key below).
  const batch = range(5).map((_, idx) => ({
    content: Buffer.from(`Hello world ${idx}`),
    messageProperties: { messageId: `${idx}` },
  }))
  const superStreamPublisher = await client.declareSuperStreamPublisher(
    { superStream: superStreamName },
    (_, opts) => opts.messageProperties?.messageId ?? "-1"
  )

  await superStreamPublisher.sendSubEntries(batch, CompressionType.Gzip)

  // Poll until the consumer has seen exactly as many messages as were published.
  await eventually(async () => {
    expect(delivered.length).eql(batch.length)
  }, 5000)
})

it("publishing a batch of messages with compression - content is readable", async () => {
  // Collect everything the super stream consumer delivers.
  const receivedMessages: Message[] = []
  await client.declareSuperStreamConsumer({ superStream: superStreamName }, (m: Message) => receivedMessages.push(m))
  const messageContents = range(5).map((_, i) => `Hello world ${i}`)
  const messages = messageContents.map((m, i) => ({
    content: Buffer.from(m),
    messageProperties: { messageId: `${i}` },
  }))
  const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, (_, opts) => {
    return opts.messageProperties?.messageId ?? "-1"
  })

  await publisher.sendSubEntries(messages, CompressionType.Gzip)

  await eventually(async () => {
    // Require the full batch first: without this the loop below is vacuously true
    // while nothing has been delivered yet, and the test would pass without checking content.
    expect(receivedMessages.length).eql(messages.length)
    for (const rm of receivedMessages) {
      expect(rm.content.toString()).to.match(/Hello world \d/)
    }
  }, 5000)
})
})

0 comments on commit 0e98d78

Please sign in to comment.