Skip to content

Commit

Permalink
feat: implement kafka events consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreaGiulianelli committed May 20, 2023
1 parent e511347 commit e32b99e
Showing 1 changed file with 41 additions and 2 deletions.
43 changes: 41 additions & 2 deletions src/main/kotlin/infrastructure/events/KafkaClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,42 @@

package infrastructure.events

import application.handler.EventHandler
import application.handler.EventHandlers
import application.presenter.event.serialization.EventSerialization.toEvent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import usecase.repository.HealthProfessionalRepository
import usecase.repository.HealthcareUserRepository
import usecase.repository.RoomRepository
import usecase.repository.SurgeryReportRepository
import java.time.Duration
import java.time.format.DateTimeParseException

/**
* This class manage the Kafka client needed to consume events.
* It takes the [surgeryReportRepository], the [healthProfessionalRepository], the [roomRepository]
* and the [healthcareUserRepository].
*/
class KafkaClient {
class KafkaClient(
private val surgeryReportRepository: SurgeryReportRepository,
private val healthProfessionalRepository: HealthProfessionalRepository,
private val roomRepository: RoomRepository,
private val healthcareUserRepository: HealthcareUserRepository,
) {
private val eventHandlers: List<EventHandler>

init {
checkNotNull(System.getenv(BOOTSTRAP_SERVER_URL_VARIABLE)) { "kafka bootstrap server url required" }
checkNotNull(System.getenv(SCHEMA_REGISTRY_URL_VARIABLE)) { "kafka schema registry url required" }
this.eventHandlers = listOf(
EventHandlers.SurgicalProcessSummaryEventHandler(
this.surgeryReportRepository,
this.healthProfessionalRepository,
this.roomRepository,
this.healthcareUserRepository,
),
)
}

private val kafkaConsumer = KafkaConsumer<String, String>(
Expand All @@ -34,12 +60,25 @@ class KafkaClient {
this.kafkaConsumer.subscribe(listOf(PROCESS_SUMMARY_EVENTS_TOPIC)).run {
while (true) {
kafkaConsumer.poll(Duration.ofMillis(POLLING_TIME)).forEach { event ->
println("${event.key()} - ${event.value()}")
try {
consumeEvent(event)
} catch (e: IllegalArgumentException) {
println("INFO: Event discarded! - $e")
} catch (e: DateTimeParseException) {
println("ERROR: Invalid Date in event. Event discarded! - $e")
}
}
}
}
}

private fun consumeEvent(event: ConsumerRecord<String, String>) {
val deserializedEvent = event.value().toEvent(event.key())
this.eventHandlers
.filter { it.canHandle(deserializedEvent) }
.forEach { it.consume(deserializedEvent) }
}

companion object {
private const val BOOTSTRAP_SERVER_URL_VARIABLE = "BOOTSTRAP_SERVER_URL"
private const val SCHEMA_REGISTRY_URL_VARIABLE = "SCHEMA_REGISTRY_URL"
Expand Down

0 comments on commit e32b99e

Please sign in to comment.