This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 87ca6140c3b Enable ssl use for cluster in CassandraIO (#29302)
87ca6140c3b is described below

commit 87ca6140c3b023daada11d60ea5a1142a6289b5d
Author: Calvin Swenson Jr <142366140+niv-...@users.noreply.github.com>
AuthorDate: Fri Nov 3 17:57:46 2023 -0700

    Enable ssl use for cluster in CassandraIO (#29302)
    
    * Add properties / methods to facilitate optional ssl use
    
    Add properties / methods to facilitate optional ssl use by surfacing 
datastax driver functionality to programmatically configure ssl options when 
building Cluster.
    
    * Add comments with link to driver documentation for programmatic config
    
    Add comments with link to driver documentation for programmatic config of 
ssl
    
    * Apply suggested change to address spotless check failure.
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 55 ++++++++++++++++++++--
 .../beam/sdk/io/cassandra/ConnectionManager.java   |  3 +-
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index d33642b9c3a..1429253d194 100644
--- 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -24,6 +24,7 @@ import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PlainTextAuthProvider;
 import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.SSLOptions;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
@@ -192,6 +193,9 @@ public class CassandraIO {
     @Nullable
     abstract ValueProvider<Set<RingRange>> ringRanges();
 
+    @Nullable
+    abstract ValueProvider<SSLOptions> sslOptions();
+
     abstract Builder<T> builder();
 
     /** Specify the hosts of the Apache Cassandra instances. */
@@ -385,6 +389,22 @@ public class CassandraIO {
       return builder().setRingRanges(ringRange).build();
     }
 
+    /**
+     * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See
+     * 
https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic
+     */
+    public Read<T> withSsl(SSLOptions sslOptions) {
+      return withSsl(ValueProvider.StaticValueProvider.of(sslOptions));
+    }
+
+    /**
+     * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See
+     * 
https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic
+     */
+    public Read<T> withSsl(ValueProvider<SSLOptions> sslOptions) {
+      return builder().setSslOptions(sslOptions).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkArgument((hosts() != null && port() != null), "WithHosts() and 
withPort() are required");
@@ -422,7 +442,8 @@ public class CassandraIO {
                 read.localDc(),
                 read.consistencyLevel(),
                 read.connectTimeout(),
-                read.readTimeout())) {
+                read.readTimeout(),
+                read.sslOptions())) {
           if (isMurmur3Partitioner(cluster)) {
             LOG.info("Murmur3Partitioner detected, splitting");
             Integer splitCount;
@@ -495,6 +516,8 @@ public class CassandraIO {
 
       abstract Builder<T> setRingRanges(ValueProvider<Set<RingRange>> 
ringRange);
 
+      abstract Builder<T> setSslOptions(ValueProvider<SSLOptions> sslOptions);
+
       abstract Read<T> autoBuild();
 
       public Read<T> build() {
@@ -543,6 +566,8 @@ public class CassandraIO {
 
     abstract @Nullable ValueProvider<Integer> readTimeout();
 
+    abstract @Nullable ValueProvider<SSLOptions> sslOptions();
+
     abstract @Nullable SerializableFunction<Session, Mapper> mapperFactoryFn();
 
     abstract Builder<T> builder();
@@ -725,6 +750,22 @@ public class CassandraIO {
       return builder().setMapperFactoryFn(mapperFactoryFn).build();
     }
 
+    /**
+     * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See
+     * 
https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic
+     */
+    public Write<T> withSsl(SSLOptions sslOptions) {
+      return withSsl(ValueProvider.StaticValueProvider.of(sslOptions));
+    }
+
+    /**
+     * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See
+     * 
https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic
+     */
+    public Write<T> withSsl(ValueProvider<SSLOptions> sslOptions) {
+      return builder().setSslOptions(sslOptions).build();
+    }
+
     @Override
     public void validate(PipelineOptions pipelineOptions) {
       checkState(
@@ -799,6 +840,8 @@ public class CassandraIO {
 
       abstract Optional<SerializableFunction<Session, Mapper>> 
mapperFactoryFn();
 
+      abstract Builder<T> setSslOptions(ValueProvider<SSLOptions> sslOptions);
+
       abstract Write<T> autoBuild(); // not public
 
       public Write<T> build() {
@@ -880,7 +923,8 @@ public class CassandraIO {
       ValueProvider<String> localDc,
       ValueProvider<String> consistencyLevel,
       ValueProvider<Integer> connectTimeout,
-      ValueProvider<Integer> readTimeout) {
+      ValueProvider<Integer> readTimeout,
+      ValueProvider<SSLOptions> sslOptions) {
 
     Cluster.Builder builder =
         Cluster.builder().addContactPoints(hosts.get().toArray(new 
String[0])).withPort(port.get());
@@ -913,6 +957,10 @@ public class CassandraIO {
       socketOptions.setReadTimeoutMillis(readTimeout.get());
     }
 
+    if (sslOptions != null) {
+      builder.withSSL(sslOptions.get());
+    }
+
     return builder.build();
   }
 
@@ -941,7 +989,8 @@ public class CassandraIO {
               spec.localDc(),
               spec.consistencyLevel(),
               spec.connectTimeout(),
-              spec.readTimeout());
+              spec.readTimeout(),
+              spec.sslOptions());
       this.session = cluster.connect(spec.keyspace().get());
       this.mapperFactoryFn = spec.mapperFactoryFn();
       this.mutateFutures = new ArrayList<>();
diff --git 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java
 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java
index 21e7d257dca..962e8ad8ec0 100644
--- 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java
+++ 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java
@@ -71,7 +71,8 @@ public class ConnectionManager {
                     read.localDc(),
                     read.consistencyLevel(),
                     read.connectTimeout(),
-                    read.readTimeout()));
+                    read.readTimeout(),
+                    read.sslOptions()));
     return sessionMap.computeIfAbsent(
         readToSessionHash(read),
         k -> cluster.connect(Objects.requireNonNull(read.keyspace()).get()));

Reply via email to