Repository: camel
Updated Branches:
  refs/heads/master 9584f3851 -> 39742f911


CAMEL-10705 - Allow to use an SSLContextParameters object for Kafka


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39742f91
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39742f91
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39742f91

Branch: refs/heads/master
Commit: 39742f911c7cf7aeec71f880712ccd808e51ccbe
Parents: 9584f38
Author: Antoine DESSAIGNE <antoine.dessai...@gmail.com>
Authored: Fri Jan 13 14:35:42 2017 +0100
Committer: Antoine DESSAIGNE <antoine.dessai...@gmail.com>
Committed: Fri Jan 13 14:35:42 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 50 ++++++++++++-
 .../component/kafka/KafkaConfiguration.java     | 78 ++++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index d100a32..8eacb10 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -49,7 +49,7 @@ The Kafka component supports 1 options which are listed below.
 
 
 // endpoint options: START
-The Kafka component supports 78 endpoint options which are listed below:
+The Kafka component supports 79 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -120,6 +120,7 @@ The Kafka component supports 78 endpoint options which are 
listed below:
 | saslMechanism | security | GSSAPI | String | The Simple Authentication and 
Security Layer (SASL) Mechanism used. For the valid values see 
http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
 | securityProtocol | security | PLAINTEXT | String | Protocol used to 
communicate with brokers. Currently only PLAINTEXT and SSL are supported.
 | sslCipherSuites | security |  | String | A list of cipher suites. This is a 
named combination of authentication encryption MAC and key exchange algorithm 
used to negotiate the security settings for a network connection using TLS or 
SSL network protocol.By default all the available cipher suites are supported.
+| sslContextParameters | security |  | SSLContextParameters | SSL 
configuration using a Camel SSLContextParameters object. If configured it's 
applied before the other SSL endpoint parameters.
 | sslEnabledProtocols | security | TLSv1.2,TLSv1.1,TLSv1 | String | The list 
of protocols enabled for SSL connections. TLSv1.2 TLSv1.1 and TLSv1 are enabled 
by default.
 | sslEndpointAlgorithm | security |  | String | The endpoint identification 
algorithm to validate server hostname using server certificate.
 | sslKeymanagerAlgorithm | security | SunX509 | String | The algorithm used by 
key manager factory for SSL connections. Default value is the key manager 
factory algorithm configured for the Java Virtual Machine.
@@ -229,6 +230,53 @@ from("direct:start")
     .to("kafka:localhost:9092?topic=test");
 ----------------------------------------------------------------------------
 
