From e32b99e9201c8e0ee6152e17ee63b1734a74b1d8 Mon Sep 17 00:00:00 2001 From: Andrea Giulianelli Date: Sat, 20 May 2023 12:33:47 +0200 Subject: [PATCH] feat: implement kafka events consumption --- .../infrastructure/events/KafkaClient.kt | 43 ++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/infrastructure/events/KafkaClient.kt b/src/main/kotlin/infrastructure/events/KafkaClient.kt index e8d4326..eb4d4bb 100644 --- a/src/main/kotlin/infrastructure/events/KafkaClient.kt +++ b/src/main/kotlin/infrastructure/events/KafkaClient.kt @@ -8,16 +8,42 @@ package infrastructure.events +import application.handler.EventHandler +import application.handler.EventHandlers +import application.presenter.event.serialization.EventSerialization.toEvent +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import usecase.repository.HealthProfessionalRepository +import usecase.repository.HealthcareUserRepository +import usecase.repository.RoomRepository +import usecase.repository.SurgeryReportRepository import java.time.Duration +import java.time.format.DateTimeParseException /** * This class manage the Kafka client needed to consume events. + * It takes the [surgeryReportRepository], the [healthProfessionalRepository], the [roomRepository] + * and the [healthcareUserRepository]. */ -class KafkaClient { +class KafkaClient( + private val surgeryReportRepository: SurgeryReportRepository, + private val healthProfessionalRepository: HealthProfessionalRepository, + private val roomRepository: RoomRepository, + private val healthcareUserRepository: HealthcareUserRepository, +) { + private val eventHandlers: List + init { checkNotNull(System.getenv(BOOTSTRAP_SERVER_URL_VARIABLE)) { "kafka bootstrap server url required" } checkNotNull(System.getenv(SCHEMA_REGISTRY_URL_VARIABLE)) { "kafka schema registry url required" } + this.eventHandlers = listOf( + EventHandlers.SurgicalProcessSummaryEventHandler( + this.surgeryReportRepository, + this.healthProfessionalRepository, + this.roomRepository, + this.healthcareUserRepository, + ), + ) } private val kafkaConsumer = KafkaConsumer( @@ -34,12 +60,25 @@ class KafkaClient { this.kafkaConsumer.subscribe(listOf(PROCESS_SUMMARY_EVENTS_TOPIC)).run { while (true) { kafkaConsumer.poll(Duration.ofMillis(POLLING_TIME)).forEach { event -> - println("${event.key()} - ${event.value()}") + try { + consumeEvent(event) + } catch (e: IllegalArgumentException) { + println("INFO: Event discarded! - $e") + } catch (e: DateTimeParseException) { + println("ERROR: Invalid Date in event. Event discarded! - $e") + } } } } } + private fun consumeEvent(event: ConsumerRecord) { + val deserializedEvent = event.value().toEvent(event.key()) + this.eventHandlers + .filter { it.canHandle(deserializedEvent) } + .forEach { it.consume(deserializedEvent) } + } + companion object { private const val BOOTSTRAP_SERVER_URL_VARIABLE = "BOOTSTRAP_SERVER_URL" private const val SCHEMA_REGISTRY_URL_VARIABLE = "SCHEMA_REGISTRY_URL"