Skip to content

Commit

Permalink
Merge pull request #195 from ozangunalp/dependabot/maven/main/kafka.v…
Browse files Browse the repository at this point in the history
…ersion-3.7.1

Bump kafka.version from 3.7.0 to 3.7.1
  • Loading branch information
ozangunalp committed Jul 3, 2024
2 parents be25624 + f05fecd commit ffe897c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.ozangunalp.kafka.server;

import static kafka.zk.KafkaZkClient.createZkClient;
import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;

import java.io.Closeable;
Expand All @@ -11,24 +10,19 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.zookeeper.client.ZKClientConfig;
import org.jboss.logging.Logger;

import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.KafkaServer;
import kafka.server.Server;
import kafka.zk.AdminZkClient;
import scala.Option;
import scala.jdk.javaapi.StreamConverters;

Expand Down Expand Up @@ -223,7 +217,7 @@ public synchronized EmbeddedKafkaBroker start() {
var scramParser = new ScramParser();
var parsedCredentials = scramCredentials.stream().map(scramParser::parseScram).toList();
if (zkMode) {
createScramUsersInZookeeper(parsedCredentials);
ZkUtils.createScramUsersInZookeeper(config, parsedCredentials);
server = new KafkaServer(config, Time.SYSTEM, Option.apply(KAFKA_PREFIX), false);
} else {
// Default the metadata version from the IBP version in the same way as kafka.tools.StorageTool.
Expand Down Expand Up @@ -290,22 +284,4 @@ public String getClusterId() {
return this.clusterId;
}


private void createScramUsersInZookeeper(List<UserScramCredentialRecord> parsedCredentials) {
if (!parsedCredentials.isEmpty()) {
ZKClientConfig zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config, false);
try (var zkClient = createZkClient("Kafka native", Time.SYSTEM, config, zkClientConfig)) {
var adminZkClient = new AdminZkClient(zkClient, Option.empty());
var userEntityType = "users";

parsedCredentials.forEach(uscr -> {
var userConfig = adminZkClient.fetchEntityConfig(userEntityType, uscr.name());
var credentialsString = ScramCredentialUtils.credentialToString(ScramUtils.asScramCredential(uscr));

userConfig.setProperty(ScramMechanism.fromType(uscr.mechanism()).mechanismName(), credentialsString);
adminZkClient.changeConfigs(userEntityType, uscr.name(), userConfig, false);
});
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.ozangunalp.kafka.server;

import java.util.List;

import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import scala.Option;

public class ZkUtils {

public static void createScramUsersInZookeeper(KafkaConfig config, List<UserScramCredentialRecord> parsedCredentials) {
if (!parsedCredentials.isEmpty()) {
ZKClientConfig zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config, false);
try (var zkClient = createZkClient("Kafka native", Time.SYSTEM, config, zkClientConfig)) {
var adminZkClient = new AdminZkClient(zkClient, Option.empty());
var userEntityType = "users";

parsedCredentials.forEach(uscr -> {
var userConfig = adminZkClient.fetchEntityConfig(userEntityType, uscr.name());
var credentialsString = ScramCredentialUtils.credentialToString(ScramUtils.asScramCredential(uscr));

userConfig.setProperty(ScramMechanism.fromType(uscr.mechanism()).mechanismName(), credentialsString);
adminZkClient.changeConfigs(userEntityType, uscr.name(), userConfig, false);
});
}
}
}

private static Option<String> zooKeeperClientProperty(ZKClientConfig zkClientConfig, String property) {
return Option.apply(zkClientConfig.getProperty(property));
}

private static boolean zkTlsClientAuthEnabled(ZKClientConfig zkClientConfig) {
return zooKeeperClientProperty(zkClientConfig, KafkaConfig.ZkSslClientEnableProp()).contains("true") &&
zooKeeperClientProperty(zkClientConfig, KafkaConfig.ZkClientCnxnSocketProp()).isDefined() &&
zooKeeperClientProperty(zkClientConfig, KafkaConfig.ZkSslKeyStoreLocationProp()).isDefined();
}

private static KafkaZkClient createZkClient(String name, Time time, KafkaConfig config, ZKClientConfig zkClientConfig) {
var secureAclsEnabled = config.zkEnableSecureAcls();
var isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || zkTlsClientAuthEnabled(zkClientConfig);

if (secureAclsEnabled && !isZkSecurityEnabled)
throw new java.lang.SecurityException(
KafkaConfig.ZkEnableSecureAclsProp() + " is true, but ZooKeeper client TLS configuration identifying at least " +
KafkaConfig.ZkSslClientEnableProp() + ", " + KafkaConfig.ZkClientCnxnSocketProp() + ", and " +
KafkaConfig.ZkSslKeyStoreLocationProp() + " was not present and the verification of the JAAS login file failed " +
JaasUtils.zkSecuritySysConfigString());

return KafkaZkClient.apply(config.zkConnect(), secureAclsEnabled, config.zkSessionTimeoutMs(), config.zkConnectionTimeoutMs(),
config.zkMaxInFlightRequests(), time, name, zkClientConfig,
"kafka.server", "SessionExpireListener", false, false);
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.11.2</quarkus.platform.version>

<kafka.version>3.7.0</kafka.version>
<kafka.version>3.7.1</kafka.version>
<zookeeper.version>3.9.2</zookeeper.version>
<scala.version>2.13.14</scala.version>

Expand Down

0 comments on commit ffe897c

Please sign in to comment.