Skip to content

Commit

Permalink
Create Scram users in Zookeeper mode too
Browse files Browse the repository at this point in the history
  • Loading branch information
k-wall committed Jun 5, 2024
1 parent 3fd9fa9 commit a9abdd7
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,28 @@ void testZookeeperContainer() {
}
}

@Test
void testZookeeperContainerWithScram() {
    // Starts a Zookeeper container on the shared network, then a Kafka broker in
    // Zookeeper mode with a SCRAM-SHA-512 credential supplied via the
    // SERVER_SCRAM_CREDENTIALS environment variable, and verifies that a client
    // can authenticate with that credential by producing and consuming a record.
    try (ZookeeperNativeContainer zookeeper = new ZookeeperNativeContainer()
            .withFollowOutput(c -> new ToFileConsumer(testOutputName, c))
            .withNetwork(Network.SHARED)
            .withNetworkAliases("zookeeper")) {
        zookeeper.start();
        try (var container = createKafkaNativeContainer()
                .withNetwork(Network.SHARED)
                // Point the broker at the Zookeeper container via its network alias.
                .withArgs("-Dkafka.zookeeper.connect=zookeeper:2181")
                // The comma inside the credential value is escaped ("\\,") —
                // presumably so the env-var parser does not split it as a list
                // separator; TODO confirm against the env parsing code.
                .withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client\\,password=client-secret]")
                .withServerProperties(MountableFile.forClasspathResource("sasl_scram_plaintext.properties"))
                .withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) {
            container.start();
            // Client side: authenticate over SASL_PLAINTEXT with the SCRAM
            // credential that the broker created at start-up.
            checkProduceConsume(container, Map.of(
                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT",
                    SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512",
                    SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"client\" password=\"client-secret\";"));
        }
    }
}

