Skip to content

Commit

Permalink
Kafka connector native image updates
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Oct 4, 2023
1 parent e02fdb4 commit 75de77b
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 465 deletions.
11 changes: 1 addition & 10 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
<version.lib.jersey>3.1.3</version.lib.jersey>
<version.lib.jgit>6.7.0.202309050840-r</version.lib.jgit>
<version.lib.junit>5.9.3</version.lib.junit>
<version.lib.kafka>3.4.0</version.lib.kafka>
<version.lib.kafka>3.5.1</version.lib.kafka>
<version.lib.kotlin>1.8.0</version.lib.kotlin>
<version.lib.log4j>2.18.0</version.lib.log4j>
<version.lib.logback>1.4.0</version.lib.logback>
Expand Down Expand Up @@ -944,15 +944,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.lib.kafka}</version>
<!-- Snip transitive dependency on Snappy (which should be optional anyways). -->
<!-- This can be removed once kafka-clients is upgraded -->
<!-- to 3.4.2 or newer. See https://issues.apache.org/jira/browse/KAFKA-15096 -->
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<!-- required transitively by okhttp (used in OpenTelemetry through Jaeger) -->
Expand Down
6 changes: 3 additions & 3 deletions examples/messaging/docker/kafka/Dockerfile.kafka
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2019, 2021 Oracle and/or its affiliates.
# Copyright (c) 2019, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,12 +14,12 @@
# limitations under the License.
#

FROM openjdk:17-jdk-slim-buster
FROM container-registry.oracle.com/java/openjdk:21

ENV VERSION=2.7.0
ENV SCALA_VERSION=2.13

RUN apt-get -qq update && apt-get -qq -y install bash curl wget netcat jq
RUN dnf update && dnf -y install wget jq nc

RUN REL_PATH=kafka/${VERSION}/kafka_${SCALA_VERSION}-${VERSION}.tgz \
&& BACKUP_ARCHIVE=https://archive.apache.org/dist/ \
Expand Down
34 changes: 32 additions & 2 deletions examples/messaging/docker/kafka/init_topics.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright (c) 2020, 2021 Oracle and/or its affiliates.
# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,9 +40,39 @@ while sleep 2; do
--replication-factor 1 \
--partitions 10 \
--topic messaging-test-topic-2
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=snappy \
--topic messaging-test-topic-snappy-compressed
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=lz4 \
--topic messaging-test-topic-lz4-compressed
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=zstd \
--topic messaging-test-topic-zstd-compressed
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=gzip \
--topic messaging-test-topic-gzip-compressed

echo
echo "Example topics messaging-test-topic-1 and messaging-test-topic-2 created"
echo "Example topics created:"
echo " messaging-test-topic-1"
echo " messaging-test-topic-2"
echo " messaging-test-topic-snappy-compressed"
echo " messaging-test-topic-lz4-compressed"
echo " messaging-test-topic-zstd-compressed"
echo " messaging-test-topic-gzip-compressed"
echo
echo "================== Kafka is ready, stop it with Ctrl+C =================="
exit 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ class SendingService implements HttpService {

String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
String compression = config.get("app.kafka.compression").asString().orElse("none");

// Prepare channel for connecting processor -> kafka connector with specific subscriber configuration,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
Channel<String> toKafka = Channel.<String>builder()
.subscriberConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.topic(topic)
.compressionType(compression)
.keySerializer(StringSerializer.class)
.valueSerializer(StringSerializer.class)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void onOpen(WsSession session) {

String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
String compression = config.get("app.kafka.compression").asString().orElse("none");

// Prepare channel for connecting kafka connector with specific publisher configuration -> listener,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
Expand All @@ -60,6 +61,7 @@ public void onOpen(WsSession session) {
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.compressionType(compression)
.build()
)
.build();
Expand All @@ -72,7 +74,7 @@ public void onOpen(WsSession session) {
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
// Send message received from Kafka over websocket
session.send(payload, false);
session.send(payload, true);
})
.build()
.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020 Oracle and/or its affiliates.
# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,11 @@
app:
kafka:
bootstrap.servers: localhost:9092
topic: messaging-test-topic-1
compression: snappy
# compression: lz4
# compression: zstd
# compression: gzip
topic: messaging-test-topic-${app.kafka.compression}-compressed

server:
port: 7001
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 75de77b

Please sign in to comment.