[
https://issues.apache.org/jira/browse/KAFKA-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16636036#comment-16636036
]
ASF GitHub Bot commented on KAFKA-7429:
---------------------------------------
cmccabe closed pull request #5699: KAFKA-7429: Enable key/truststore update
with same filename/password
URL: https://github.com/apache/kafka/pull/5699
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index b1f7df87690..b9b52037c52 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -24,6 +24,8 @@
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
@@ -47,6 +49,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
@@ -54,8 +57,9 @@
import java.util.Set;
import java.util.HashSet;
-
public class SslFactory implements Reconfigurable {
+ private static final Logger log =
LoggerFactory.getLogger(SslFactory.class);
+
private final Mode mode;
private final String clientAuthConfigOverride;
private final boolean keystoreVerifiableUsingTruststore;
@@ -183,6 +187,9 @@ private SecurityStore maybeCreateNewKeystore(Map<String, ?>
configs) {
!Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
keystore.password) ||
!Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
keystore.keyPassword);
+ if (!keystoreChanged) {
+ keystoreChanged = keystore.modified();
+ }
if (keystoreChanged) {
return createKeystore((String)
configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
(String)
configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
@@ -197,6 +204,9 @@ private SecurityStore maybeCreateNewTruststore(Map<String,
?> configs) {
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
truststore.path) ||
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG),
truststore.password);
+ if (!truststoreChanged) {
+ truststoreChanged = truststore.modified();
+ }
if (truststoreChanged) {
return createTruststore((String)
configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
(String)
configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
@@ -306,6 +316,7 @@ private SecurityStore createTruststore(String type, String
path, Password passwo
private final String path;
private final Password password;
private final Password keyPassword;
+ private Long fileLastModifiedMs;
SecurityStore(String type, String path, Password password, Password
keyPassword) {
Objects.requireNonNull(type, "type must not be null");
@@ -327,11 +338,29 @@ KeyStore load() {
// If a password is not set access to the truststore is still
available, but integrity checking is disabled.
char[] passwordChars = password != null ?
password.value().toCharArray() : null;
ks.load(in, passwordChars);
+ fileLastModifiedMs = lastModifiedMs(path);
+
+ log.debug("Loaded key store with path {} modification time
{}", path,
+ fileLastModifiedMs == null ? null : new
Date(fileLastModifiedMs));
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path
+ " of type " + type, e);
}
}
+
+ private Long lastModifiedMs(String path) {
+ try {
+ return Files.getLastModifiedTime(Paths.get(path)).toMillis();
+ } catch (IOException e) {
+ log.error("Modification time of key store could not be
obtained: " + path, e);
+ return null;
+ }
+ }
+
+ boolean modified() {
+ Long modifiedMs = lastModifiedMs(path);
+ return modifiedMs != null && !Objects.equals(modifiedMs,
this.fileLastModifiedMs);
+ }
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index a134104c266..97021e3908a 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.security.ssl;
import java.io.File;
+import java.nio.file.Files;
import java.security.KeyStore;
import java.util.Map;
@@ -32,8 +33,11 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -81,6 +85,48 @@ public void testClientMode() throws Exception {
assertTrue(engine.getUseClientMode());
}
+ @Test
+ public void testReconfiguration() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> sslConfig = TestSslUtils.createSslConfig(false,
true, Mode.SERVER, trustStoreFile, "server");
+ SslFactory sslFactory = new SslFactory(Mode.SERVER);
+ sslFactory.configure(sslConfig);
+ SSLContext sslContext = sslFactory.sslContext();
+ assertNotNull("SSL context not created", sslContext);
+ assertSame("SSL context recreated unnecessarily", sslContext,
sslFactory.sslContext());
+ assertFalse(sslContext.createSSLEngine("localhost",
0).getUseClientMode());
+
+ // Verify that context is not recreated on reconfigure() if config and
file are not changed
+ sslFactory.reconfigure(sslConfig);
+ assertSame("SSL context recreated unnecessarily", sslContext,
sslFactory.sslContext());
+
+ // Verify that context is recreated on reconfigure() if config is
changed
+ trustStoreFile = File.createTempFile("truststore", ".jks");
+ sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER,
trustStoreFile, "server");
+ sslFactory.reconfigure(sslConfig);
+ assertNotSame("SSL context not recreated", sslContext,
sslFactory.sslContext());
+ sslContext = sslFactory.sslContext();
+
+ // Verify that context is recreated on reconfigure() if config is not
changed, but truststore file was modified
+ trustStoreFile.setLastModified(System.currentTimeMillis() + 10000);
+ sslFactory.reconfigure(sslConfig);
+ assertNotSame("SSL context not recreated", sslContext,
sslFactory.sslContext());
+ sslContext = sslFactory.sslContext();
+
+ // Verify that context is recreated on reconfigure() if config is not
changed, but keystore file was modified
+ File keyStoreFile = new File((String)
sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+ keyStoreFile.setLastModified(System.currentTimeMillis() + 10000);
+ sslFactory.reconfigure(sslConfig);
+ assertNotSame("SSL context not recreated", sslContext,
sslFactory.sslContext());
+ sslContext = sslFactory.sslContext();
+
+ // Verify that the context is not recreated if modification time
cannot be determined
+ keyStoreFile.setLastModified(System.currentTimeMillis() + 20000);
+ Files.delete(keyStoreFile.toPath());
+ sslFactory.reconfigure(sslConfig);
+ assertSame("SSL context recreated unnecessarily", sslContext,
sslFactory.sslContext());
+ }
+
@Test
public void testKeyStoreTrustStoreValidation() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks");
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala
b/core/src/main/scala/kafka/server/AdminManager.scala
index e9598e365ab..11f6f1c44cc 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -386,6 +386,8 @@ class AdminManager(val config: KafkaConfig,
this.config.dynamicConfig.validate(configProps, perBrokerConfig)
validateConfigPolicy(ConfigResource.Type.BROKER)
if (!validateOnly) {
+ if (perBrokerConfig)
+
this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
adminZkClient.changeBrokerConfig(brokerId,
this.config.dynamicConfig.toPersistentProps(configProps,
perBrokerConfig))
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index bcaaa02ee69..2c0f6c1b52c 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -87,6 +87,8 @@ object DynamicBrokerConfig {
DynamicListenerConfig.ReconfigurableConfigs
private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
+ private val ReloadableFileConfigs =
Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+
val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
private val DynamicPasswordConfigs = {
@@ -267,6 +269,27 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
}
}
+ /**
+ * All config updates through ZooKeeper are triggered through actual changes
in values stored in ZooKeeper.
+ * For some configs like SSL keystores and truststores, we also want to
reload the store if it was modified
+ * in-place, even though the actual value of the file path and password
haven't changed. This scenario alone
+ * is handled here when a config update request using admin client is
processed by AdminManager. If any of
+ * the SSL configs have changed, then the update will not be done here, but
will be handled later when ZK
+ * changes are processed. At the moment, only listener configs are
considered for reloading.
+ */
+ private[server] def reloadUpdatedFilesWithoutConfigChange(newProps:
Properties): Unit = CoreUtils.inWriteLock(lock) {
+ reconfigurables
+ .filter(reconfigurable =>
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+ .foreach {
+ case reconfigurable: ListenerReconfigurable =>
+ val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig =
true)
+ val newConfig = new KafkaConfig(kafkaProps.asJava, false, None)
+ processListenerReconfigurable(reconfigurable, newConfig,
Collections.emptyMap(), validateOnly = false, reloadOnly = true)
+ case reconfigurable =>
+ trace(s"Files will not be reloaded without config change for
$reconfigurable")
+ }
+ }
+
private def maybeCreatePasswordEncoder(secret: Option[Password]):
Option[PasswordEncoder] = {
secret.map { secret =>
new PasswordEncoder(secret,
@@ -355,17 +378,28 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
props
}
- private[server] def validate(props: Properties, perBrokerConfig: Boolean):
Unit = CoreUtils.inReadLock(lock) {
- validateConfigs(props, perBrokerConfig)
+ /**
+ * Validate the provided configs `propsOverride` and return the full Kafka
configs with
+ * the configured defaults and these overrides.
+ *
+ * Note: The caller must acquire the read or write lock before invoking this
method.
+ */
+ private def validatedKafkaProps(propsOverride: Properties, perBrokerConfig:
Boolean): Map[String, String] = {
+ validateConfigs(propsOverride, perBrokerConfig)
val newProps = mutable.Map[String, String]()
newProps ++= staticBrokerConfigs
if (perBrokerConfig) {
overrideProps(newProps, dynamicDefaultConfigs)
- overrideProps(newProps, props.asScala)
+ overrideProps(newProps, propsOverride.asScala)
} else {
- overrideProps(newProps, props.asScala)
+ overrideProps(newProps, propsOverride.asScala)
overrideProps(newProps, dynamicBrokerConfigs)
}
+ newProps
+ }
+
+ private[server] def validate(props: Properties, perBrokerConfig: Boolean):
Unit = CoreUtils.inReadLock(lock) {
+ val newProps = validatedKafkaProps(props, perBrokerConfig)
processReconfiguration(newProps, validateOnly = true)
}
@@ -445,12 +479,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
reconfigurables.foreach {
case listenerReconfigurable: ListenerReconfigurable =>
- val listenerName = listenerReconfigurable.listenerName
- val oldValues =
currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
- val newValues =
newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
- val updatedKeys = updatedConfigs(newValues, oldValues).keySet
- if
(needsReconfiguration(listenerReconfigurable.reconfigurableConfigs,
updatedKeys))
- processReconfigurable(listenerReconfigurable, updatedKeys,
newValues, customConfigs, validateOnly)
+ processListenerReconfigurable(listenerReconfigurable, newConfig,
customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs,
updatedMap.keySet))
processReconfigurable(reconfigurable, updatedMap.keySet,
newConfig.valuesFromThisConfig, customConfigs, validateOnly)
@@ -481,6 +510,21 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
}
+ private def processListenerReconfigurable(listenerReconfigurable:
ListenerReconfigurable,
+ newConfig: KafkaConfig,
+ customConfigs: util.Map[String,
Object],
+ validateOnly: Boolean,
+ reloadOnly: Boolean): Unit = {
+ val listenerName = listenerReconfigurable.listenerName
+ val oldValues =
currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
+ val newValues =
newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
+ val updatedKeys = updatedConfigs(newValues, oldValues).keySet
+ val configsChanged =
needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)
+ // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise
reconfigure if configs have changed
+ if (reloadOnly != configsChanged)
+ processReconfigurable(listenerReconfigurable, updatedKeys, newValues,
customConfigs, validateOnly)
+ }
+
private def processReconfigurable(reconfigurable: Reconfigurable,
updatedConfigNames: Set[String],
allNewConfigs: util.Map[String, _],
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f772e586d6e..5d15cc46ed9 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -300,6 +300,15 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
(s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG",
sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
verifySslProduceConsume(sslProperties1, "alter-truststore-3")
+
+ // Update same truststore file to contain both certificates without
changing any configs.
+ // Clients should connect successfully with either keystore after admin
client AlterConfigsRequest completes.
+
Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
+ Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
+ StandardCopyOption.REPLACE_EXISTING)
+ TestUtils.alterConfigs(servers, adminClients.head, oldTruststoreProps,
perBrokerConfig = true)
+ verifySslProduceConsume(sslProperties1, "alter-truststore-4")
+ verifySslProduceConsume(sslProperties2, "alter-truststore-5")
}
@Test
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Enable dynamic key/truststore update with same filename/password
> ----------------------------------------------------------------
>
> Key: KAFKA-7429
> URL: https://issues.apache.org/jira/browse/KAFKA-7429
> Project: Kafka
> Issue Type: Improvement
> Components: security
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.1.0
>
>
> At the moment, SSL keystores and truststores on brokers can be dynamically
> updated using AdminClient by providing a new keystore or truststore. But we
> require either the filename or password to be modified to trigger the update.
> In some scenarios, we may want to perform the update using the same file (and
> password). So it would be good to provide a way to trigger a reload of existing
> keystores and truststores using the same AdminClient update mechanism.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)