@Test
void testKraftClusterBothControllers() throws Exception {
String clusterId = Uuid.randomUuid().toString();
Expand Down
4 changes: 4 additions & 0 deletions kafka-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
package com.ozangunalp.kafka.server;

import static kafka.zk.KafkaZkClient.*;
import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;

import java.io.Closeable;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import kafka.server.KafkaServer;
import kafka.server.Server;
import kafka.zk.AdminZkClient;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.zookeeper.client.ZKClientConfig;
import org.jboss.logging.Logger;

import kafka.cluster.EndPoint;
Expand Down Expand Up @@ -215,12 +219,16 @@ public synchronized EmbeddedKafkaBroker start() {
this.config = KafkaConfig.fromProps(brokerConfig, false);
var zkMode = brokerConfig.containsKey(KafkaConfig.ZkConnectProp());
Server server;

var scramParser = new ScramParser();
var parsedCredentials = scramCredentials.stream().map(scramParser::parseScram).toList();
if (zkMode) {
createScramUsersInZookeeper(parsedCredentials);
server = new KafkaServer(config, Time.SYSTEM, Option.apply(KAFKA_PREFIX), false);
} else {
// Default the metadata version from the IBP version in the same way as kafka.tools.StorageTool.
var metadataVersion = MetadataVersion.fromVersionString(brokerConfig.getProperty(KafkaConfig.InterBrokerProtocolVersionProp(), MetadataVersion.LATEST_PRODUCTION.version()));
Storage.formatStorageFromConfig(config, clusterId, true, metadataVersion, scramCredentials);
Storage.formatStorageFromConfig(config, clusterId, true, metadataVersion, parsedCredentials);
server = new KafkaRaftServer(config, Time.SYSTEM);
}
server.startup();
Expand Down Expand Up @@ -282,4 +290,22 @@ public String getClusterId() {
return this.clusterId;
}


/**
 * Registers the given SCRAM credentials as per-user configs in Zookeeper so that
 * clients can authenticate against a broker running in Zookeeper mode.
 * No-op when no credentials were supplied.
 */
private void createScramUsersInZookeeper(List<UserScramCredentialRecord> parsedCredentials) {
    if (parsedCredentials.isEmpty()) {
        return;
    }
    ZKClientConfig clientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config, false);
    try (var zkClient = createZkClient("Kafka native", Time.SYSTEM, config, clientConfig)) {
        var admin = new AdminZkClient(zkClient, Option.empty());
        var entityType = "users";
        for (UserScramCredentialRecord credential : parsedCredentials) {
            // Merge the new credential into any config already stored for this user,
            // keyed by the SCRAM mechanism name.
            var existingConfig = admin.fetchEntityConfig(entityType, credential.name());
            var serialized = ScramCredentialUtils.credentialToString(ScramUtils.asScramCredential(credential));
            existingConfig.setProperty(ScramMechanism.fromType(credential.mechanism()).mechanismName(), serialized);
            admin.changeConfigs(entityType, credential.name(), existingConfig, false);
        }
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.ozangunalp.kafka.server;

import kafka.tools.StorageTool;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;

/** Parses SCRAM credential specification strings into metadata credential records. */
class ScramParser {
    /**
     * Parses a specification of the form {@code MECHANISM=[name=...,password=...]}
     * into a {@link UserScramCredentialRecord}.
     *
     * @param scramString the specification, e.g. {@code SCRAM-SHA-256=[name=alice,password=alice-secret]}
     * @return the parsed credential record
     * @throws IllegalArgumentException if the string is malformed or names an unsupported mechanism
     */
    public UserScramCredentialRecord parseScram(String scramString) {
        var parts = scramString.split("=", 2);
        var wellFormed = parts.length == 2 && !parts[0].isEmpty() && !parts[1].isEmpty();
        if (!wellFormed) {
            throw new IllegalArgumentException("Expecting scram string in the form 'SCRAM-SHA-256=[name=alice,password=alice-secret]', found '%s'. See https://kafka.apache.org/documentation/#security_sasl_scram_credentials".formatted(scramString));
        }
        var mechanism = parts[0];
        if (mechanism.equals("SCRAM-SHA-256") || mechanism.equals("SCRAM-SHA-512")) {
            // Delegate the [name=...,password=...] payload parsing to Kafka's StorageTool.
            return StorageTool.getUserScramCredentialRecord(mechanism, parts[1]);
        }
        throw new IllegalArgumentException("The scram mechanism in '" + scramString + "' is not supported.");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ozangunalp.kafka.server;

import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.scram.ScramCredential;

/** Conversion helpers for SCRAM credential records. Not instantiable. */
final class ScramUtils {

    private ScramUtils() {
        // Noninstantiability guard (Effective Java Item 4): AssertionError signals
        // a programming error, unlike the original IllegalArgumentException which
        // implies a bad argument was passed — no argument exists here.
        throw new AssertionError("No instances");
    }

    /**
     * Adapts a metadata-layer {@link UserScramCredentialRecord} to the
     * {@link ScramCredential} representation, carrying over salt, stored key,
     * server key and iteration count.
     */
    static ScramCredential asScramCredential(UserScramCredentialRecord uscr) {
        return new ScramCredential(uscr.salt(), uscr.storedKey(), uscr.serverKey(), uscr.iterations());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public static void createAndSetLogDir(Properties properties) {
}
}

public static void formatStorageFromConfig(KafkaConfig config, String clusterId, boolean ignoreFormatted, MetadataVersion metadataVersion, List<String> scramCredentials) {
public static void formatStorageFromConfig(KafkaConfig config, String clusterId, boolean ignoreFormatted, MetadataVersion metadataVersion, List<UserScramCredentialRecord> scramCredentials) {
if (!scramCredentials.isEmpty() && !metadataVersion.isScramSupported()) {
throw new IllegalArgumentException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
}
Seq<String> directories = StorageTool.configToLogDirectories(config);
MetaProperties metaProperties = StorageTool.buildMetadataProperties(clusterId, config);
var metadataRecords = new ArrayList<ApiMessageAndVersion>();
metadataRecords.add(metadataVersionMessage(metadataVersion));
for (String scramCredential : scramCredentials) {
for (var scramCredential : scramCredentials) {
metadataRecords.add(scramMessage(scramCredential));
}
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromRecords(metadataRecords, "kafka-native");
Expand All @@ -82,21 +82,8 @@ private static ApiMessageAndVersion metadataVersionMessage(MetadataVersion metad
setFeatureLevel(metadataVersion.featureLevel()));
}

private static ApiMessageAndVersion scramMessage(String scramString) {
return withVersion(parseScram(scramString));
}

static UserScramCredentialRecord parseScram(String scramString) {
var nameValueRecord = scramString.split("=", 2);
if (nameValueRecord.length != 2 || nameValueRecord[0].isEmpty() || nameValueRecord[1].isEmpty()) {
throw new IllegalArgumentException("Expecting scram string in the form 'SCRAM-SHA-256=[name=alice,password=alice-secret]', found '%s'. See https://kafka.apache.org/documentation/#security_sasl_scram_credentials".formatted(scramString));
}
return switch (nameValueRecord[0]) {
case "SCRAM-SHA-256", "SCRAM-SHA-512" ->
StorageTool.getUserScramCredentialRecord(nameValueRecord[0], nameValueRecord[1]);
default ->
throw new IllegalArgumentException("The scram mechanism in '" + scramString + "' is not supported.");
};
// Wraps an already-parsed SCRAM credential record with the serialization
// version expected in the bootstrap metadata records.
private static ApiMessageAndVersion scramMessage(UserScramCredentialRecord scramRecord) {
    return withVersion(scramRecord);
}

public static class LoggingOutputStream extends java.io.OutputStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class StorageTest {
class ScramParserTest {
private static final ScramParser SCRAM_PARSER = new ScramParser();

@ParameterizedTest
@ValueSource(strings = {"SCRAM-SHA-256=[name=alice,password=alice-secret]",
"SCRAM-SHA-256=[name=alice,iterations=8192,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]"}
)
void legalScramStringAccepted(String scramString) {
var uscr = Storage.parseScram(scramString);
var uscr = SCRAM_PARSER.parseScram(scramString);
assertThat(uscr)
.extracting(UserScramCredentialRecord::mechanism)
.isEqualTo(ScramMechanism.SCRAM_SHA_256.type());
Expand All @@ -27,14 +28,14 @@ void legalScramStringAccepted(String scramString) {
@ParameterizedTest
@ValueSource(strings = {"", "foo", "foo=", "="})
void illegalScramStringDetected(String scramString) {
    // Strings missing the mechanism, the payload, or the '=' separator are rejected.
    assertThatThrownBy(() -> SCRAM_PARSER.parseScram(scramString))
            .isInstanceOf(IllegalArgumentException.class);
}

@Test
void badMechanismName() {
    // Well-formed string, but the mechanism name is not a supported SCRAM mechanism.
    var scramString = "UNKNOWN=[name=alice,password=alice-secret]";
    assertThatThrownBy(() -> SCRAM_PARSER.parseScram(scramString))
            .isInstanceOf(IllegalArgumentException.class);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.ozangunalp.kafka.server;

import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.junit.jupiter.api.Test;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.nio.charset.StandardCharsets;

import static org.assertj.core.api.Assertions.assertThat;

class ScramUtilsTest {
    @Test
    void asScramCredential() {
        // Verify every field the converter maps is carried over; the original test
        // never set or asserted storedKey even though asScramCredential copies it.
        int iterations = 4096;
        byte[] salt = "salt".getBytes(StandardCharsets.UTF_8);
        byte[] storedKey = "stored-key".getBytes(StandardCharsets.UTF_8);
        byte[] serverKey = "key".getBytes(StandardCharsets.UTF_8);
        var uscr = new UserScramCredentialRecord()
                .setIterations(iterations)
                .setSalt(salt)
                .setStoredKey(storedKey)
                .setServerKey(serverKey);

        var sc = ScramUtils.asScramCredential(uscr);
        assertThat(sc).extracting(ScramCredential::iterations).isEqualTo(iterations);
        assertThat(sc).extracting(ScramCredential::salt).isEqualTo(salt);
        assertThat(sc).extracting(ScramCredential::storedKey).isEqualTo(storedKey);
        assertThat(sc).extracting(ScramCredential::serverKey).isEqualTo(serverKey);
    }
}

0 comments on commit a9abdd7

Please sign in to comment.