diff --git a/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java b/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java index e77690d..8fcebf6 100644 --- a/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java +++ b/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java @@ -186,7 +186,7 @@ public synchronized EmbeddedKafkaBroker start() { server = new KafkaServer(config, Time.SYSTEM, Option.apply(KAFKA_PREFIX), false); } else { Storage.formatStorageFromConfig(config, clusterId, true); - server = new KafkaRaftServer(config, Time.SYSTEM, Option.apply(KAFKA_PREFIX)); + server = new KafkaRaftServer(config, Time.SYSTEM); } server.startup(); this.kafkaServer = server; diff --git a/pom.xml b/pom.xml index f357d31..ca8c11d 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,7 @@ io.quarkus.platform 2.16.5.Final - 3.4.0 + 3.5.0 3.8.1 2.13.10 @@ -118,6 +118,11 @@ kafka-server-common ${kafka.version} + + org.apache.kafka + kafka-storage + ${kafka.version} + org.apache.kafka kafka-clients diff --git a/quarkus-kafka-server-extension/runtime/pom.xml b/quarkus-kafka-server-extension/runtime/pom.xml index faabf9c..0fdc522 100644 --- a/quarkus-kafka-server-extension/runtime/pom.xml +++ b/quarkus-kafka-server-extension/runtime/pom.xml @@ -24,6 +24,10 @@ org.apache.kafka kafka-server-common + + org.apache.kafka + kafka-storage + org.apache.kafka kafka_2.13 diff --git a/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java b/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java index e338ca5..ca9cb84 100644 --- a/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java +++ b/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java @@ -22,9 +22,9 @@ import com.oracle.svm.core.annotate.InjectAccessors; import com.oracle.svm.core.annotate.Substitute; import com.oracle.svm.core.annotate.TargetClass; -import kafka.log.OffsetIndex; -import kafka.log.TimeIndex; -import kafka.log.TransactionIndex; +import org.apache.kafka.storage.internals.log.OffsetIndex; +import org.apache.kafka.storage.internals.log.TimeIndex; +import org.apache.kafka.storage.internals.log.TransactionIndex; @TargetClass(value = AppInfoParser.class) final class RemoveJMXAccess { @@ -85,28 +85,24 @@ static InetSocketAddress get() { } } -@TargetClass(className = "kafka.log.LogConfig") +@TargetClass(className = "org.apache.kafka.storage.internals.log.LogConfig") final class Target_LogConfig { + @Alias + public long segmentMs; + + @Alias + public long segmentJitterMs; + @Substitute public long randomSegmentJitter() { - if (segmentJitterMs() == 0) { + if (segmentJitterMs == 0) { return 0; } else { - return Utils.abs(new Random().nextInt()) % Math.min(segmentJitterMs(), segmentMs()); + return Utils.abs(new Random().nextInt()) % Math.min(segmentJitterMs, segmentMs); } } - @Alias - public Long segmentJitterMs() { - return null; - } - - @Alias - public Long segmentMs() { - return null; - } - } @TargetClass(className = "kafka.log.LogFlushStats$")