soarez commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r628515373



##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -81,12 +94,31 @@
     private String tmfAlgorithm;
     private SecurityStore keystore;
     private SecurityStore truststore;
+    private long keyStoreRefreshIntervalMs = 
DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private long trustStoreRefreshIntervalMs = 
DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private final SecurityFileChangeListener securityFileChangeListener;
+    private final Thread securityStoreRefreshThread;
+
     private String[] cipherSuites;
     private String[] enabledProtocols;
     private SecureRandom secureRandomImplementation;
-    private SSLContext sslContext;
+    private AtomicReference<SSLContext> sslContext = new 
AtomicReference<>(null);
     private SslClientAuth sslClientAuth;
 
+    public DefaultSslEngineFactory() throws IOException {
+        this(SystemTime.SYSTEM);
+    }
+
+    // For testing only
+    public DefaultSslEngineFactory(Time time) throws IOException {

Review comment:
       Should this be public?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +156,138 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final WatchService watchService;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new 
HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new 
AtomicReference<>(null);

Review comment:
       Seeing as the value is only ever set and read, could this simply be 
`private volatile Exception lastLoadFailure`?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +156,138 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final WatchService watchService;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new 
HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new 
AtomicReference<>(null);
+
+        SecurityFileChangeListener(final Timer keyStoreRefreshTimer,
+                                   final Timer trustStoreRefreshTimer) throws 
IOException {
+            this.keyStoreRefreshTimer = keyStoreRefreshTimer;
+            this.trustStoreRefreshTimer = trustStoreRefreshTimer;
+            try {
+                watchService = FileSystems.getDefault().newWatchService();
+            } catch (IOException e) {
+                log.error("Failed to run the listener thread due to IO 
exception", e);
+                throw e;
+            }
+        }
+
+        void updateStoreKey(SecurityStore store, final String watchFile) {
+            try {
+                Path filePath = Paths.get(watchFile);
+                fileToStoreMap.put(filePath, store);
+
+                Path dirPath = filePath.getParent();
+                if (dirPath == null) {
+                    throw new IOException("Unexpected null path with no 
parent");
+                }
+
+                if (!Files.exists(dirPath)) {
+                    Files.createDirectories(dirPath);
+                }
+                WatchKey watchkey = dirPath.register(watchService,
+                    StandardWatchEventKinds.ENTRY_CREATE,
+                    StandardWatchEventKinds.ENTRY_MODIFY,
+                    StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.OVERFLOW);
+                watchKeyPathMap.put(watchkey, dirPath);
+                log.info("Watch service registered for store path = {}", 
dirPath);
+            } catch (IOException e) {
+                // If the update failed, we will try to use existing store 
path instead.
+                log.error("Could not register store path for file {}", 
watchFile, e);
+            }
+        }
+
+        // For testing purpose now.
+        Exception lastLoadFailure() {
+            return lastLoadFailure.get();
+        }
+
+        public void run() {
+            for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) {
+                log.debug("Starting listening for change key {} for path {}", 
key.getKey(), key.getValue());
+            }
+            resetKeyStoreTimer();
+            resetTrustStoreTimer();
+
+            try {
+                runLoop();
+            } catch (InterruptedException ie) {
+                log.debug("Security file listener {} was interrupted to 
shutdown", watchKeyPathMap);
+            } catch (Exception e) {
+                log.warn("Hit a fatal exception in security file listener", e);
+            }
+        }
+
+        private void runLoop() throws InterruptedException {
+            while (!watchKeyPathMap.isEmpty()) {
+                keyStoreRefreshTimer.update();
+                trustStoreRefreshTimer.update();
+                final long maxPollIntervalMs = 
Math.min(keyStoreRefreshTimer.remainingMs(),
+                    trustStoreRefreshTimer.remainingMs());
+                log.debug("Max poll interval is {} with trust store remaining 
time {} and trust store time {}",
+                    maxPollIntervalMs, trustStoreRefreshTimer.remainingMs(), 
trustStoreRefreshIntervalMs);
+                WatchKey watchKey = watchService.poll(maxPollIntervalMs, 
TimeUnit.MILLISECONDS);
+
+                // Handle file update triggered events.
+                if (watchKey != null && watchKeyPathMap.containsKey(watchKey)) 
{
+                    for (WatchEvent<?> event: watchKey.pollEvents()) {
+                        log.debug("Got file change event: {} {}", 
event.kind(), event.context());
+
+                        @SuppressWarnings("unchecked")
+                        Path filePath = 
watchKeyPathMap.get(watchKey).resolve(((WatchEvent<Path>) event).context());
+
+                        if (fileToStoreMap.containsKey(filePath)) {
+                            maybeReloadStore(fileToStoreMap.get(filePath));
+                            sslContext.set(createSSLContext(keystore, 
truststore));
+                        } else {
+                            log.debug("Unknown file name: {}", filePath);
+                        }
+                    }
+                    if (!watchKey.reset()) {
+                        watchKeyPathMap.remove(watchKey);

Review comment:
       This will stop the watch on the file, right? Should there be a warning 
logged indicating the file is no longer watched?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
##########
@@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception {
         return TestUtils.tempFile(pem).getAbsolutePath();
     }
 
-    private Password pemAsConfigValue(String... pemValues)  throws Exception {
+    private Password pemAsConfigValue(String... pemValues) {
         StringBuilder builder = new StringBuilder();
         for (String pem : pemValues) {
             builder.append(pem);
             builder.append("\n");
         }
         return new Password(builder.toString().trim());
     }
+
+    @Test
+    public void testKeyStoreFileTriggerReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        
configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        
configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 
Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, 
CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);

Review comment:
       I don't think this sleep is necessary. The SecurityFileChangeListener 
thread may not have started yet, but the directories are already registered 
with the watch service after `factory.configure(configs)`. The file change 
below should be queued as an event even if the thread hasn't started.

##########
File path: clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
##########
@@ -579,6 +583,9 @@ public SslConfigsBuilder usePem(boolean usePem) {
 
         DefaultSslEngineFactory defaultSslEngineFactory = new 
DefaultSslEngineFactory();
 
+        public TestSslEngineFactory() throws IOException {

Review comment:
       Is this needed?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
##########
@@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception {
         return TestUtils.tempFile(pem).getAbsolutePath();
     }
 
-    private Password pemAsConfigValue(String... pemValues)  throws Exception {
+    private Password pemAsConfigValue(String... pemValues) {
         StringBuilder builder = new StringBuilder();
         for (String pem : pemValues) {
             builder.append(pem);
             builder.append("\n");
         }
         return new Password(builder.toString().trim());
     }
+
+    @Test
+    public void testKeyStoreFileTriggerReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        
configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        
configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 
Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, 
CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);
+
+        final FileWriter writer = new FileWriter(filePath);
+        writer.write(pemAsConfigValue(KEY, CERTCHAIN).value());
+        writer.flush();
+        writer.close();
+
+        TestUtils.waitForCondition(() -> 
factory.securityFileChangeListener().lastLoadFailure() != null,
+            "key store not reloaded or encountered expected failure");
+    }
+
+    @Test
+    public void testKeyStoreTimeBasedReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        
configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        
configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 
Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, 
CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        final FileWriter writer = new FileWriter(filePath);
+        writer.write(pemAsConfigValue(KEY, CERTCHAIN).value());
+        writer.flush();
+        writer.close();
+
+        time.setCurrentTimeMs(1200L);
+        TestUtils.waitForCondition(() -> 
factory.securityFileChangeListener().lastLoadFailure() != null,
+            "key store not reloaded or encountered expected failure");
+    }
+
+
+
+    @Test
+    public void testTrustStoreFileTriggerReload() throws Exception {
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory();
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        
configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        
configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 
1000L);
+
+        final String filePath = pemFilePath(CA1);
+
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);

Review comment:
       Same here: this sleep shouldn't be necessary either.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Timer.java
##########
@@ -158,6 +158,10 @@ public long remainingMs() {
         return Math.max(0, deadlineMs - currentTimeMs);
     }
 
+    public long getDeadlineMs() {

Review comment:
       I don't see where this is used. Is it necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to