soarez commented on a change in pull request #10156: URL: https://github.com/apache/kafka/pull/10156#discussion_r628515373
########## File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java ########## @@ -81,12 +94,31 @@ private String tmfAlgorithm; private SecurityStore keystore; private SecurityStore truststore; + private long keyStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS; + private long trustStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS; + private final SecurityFileChangeListener securityFileChangeListener; + private final Thread securityStoreRefreshThread; + private String[] cipherSuites; private String[] enabledProtocols; private SecureRandom secureRandomImplementation; - private SSLContext sslContext; + private AtomicReference<SSLContext> sslContext = new AtomicReference<>(null); private SslClientAuth sslClientAuth; + public DefaultSslEngineFactory() throws IOException { + this(SystemTime.SYSTEM); + } + + // For testing only + public DefaultSslEngineFactory(Time time) throws IOException { Review comment: Should this be public? ########## File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java ########## @@ -127,6 +156,138 @@ public KeyStore truststore() { return this.truststore != null ? this.truststore.get() : null; } + class SecurityFileChangeListener implements Runnable { + private final Timer keyStoreRefreshTimer; + private final Timer trustStoreRefreshTimer; + private final WatchService watchService; + private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>(); + private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>(); + private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null); Review comment: Seeing as the value is only ever set and read, could this be simply `private volatile Exception lastLoadFailure`? ########## File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java ########## @@ -127,6 +156,138 @@ public KeyStore truststore() { return this.truststore != null ? this.truststore.get() : null; } + class SecurityFileChangeListener implements Runnable { + private final Timer keyStoreRefreshTimer; + private final Timer trustStoreRefreshTimer; + private final WatchService watchService; + private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>(); + private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>(); + private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null); + + SecurityFileChangeListener(final Timer keyStoreRefreshTimer, + final Timer trustStoreRefreshTimer) throws IOException { + this.keyStoreRefreshTimer = keyStoreRefreshTimer; + this.trustStoreRefreshTimer = trustStoreRefreshTimer; + try { + watchService = FileSystems.getDefault().newWatchService(); + } catch (IOException e) { + log.error("Failed to run the listener thread due to IO exception", e); + throw e; + } + } + + void updateStoreKey(SecurityStore store, final String watchFile) { + try { + Path filePath = Paths.get(watchFile); + fileToStoreMap.put(filePath, store); + + Path dirPath = filePath.getParent(); + if (dirPath == null) { + throw new IOException("Unexpected null path with no parent"); + } + + if (!Files.exists(dirPath)) { + Files.createDirectories(dirPath); + } + WatchKey watchkey = dirPath.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.OVERFLOW); + watchKeyPathMap.put(watchkey, dirPath); + log.info("Watch service registered for store path = {}", dirPath); + } catch (IOException e) { + // If the update failed, we will try to use existing store path instead. + log.error("Could not register store path for file {}", watchFile, e); + } + } + + // For testing purpose now. + Exception lastLoadFailure() { + return lastLoadFailure.get(); + } + + public void run() { + for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) { + log.debug("Starting listening for change key {} for path {}", key.getKey(), key.getValue()); + } + resetKeyStoreTimer(); + resetTrustStoreTimer(); + + try { + runLoop(); + } catch (InterruptedException ie) { + log.debug("Security file listener {} was interrupted to shutdown", watchKeyPathMap); + } catch (Exception e) { + log.warn("Hit a fatal exception in security file listener", e); + } + } + + private void runLoop() throws InterruptedException { + while (!watchKeyPathMap.isEmpty()) { + keyStoreRefreshTimer.update(); + trustStoreRefreshTimer.update(); + final long maxPollIntervalMs = Math.min(keyStoreRefreshTimer.remainingMs(), + trustStoreRefreshTimer.remainingMs()); + log.debug("Max poll interval is {} with trust store remaining time {} and trust store time {}", + maxPollIntervalMs, trustStoreRefreshTimer.remainingMs(), trustStoreRefreshIntervalMs); + WatchKey watchKey = watchService.poll(maxPollIntervalMs, TimeUnit.MILLISECONDS); + + // Handle file update triggered events. + if (watchKey != null && watchKeyPathMap.containsKey(watchKey)) { + for (WatchEvent<?> event: watchKey.pollEvents()) { + log.debug("Got file change event: {} {}", event.kind(), event.context()); + + @SuppressWarnings("unchecked") + Path filePath = watchKeyPathMap.get(watchKey).resolve(((WatchEvent<Path>) event).context()); + + if (fileToStoreMap.containsKey(filePath)) { + maybeReloadStore(fileToStoreMap.get(filePath)); + sslContext.set(createSSLContext(keystore, truststore)); + } else { + log.debug("Unknown file name: {}", filePath); + } + } + if (!watchKey.reset()) { + watchKeyPathMap.remove(watchKey); Review comment: This will stop the watch on the file, right? Should there be a warning logged indicating the file is no longer watched? ########## File path: clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java ########## @@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception { return TestUtils.tempFile(pem).getAbsolutePath(); } - private Password pemAsConfigValue(String... pemValues) throws Exception { + private Password pemAsConfigValue(String... pemValues) { StringBuilder builder = new StringBuilder(); for (String pem : pemValues) { builder.append(pem); builder.append("\n"); } return new Password(builder.toString().trim()); } + + @Test + public void testKeyStoreFileTriggerReload() throws Exception { + MockTime time = new MockTime(0L, 0L, 0L); + DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time); + configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L); + configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE); + + final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value()); + + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath); + configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD); + configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + // Make sure the thread starts to listen for file changes. + sleep(1000); Review comment: I don't think this is necessary. The SecurityFileChangeListener thread may not yet have started, but the watch services are already registered after `factory.configure(configs)`. The file change below should queue a change even if the thread hasn't started. ########## File path: clients/src/test/java/org/apache/kafka/test/TestSslUtils.java ########## @@ -579,6 +583,9 @@ public SslConfigsBuilder usePem(boolean usePem) { DefaultSslEngineFactory defaultSslEngineFactory = new DefaultSslEngineFactory(); + public TestSslEngineFactory() throws IOException { Review comment: Is this needed? ########## File path: clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java ########## @@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception { return TestUtils.tempFile(pem).getAbsolutePath(); } - private Password pemAsConfigValue(String... pemValues) throws Exception { + private Password pemAsConfigValue(String... pemValues) { StringBuilder builder = new StringBuilder(); for (String pem : pemValues) { builder.append(pem); builder.append("\n"); } return new Password(builder.toString().trim()); } + + @Test + public void testKeyStoreFileTriggerReload() throws Exception { + MockTime time = new MockTime(0L, 0L, 0L); + DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time); + configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L); + configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE); + + final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value()); + + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath); + configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD); + configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + // Make sure the thread starts to listen for file changes. + sleep(1000); + + final FileWriter writer = new FileWriter(filePath); + writer.write(pemAsConfigValue(KEY, CERTCHAIN).value()); + writer.flush(); + writer.close(); + + TestUtils.waitForCondition(() -> factory.securityFileChangeListener().lastLoadFailure() != null, + "key store not reloaded or encountered expected failure"); + } + + @Test + public void testKeyStoreTimeBasedReload() throws Exception { + MockTime time = new MockTime(0L, 0L, 0L); + DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time); + configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L); + configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE); + + final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value()); + + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath); + configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD); + configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + final FileWriter writer = new FileWriter(filePath); + writer.write(pemAsConfigValue(KEY, CERTCHAIN).value()); + writer.flush(); + writer.close(); + + time.setCurrentTimeMs(1200L); + TestUtils.waitForCondition(() -> factory.securityFileChangeListener().lastLoadFailure() != null, + "key store not reloaded or encountered expected failure"); + } + + + + @Test + public void testTrustStoreFileTriggerReload() throws Exception { + DefaultSslEngineFactory factory = new DefaultSslEngineFactory(); + configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L); + configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L); + + final String filePath = pemFilePath(CA1); + + configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, filePath); + configs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + // Make sure the thread starts to listen for file changes. + sleep(1000); Review comment: Same here ########## File path: clients/src/main/java/org/apache/kafka/common/utils/Timer.java ########## @@ -158,6 +158,10 @@ public long remainingMs() { return Math.max(0, deadlineMs - currentTimeMs); } + public long getDeadlineMs() { Review comment: I don't see where this is used. Is it necessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org