From 42c8e9e1626690685c7fc693778e676f0f746f97 Mon Sep 17 00:00:00 2001 From: Andrea Giulianelli Date: Fri, 19 May 2023 19:26:44 +0200 Subject: [PATCH] chore: add base code for kafka client --- .../infrastructure/events/KafkaClient.kt | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 src/main/kotlin/infrastructure/events/KafkaClient.kt diff --git a/src/main/kotlin/infrastructure/events/KafkaClient.kt b/src/main/kotlin/infrastructure/events/KafkaClient.kt new file mode 100644 index 0000000..e8d4326 --- /dev/null +++ b/src/main/kotlin/infrastructure/events/KafkaClient.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023. Smart Operating Block + * + * Use of this source code is governed by an MIT-style + * license that can be found in the LICENSE file or at + * https://opensource.org/licenses/MIT. + */ + +package infrastructure.events + +import org.apache.kafka.clients.consumer.KafkaConsumer +import java.time.Duration + +/** + * This class manage the Kafka client needed to consume events. + */ +class KafkaClient { + init { + checkNotNull(System.getenv(BOOTSTRAP_SERVER_URL_VARIABLE)) { "kafka bootstrap server url required" } + checkNotNull(System.getenv(SCHEMA_REGISTRY_URL_VARIABLE)) { "kafka schema registry url required" } + } + + private val kafkaConsumer = KafkaConsumer( + loadConsumerProperties( + System.getenv(BOOTSTRAP_SERVER_URL_VARIABLE), + System.getenv(SCHEMA_REGISTRY_URL_VARIABLE), + ), + ) + + /** + * Start consuming event on Kafka. + */ + fun start() { + this.kafkaConsumer.subscribe(listOf(PROCESS_SUMMARY_EVENTS_TOPIC)).run { + while (true) { + kafkaConsumer.poll(Duration.ofMillis(POLLING_TIME)).forEach { event -> + println("${event.key()} - ${event.value()}") + } + } + } + } + + companion object { + private const val BOOTSTRAP_SERVER_URL_VARIABLE = "BOOTSTRAP_SERVER_URL" + private const val SCHEMA_REGISTRY_URL_VARIABLE = "SCHEMA_REGISTRY_URL" + private const val PROCESS_SUMMARY_EVENTS_TOPIC = "process-summary-events" + private const val POLLING_TIME = 100L + } +}