+
+#### SSL configuration
+
+You have 2 different ways to configure the SSL communication on the Kafka` 
component.
+
+The first way is through the many SSL endpoint parameters
+[source,java]
+-------------------------------------------------------------
+from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +
+             "&groupId=A" +
+             "&sslKeystoreLocation=/path/to/keystore.jks" +
+             "&sslKeystorePassword=changeit" +
+             "&sslKeyPassword=changeit")
+        .to("mock:result");
+-------------------------------------------------------------
+
+The second way is to use the `sslContextParameters` endpoint parameter.
+[source,java]
+--------------------------------------------------------------------------------------------------
+// Configure the SSLContextParameters object
+KeyStoreParameters ksp = new KeyStoreParameters();
+ksp.setResource("/path/to/keystore.jks");
+ksp.setPassword("changeit");
+KeyManagersParameters kmp = new KeyManagersParameters();
+kmp.setKeyStore(ksp);
+kmp.setKeyPassword("changeit");
+SSLContextParameters scp = new SSLContextParameters();
+scp.setKeyManagers(kmp);
+
+// Bind this SSLContextParameters into the Camel registry
+JndiRegistry registry = new JndiRegistry();
+registry.bind("ssl", scp);
+
+// Configure the camel context
+DefaultCamelContext camelContext = new DefaultCamelContext(registry);
+camelContext.addRoutes(new RouteBuilder() {
+    @Override
+    public void configure() throws Exception {
+        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+                     "&groupId=A" +                            //
+                     "&sslContextParameters=#ssl")             // Reference 
the SSL configuration
+                .to("mock:result");
+    }
+});
+--------------------------------------------------------------------------------------------------
+
+
 ### Endpoints
 
 Camel supports the link:message-endpoint.html[Message Endpoint] pattern

http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 214fd2f..a10ace4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Metadata;
@@ -27,6 +28,12 @@ import org.apache.camel.spi.StateRepository;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.jsse.CipherSuitesParameters;
+import org.apache.camel.util.jsse.KeyManagersParameters;
+import org.apache.camel.util.jsse.KeyStoreParameters;
+import org.apache.camel.util.jsse.SSLContextParameters;
+import org.apache.camel.util.jsse.SecureSocketProtocolsParameters;
+import org.apache.camel.util.jsse.TrustManagersParameters;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -180,6 +187,10 @@ public class KafkaConfiguration {
     //reconnect.backoff.ms
     @UriParam(label = "producer", defaultValue = "50")
     private Integer reconnectBackoffMs = 50;
+
+    // SSL
+    @UriParam(label = "common,security")
+    private SSLContextParameters sslContextParameters;
     // SSL
     // ssl.key.password
     @UriParam(label = "producer,security", secret = true)
@@ -264,6 +275,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, 
getCompressionCodec());
         addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, 
getRetries());
         // SSL
+        applySslConfiguration(props, getSslContextParameters());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
getSslKeyPassword());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
getSslKeystoreLocation());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
getSslKeystorePassword());
@@ -322,6 +334,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
getSessionTimeoutMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
getMaxPollRecords());
         // SSL
+        applySslConfiguration(props, getSslContextParameters());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
getSslKeyPassword());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
getSslKeystoreLocation());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
getSslKeystorePassword());
@@ -368,6 +381,54 @@ public class KafkaConfiguration {
         return props;
     }
 
+    /**
+     * Uses the standard camel {@link SSLContextParameters} object to fill the 
Kafka SSL properties
+     *
+     * @param props Kafka properties
+     * @param sslContextParameters SSL configuration
+     */
+    private void applySslConfiguration(Properties props, SSLContextParameters 
sslContextParameters) {
+        if (sslContextParameters != null) {
+            addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, 
sslContextParameters.getSecureSocketProtocol());
+            addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, 
sslContextParameters.getProvider());
+
+            CipherSuitesParameters cipherSuites = 
sslContextParameters.getCipherSuites();
+            if (cipherSuites != null) {
+                addCommaSeparatedList(props, 
SslConfigs.SSL_CIPHER_SUITES_CONFIG, cipherSuites.getCipherSuite());
+            }
+
+            SecureSocketProtocolsParameters secureSocketProtocols = 
sslContextParameters.getSecureSocketProtocols();
+            if (secureSocketProtocols != null) {
+                addCommaSeparatedList(props, 
SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
secureSocketProtocols.getSecureSocketProtocol());
+            }
+
+            KeyManagersParameters keyManagers = 
sslContextParameters.getKeyManagers();
+            if (keyManagers != null) {
+                addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagers.getAlgorithm());
+                addPropertyIfNotNull(props, 
SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyManagers.getKeyPassword());
+
+                KeyStoreParameters keyStore = keyManagers.getKeyStore();
+                if (keyStore != null) {
+                    addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keyStore.getType());
+                    addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getResource());
+                    addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStore.getPassword());
+                }
+            }
+
+            TrustManagersParameters trustManagers = 
sslContextParameters.getTrustManagers();
+            if (trustManagers != null) {
+                addPropertyIfNotNull(props, 
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagers.getAlgorithm());
+
+                KeyStoreParameters keyStore = trustManagers.getKeyStore();
+                if (keyStore != null) {
+                    addPropertyIfNotNull(props, 
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, keyStore.getType());
+                    addPropertyIfNotNull(props, 
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, keyStore.getResource());
+                    addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStore.getPassword());
+                }
+            }
+        }
+    }
+
     private static <T> void addPropertyIfNotNull(Properties props, String key, 
T value) {
         if (value != null) {
             // Kafka expects all properties as String
@@ -384,6 +445,12 @@ public class KafkaConfiguration {
         }
     }
 
+    private static void addCommaSeparatedList(Properties props, String key, 
List<String> values) {
+        if (values != null && !values.isEmpty()) {
+            props.put(key, values.stream().collect(Collectors.joining(",")));
+        }
+    }
+
     public String getGroupId() {
         return groupId;
     }
@@ -837,6 +904,17 @@ public class KafkaConfiguration {
         this.securityProtocol = securityProtocol;
     }
 
+    public SSLContextParameters getSslContextParameters() {
+        return sslContextParameters;
+    }
+
+    /**
+     * SSL configuration using a Camel {@link SSLContextParameters} object. If 
configured it's applied before the other SSL endpoint parameters.
+     */
+    public void setSslContextParameters(SSLContextParameters 
sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
+
     public String getSslKeyPassword() {
         return sslKeyPassword;
     }

Reply via email to