This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 87ca6140c3b Enable ssl use for cluster in CassandraIO (#29302) 87ca6140c3b is described below commit 87ca6140c3b023daada11d60ea5a1142a6289b5d Author: Calvin Swenson Jr <142366140+niv-...@users.noreply.github.com> AuthorDate: Fri Nov 3 17:57:46 2023 -0700 Enable ssl use for cluster in CassandraIO (#29302) * Add properties / methods to facilitate optional ssl use Add properties / methods to facilitate optional ssl use by surfacing datastax driver functionality to programmatically configure ssl options when building Cluster. * Add comments with link to driver documentation for programmatic config Add comments with link to driver documentation for programmatic config of ssl * Apply suggested change to address spotless check failure. --- .../apache/beam/sdk/io/cassandra/CassandraIO.java | 55 ++++++++++++++++++++-- .../beam/sdk/io/cassandra/ConnectionManager.java | 3 +- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java index d33642b9c3a..1429253d194 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -24,6 +24,7 @@ import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PlainTextAuthProvider; import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.SSLOptions; import com.datastax.driver.core.Session; import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; @@ -192,6 +193,9 @@ public class CassandraIO { @Nullable abstract ValueProvider<Set<RingRange>> ringRanges(); + @Nullable + abstract ValueProvider<SSLOptions> sslOptions(); + abstract Builder<T> builder(); /** Specify the hosts of the Apache Cassandra instances. */ @@ -385,6 +389,22 @@ public class CassandraIO { return builder().setRingRanges(ringRange).build(); } + /** + * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See + * https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic + */ + public Read<T> withSsl(SSLOptions sslOptions) { + return withSsl(ValueProvider.StaticValueProvider.of(sslOptions)); + } + + /** + * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See + * https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic + */ + public Read<T> withSsl(ValueProvider<SSLOptions> sslOptions) { + return builder().setSslOptions(sslOptions).build(); + } + @Override public PCollection<T> expand(PBegin input) { checkArgument((hosts() != null && port() != null), "WithHosts() and withPort() are required"); @@ -422,7 +442,8 @@ public class CassandraIO { read.localDc(), read.consistencyLevel(), read.connectTimeout(), - read.readTimeout())) { + read.readTimeout(), + read.sslOptions())) { if (isMurmur3Partitioner(cluster)) { LOG.info("Murmur3Partitioner detected, splitting"); Integer splitCount; @@ -495,6 +516,8 @@ public class CassandraIO { abstract Builder<T> setRingRanges(ValueProvider<Set<RingRange>> ringRange); + abstract Builder<T> setSslOptions(ValueProvider<SSLOptions> sslOptions); + abstract Read<T> autoBuild(); public Read<T> build() { @@ -543,6 +566,8 @@ public class CassandraIO { abstract @Nullable ValueProvider<Integer> readTimeout(); + abstract @Nullable ValueProvider<SSLOptions> sslOptions(); + abstract @Nullable SerializableFunction<Session, Mapper> mapperFactoryFn(); abstract Builder<T> builder(); @@ -725,6 +750,22 @@ public class CassandraIO { return builder().setMapperFactoryFn(mapperFactoryFn).build(); } + /** + * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See + * https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic + */ + public Write<T> withSsl(SSLOptions sslOptions) { + return withSsl(ValueProvider.StaticValueProvider.of(sslOptions)); + } + + /** + * Optionally, specify {@link SSLOptions} configuration to utilize SSL. See + * https://docs.datastax.com/en/developer/java-driver/3.11/manual/ssl/#jsse-programmatic + */ + public Write<T> withSsl(ValueProvider<SSLOptions> sslOptions) { + return builder().setSslOptions(sslOptions).build(); + } + @Override public void validate(PipelineOptions pipelineOptions) { checkState( @@ -799,6 +840,8 @@ public class CassandraIO { abstract Optional<SerializableFunction<Session, Mapper>> mapperFactoryFn(); + abstract Builder<T> setSslOptions(ValueProvider<SSLOptions> sslOptions); + abstract Write<T> autoBuild(); // not public public Write<T> build() { @@ -880,7 +923,8 @@ public class CassandraIO { ValueProvider<String> localDc, ValueProvider<String> consistencyLevel, ValueProvider<Integer> connectTimeout, - ValueProvider<Integer> readTimeout) { + ValueProvider<Integer> readTimeout, + ValueProvider<SSLOptions> sslOptions) { Cluster.Builder builder = Cluster.builder().addContactPoints(hosts.get().toArray(new String[0])).withPort(port.get()); @@ -913,6 +957,10 @@ public class CassandraIO { socketOptions.setReadTimeoutMillis(readTimeout.get()); } + if (sslOptions != null) { + builder.withSSL(sslOptions.get()); + } + return builder.build(); } @@ -941,7 +989,8 @@ public class CassandraIO { spec.localDc(), spec.consistencyLevel(), spec.connectTimeout(), - spec.readTimeout()); + spec.readTimeout(), + spec.sslOptions()); this.session = cluster.connect(spec.keyspace().get()); this.mapperFactoryFn = spec.mapperFactoryFn(); this.mutateFutures = new ArrayList<>(); diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java index 21e7d257dca..962e8ad8ec0 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ConnectionManager.java @@ -71,7 +71,8 @@ public class ConnectionManager { read.localDc(), read.consistencyLevel(), read.connectTimeout(), - read.readTimeout())); + read.readTimeout(), + read.sslOptions())); return sessionMap.computeIfAbsent( readToSessionHash(read), k -> cluster.connect(Objects.requireNonNull(read.keyspace()).get()));