This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e82fcea  CASSANDRA-19223: Column type mapping error for timestamp type during bulk writes
e82fcea is described below

commit e82fceaecfe5ea04ac3ddff92be5a6a41456333c
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Thu Dec 21 11:42:14 2023 -0800

    CASSANDRA-19223: Column type mapping error for timestamp type during bulk writes
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19223
---
 CHANGES.txt                                        |   1 +
 .../spark/bulkwriter/SqlToCqlTypeConverter.java    |  82 ++++++++++--
 .../cassandra/spark/bulkwriter/TableSchema.java    |   4 +-
 .../cassandra/spark/data/CassandraDataLayer.java   |   2 +-
 .../testing/CassandraSidecarTestContext.java       |  18 ++-
 .../SharedClusterSparkIntegrationTestBase.java     |  66 ++--------
 .../analytics/SparkIntegrationTestBase.java        | 103 +++++++++++++++
 .../apache/cassandra/analytics/SparkTestUtils.java | 130 +++++++++++++++++++
 .../analytics/TimestampIntegrationTest.java        | 139 +++++++++++++++++++++
 .../cassandra/spark/reader/SchemaBuilder.java      |   2 +-
 10 files changed, 475 insertions(+), 72 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c66bea0..d04b0ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Column type mapping error for timestamp type during bulk writes (CASSANDRA-19223)
  * Speed up integration tests (CASSANDRA-19251)
  * Make bulk writer resilient to cluster resize events (CASSANDRA-18852)
  * Remove write option VALIDATE_SSTABLES to enforce validation (CASSANDRA-19199)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
index 5749a07..a19ca21 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
@@ -25,6 +25,7 @@ import java.math.BigInteger;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -85,6 +86,8 @@ public final class SqlToCqlTypeConverter implements Serializable
     private static final BigDecimalConverter BIG_DECIMAL_CONVERTER = new BigDecimalConverter();
     private static final IntegerConverter INTEGER_CONVERTER = new IntegerConverter();
     private static final TimestampConverter TIMESTAMP_CONVERTER = new TimestampConverter();
+    private static final MicroSecondsTimestampConverter MICROSECONDS_TIMESTAMP_CONVERTER =
+    new MicroSecondsTimestampConverter();
     private static final TimeConverter TIME_CONVERTER = new TimeConverter();
     private static final UUIDConverter UUID_CONVERTER = new UUIDConverter();
     private static final BigIntegerConverter BIG_INTEGER_CONVERTER = new BigIntegerConverter();
@@ -171,14 +174,27 @@ public final class SqlToCqlTypeConverter implements Serializable
         }
     }
 
-    public static Converter<?> getIntegerConverter()
+    public static Converter<?> integerConverter()
     {
         return INTEGER_CONVERTER;
     }
 
-    public static Converter<?> getLongConverter()
+    public static Converter<?> microsecondsTimestampConverter()
     {
-        return LONG_CONVERTER;
+        return MICROSECONDS_TIMESTAMP_CONVERTER;
+    }
+
+    static boolean canConvertToLong(Object object)
+    {
+        return object instanceof Long
+               || object instanceof Integer
+               || object instanceof Short
+               || object instanceof Byte;
+    }
+
+    static long convertToLong(Object object)
+    {
+        return ((Number) object).longValue();
     }
 
     private static Converter<?> determineCustomConvert(CqlField.CqlCustom customType)
@@ -233,12 +249,9 @@ public final class SqlToCqlTypeConverter implements Serializable
         @Override
         public Long convertInternal(Object object)
         {
-            if (object instanceof Long
-                    || object instanceof Integer
-                    || object instanceof Short
-                    || object instanceof Byte)
+            if (canConvertToLong(object))
             {
-                return ((Number) object).longValue();
+                return convertToLong(object);
             }
             else
             {
@@ -358,8 +371,8 @@ public final class SqlToCqlTypeConverter implements Serializable
         public Integer convertInternal(Object object)
         {
             if (object instanceof Integer
-                    || object instanceof Short
-                    || object instanceof Byte)
+                || object instanceof Short
+                || object instanceof Byte)
             {
                 return ((Number) object).intValue();
             }
@@ -414,8 +427,57 @@ public final class SqlToCqlTypeConverter implements Serializable
         }
     }
 
+    @SuppressWarnings("serial")
+    static class MicroSecondsTimestampConverter extends Converter<Long>
+    {
+        /**
+         * Returns the time since epoch (January 1, 1970) in microseconds, as specified by
+         * <a
+         * href="https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlInsert.html#cqlInsert__timestamp-value">
+         * the documentation</a>.
+         *
+         * @param object the input object to convert
+         * @return the time since epoch in microseconds
+         * @throws RuntimeException when the object cannot be converted to timestamp
+         */
+        @Override
+        public Long convertInternal(Object object) throws RuntimeException
+        {
+            if (object instanceof Date)
+            {
+                Instant instant = ((Date) object).toInstant();
+                return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+                       + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+            }
+            else if (canConvertToLong(object))
+            {
+                return convertToLong(object);
+            }
+            else
+            {
+                throw new RuntimeException("Unsupported conversion for TIMESTAMP from " + object.getClass().getTypeName());
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Timestamp";
+        }
+    }
+
     static class TimestampConverter extends NullableConverter<Date>
     {
+        /**
+         * Returns a Date representing the number of milliseconds since the standard base time known as the epoch
+         * (January 1 1970 at 00:00:00 GMT), as specified by <a
+         * href="https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/timestamp_type_r.html">the
+         * documentation</a>.
+         *
+         * @param object the input object to convert
+         * @return the Date
+         * @throws RuntimeException when the object cannot be converted to timestamp
+         */
         @Override
         public Date convertInternal(Object object) throws RuntimeException
         {
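
[Editor's note, not part of the patch] The new MicroSecondsTimestampConverter above maps java.util.Date values (and raw integral values) to microseconds since the epoch, the unit CQL expects for write timestamps. A minimal standalone sketch of that conversion, assuming nothing beyond the JDK; the class and method names below are hypothetical, and the Instant arithmetic simply mirrors convertInternal above:

    import java.time.Instant;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;

    final class EpochMicrosSketch
    {
        // Combine whole seconds and the sub-second remainder (millisecond precision
        // for java.util.Date) into a single microseconds-since-epoch value.
        static long toEpochMicros(Date date)
        {
            Instant instant = date.toInstant();
            return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
                   + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
        }

        public static void main(String[] args)
        {
            // Prints the current wall-clock time expressed in microseconds since the epoch.
            System.out.println(toEpochMicros(new Date()));
        }
    }
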
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 72358eb..8170440 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -136,11 +136,11 @@ public class TableSchema implements Serializable
                      .map(fieldName -> {
                          if (fieldName.equals(ttlOption.columnName()))
                          {
-                             return SqlToCqlTypeConverter.getIntegerConverter();
+                             return SqlToCqlTypeConverter.integerConverter();
                          }
                          if (fieldName.equals(timestampOption.columnName()))
                          {
-                             return SqlToCqlTypeConverter.getLongConverter();
+                             return SqlToCqlTypeConverter.microsecondsTimestampConverter();
                          }
                         CqlField.CqlType cqlType = tableInfo.getColumnType(fieldName);
                          return SqlToCqlTypeConverter.getConverter(cqlType);
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index d761cfc..675cd1f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -215,7 +215,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} table={} dc={}",
                     snapshotName, keyspace, table, datacenter);
 
-        // Load cluster config from Discovery
+        // Load cluster config from options
         clusterConfig = initializeClusterConfig(options);
         initInstanceMap();
 
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 15f8d12..02e4ea7 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.testing;
 
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -225,12 +226,21 @@ public class CassandraSidecarTestContext implements AutoCloseable
         for (int i = 0; i < configs.size(); i++)
         {
             IInstanceConfig config = configs.get(i);
-            String hostName = JMXUtil.getJmxHost(config);
+            String ipAddress = JMXUtil.getJmxHost(config);
+            String hostName;
+            try
+            {
+                hostName = dnsResolver.reverseResolve(ipAddress);
+            }
+            catch (UnknownHostException e)
+            {
+                hostName = ipAddress;
+            }
            int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
            // The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized.
            // In these cases, we want to wait longer than the default retry/delay settings to connect.
             JmxClient jmxClient = JmxClient.builder()
-                                           .host(hostName)
+                                           .host(ipAddress)
                                            .port(config.jmxPort())
                                            .connectionMaxRetries(20)
                                            .connectionRetryDelayMillis(1000L)
@@ -252,11 +262,11 @@ public class CassandraSidecarTestContext implements AutoCloseable
                                                                              jmxClient,
                                                                              new DriverUtils(),
                                                                              "1.0-TEST",
-                                                                             hostName,
+                                                                             ipAddress,
                                                                              nativeTransportPort);
             metadata.add(InstanceMetadataImpl.builder()
                                              .id(i + 1)
-                                             .host(config.broadcastAddress().getAddress().getHostAddress())
+                                             .host(hostName)
                                             .port(nativeTransportPort)
                                             .dataDirs(Arrays.asList(dataDirectories))
                                              .stagingDir(stagingDir)
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
index 4712b2c..699f705 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
@@ -20,7 +20,6 @@
 package org.apache.cassandra.analytics;
 
 import java.net.UnknownHostException;
-import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -30,19 +29,19 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 import io.vertx.junit5.VertxExtension;
 import org.apache.cassandra.distributed.shared.JMXUtil;
-import org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
-import org.apache.cassandra.spark.KryoRegister;
-import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 
+import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkReaderDataFrame;
+import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkWriterDataFrameWriter;
+import static org.apache.cassandra.analytics.SparkTestUtils.defaultSparkConf;
+
 /**
 * Extends functionality from {@link SharedClusterIntegrationTestBase} and provides additional functionality for running
  * Spark integration tests.
@@ -63,32 +62,11 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
      */
     protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName)
     {
-        SparkConf sparkConf = getOrCreateSparkConf();
-        SparkSession spark = getOrCreateSparkSession();
-        SQLContext sql = spark.sqlContext();
-        SparkContext sc = spark.sparkContext();
-
-        int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
-        int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors",
-                                            sparkConf.getInt("spark.executor.instances", 1));
-        int numCores = coresPerExecutor * numExecutors;
-
-        return sql.read()
-                  .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
-                  .option("sidecar_instances", sidecarInstancesOption())
-                  .option("keyspace", tableName.keyspace()) // unquoted
-                  .option("table", tableName.table()) // unquoted
-                  .option("DC", "datacenter1")
-                  .option("snapshotName", UUID.randomUUID().toString())
-                  .option("createSnapshot", "true")
-                  // Shutdown hooks are called after the job ends, and in the case of integration tests
-                  // the sidecar is already shut down before this. Since the cluster will be torn
-                  // down anyway, the integration job skips clearing snapshots.
-                  .option("clearSnapshot", "false")
-                  .option("defaultParallelism", sc.defaultParallelism())
-                  .option("numCores", numCores)
-                  .option("sizing", "default")
-                  .option("sidecar_port", server.actualPort());
+        return defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                          getOrCreateSparkSession(),
+                                          tableName,
+                                          sidecarInstancesOption(),
+                                          server.actualPort());
     }
 
     /**
@@ -101,16 +79,7 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
      */
     protected DataFrameWriter<Row> bulkWriterDataFrameWriter(Dataset<Row> df, QualifiedName tableName)
     {
-        return df.write()
-                 .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
-                 .option("sidecar_instances", sidecarInstancesOption())
-                 .option("keyspace", tableName.keyspace())
-                 .option("table", tableName.table())
-                 .option("local_dc", "datacenter1")
-                 .option("bulk_writer_cl", "LOCAL_QUORUM")
-                 .option("number_splits", "-1")
-                 .option("sidecar_port", server.actualPort())
-                 .mode("append");
+        return defaultBulkWriterDataFrameWriter(df, tableName, sidecarInstancesOption(), server.actualPort());
     }
 
     /**
@@ -137,18 +106,7 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
     {
         if (sparkConf == null)
         {
-            sparkConf = new SparkConf()
-                        .setAppName("Integration test Spark Cassandra Bulk 
Analytics Job")
-                        .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
-                        // Spark is not case-sensitive by default, but we want 
to make it case-sensitive for
-                        // the quoted identifiers tests where we test mixed 
case
-                        .set("spark.sql.caseSensitive", "True")
-                        .set("spark.master", "local[8,4]")
-                        
.set("spark.cassandra_analytics.sidecar.request.retries", "5")
-                        
.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", 
"500")
-                        
.set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds",
 "500");
-            BulkSparkConf.setupSparkConf(sparkConf, true);
-            KryoRegister.setup(sparkConf);
+            sparkConf = defaultSparkConf();
         }
         return sparkConf;
     }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java
new file mode 100644
index 0000000..40f57e2
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkReaderDataFrame;
+import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkWriterDataFrameWriter;
+import static org.apache.cassandra.analytics.SparkTestUtils.defaultSparkConf;
+
+/**
+ * Extends the {@link org.apache.cassandra.sidecar.testing.IntegrationTestBase} with Spark functionality for
+ * test classes.
+ */
+public class SparkIntegrationTestBase extends IntegrationTestBase
+{
+    protected SparkConf sparkConf;
+    protected SparkSession sparkSession;
+
+    /**
+     * A preconfigured {@link DataFrameReader} with pre-populated required options that can be overridden
+     * with additional options for every specific test.
+     *
+     * @param tableName the qualified name for the Cassandra table
+     * @return a {@link DataFrameReader} for Cassandra bulk reads
+     */
+    protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName)
+    {
+        String sidecarInstances = sidecarTestContext.instancesConfig()
+                                                    .instances()
+                                                    .stream().map(f -> f.host())
+                                                    .collect(Collectors.joining(","));
+        return defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                          getOrCreateSparkSession(),
+                                          tableName,
+                                          sidecarInstances,
+                                          server.actualPort());
+    }
+
+    /**
+     * A preconfigured {@link DataFrameWriter} with pre-populated required options that can be overridden
+     * with additional options for every specific test.
+     *
+     * @param df        the source dataframe to write
+     * @param tableName the qualified name for the Cassandra table
+     * @return a {@link DataFrameWriter} for Cassandra bulk writes
+     */
+    protected DataFrameWriter<Row> bulkWriterDataFrameWriter(Dataset<Row> df, QualifiedName tableName)
+    {
+        String sidecarInstances = sidecarTestContext.instancesConfig()
+                                                    .instances()
+                                                    .stream().map(f -> f.host())
+                                                    .collect(Collectors.joining(","));
+        return defaultBulkWriterDataFrameWriter(df, tableName, sidecarInstances, server.actualPort());
+    }
+
+    protected SparkConf getOrCreateSparkConf()
+    {
+        if (sparkConf == null)
+        {
+            sparkConf = defaultSparkConf();
+        }
+        return sparkConf;
+    }
+
+    protected SparkSession getOrCreateSparkSession()
+    {
+        if (sparkSession == null)
+        {
+            sparkSession = SparkSession
+                           .builder()
+                           .config(getOrCreateSparkConf())
+                           .getOrCreate();
+        }
+        return sparkSession;
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
new file mode 100644
index 0000000..8e719f0
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.UUID;
+
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.KryoRegister;
+import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Helper methods for Spark tests
+ */
+public final class SparkTestUtils
+{
+    private SparkTestUtils()
+    {
+        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
+    }
+
+    /**
+     * Returns a {@link DataFrameReader} with default options for performing a bulk read test, including
+     * required parameters.
+     *
+     * @param sparkConf              the spark configuration to use
+     * @param spark                  the spark session to use
+     * @param tableName              the qualified name of the table
+     * @param sidecarInstancesOption the comma-separated list of sidecar instances
+     * @param sidecarPort            the sidecar port
+     * @return a {@link DataFrameReader} with default options for performing a bulk read test
+     */
+    public static DataFrameReader defaultBulkReaderDataFrame(SparkConf sparkConf,
+                                                             SparkSession spark,
+                                                             QualifiedName tableName,
+                                                             String sidecarInstancesOption,
+                                                             int sidecarPort)
+    {
+        SQLContext sql = spark.sqlContext();
+        SparkContext sc = spark.sparkContext();
+
+        int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+        int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", sparkConf.getInt("spark.executor.instances", 1));
+        int numCores = coresPerExecutor * numExecutors;
+
+        return sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
+                  .option("sidecar_instances", sidecarInstancesOption)
+                  .option("keyspace", tableName.keyspace()) // unquoted
+                  .option("table", tableName.table()) // unquoted
+                  .option("DC", "datacenter1")
+                  .option("snapshotName", UUID.randomUUID().toString())
+                  .option("createSnapshot", "true")
+                  // Shutdown hooks are called after the job ends, and in the case of integration tests
+                  // the sidecar is already shut down before this. Since the cluster will be torn
+                  // down anyway, the integration job skips clearing snapshots.
+                  .option("clearSnapshot", "false")
+                  .option("defaultParallelism", sc.defaultParallelism())
+                  .option("numCores", numCores)
+                  .option("sizing", "default")
+                  .option("sidecar_port", sidecarPort);
+    }
+
+    /**
+     * Returns a {@link DataFrameWriter<Row>} with default options for performing a bulk write test, including
+     * required parameters.
+     *
+     * @param df                     the source data frame
+     * @param tableName              the qualified name of the table
+     * @param sidecarInstancesOption the comma-separated list of sidecar instances
+     * @param sidecarPort            the sidecar port
+     * @return a {@link DataFrameWriter<Row>} with default options for performing a bulk write test
+     */
+    public static DataFrameWriter<Row> defaultBulkWriterDataFrameWriter(Dataset<Row> df,
+                                                                        QualifiedName tableName,
+                                                                        String sidecarInstancesOption,
+                                                                        int sidecarPort)
+    {
+        return df.write()
+                 .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
+                 .option("sidecar_instances", sidecarInstancesOption)
+                 .option("keyspace", tableName.keyspace())
+                 .option("table", tableName.table())
+                 .option("local_dc", "datacenter1")
+                 .option("bulk_writer_cl", "LOCAL_QUORUM")
+                 .option("number_splits", "-1")
+                 .option("sidecar_port", sidecarPort)
+                 .mode("append");
+    }
+
+    public static SparkConf defaultSparkConf()
+    {
+        SparkConf sparkConf = new SparkConf()
+                              .setAppName("Integration test Spark Cassandra 
Bulk Analytics Job")
+                              .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+                              // Spark is not case-sensitive by default, but 
we want to make it case-sensitive for
+                              // the quoted identifiers tests where we test 
mixed case
+                              .set("spark.sql.caseSensitive", "True")
+                              .set("spark.master", "local[8,4]")
+                              
.set("spark.cassandra_analytics.sidecar.request.retries", "5")
+                              
.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", 
"500")
+                              
.set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds",
 "500");
+        BulkSparkConf.setupSparkConf(sparkConf, true);
+        KryoRegister.setup(sparkConf);
+        return sparkConf;
+    }
+}
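
[Editor's note, not part of the patch] The new SparkTestUtils helpers centralize the reader, writer, and SparkConf defaults that the test bases previously built inline. A hedged sketch of how a test might wire them together; the table name below is illustrative, and the sidecar instances string and port are placeholders that a real test would obtain from the running framework (for example sidecarInstancesOption() and server.actualPort() in the test bases above):

    package org.apache.cassandra.analytics;

    import org.apache.cassandra.sidecar.testing.QualifiedName;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
    import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;

    final class SparkTestUtilsUsageSketch
    {
        // Writes the source dataframe in bulk, then reads the same table back,
        // using only the defaults provided by the new SparkTestUtils helpers.
        static Dataset<Row> roundTrip(Dataset<Row> source, String sidecarInstances, int sidecarPort)
        {
            SparkConf conf = SparkTestUtils.defaultSparkConf();
            SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
            // Hypothetical table; a real test would create it and wait for the sidecar to pick up the schema.
            QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "example_tbl");

            SparkTestUtils.defaultBulkWriterDataFrameWriter(source, table, sidecarInstances, sidecarPort)
                          .save();

            return SparkTestUtils.defaultBulkReaderDataFrame(conf, spark, table, sidecarInstances, sidecarPort)
                                 .load();
        }
    }
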
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TimestampIntegrationTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TimestampIntegrationTest.java
new file mode 100644
index 0000000..57aeb9d
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TimestampIntegrationTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.junit5.VertxExtension;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.TimestampOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for the Cassandra timestamps
+ */
+@ExtendWith(VertxExtension.class)
+class TimestampIntegrationTest extends SparkIntegrationTestBase
+{
+    public static final String CREATE_TABLE_SCHEMA = "CREATE TABLE IF NOT EXISTS %s " +
+                                                     "(id BIGINT PRIMARY KEY, course TEXT, marks BIGINT);";
+    public static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
+
+    /**
+     * Reads from source table with timestamps, and then persist the read data to the target
+     * table using the timestamp as input
+     */
+    @CassandraIntegrationTest
+    void testReadingAndWritingTimestamp()
+    {
+        long desiredTimestamp = 1432815430948567L;
+        QualifiedName sourceTableName = uniqueTestTableFullName(TEST_KEYSPACE, "source_tbl");
+        QualifiedName targetTableName = uniqueTestTableFullName(TEST_KEYSPACE, "target_tbl");
+
+        createTestKeyspace(sourceTableName.maybeQuotedKeyspace(), ImmutableMap.of("datacenter1", 1));
+        createTestTable(String.format(CREATE_TABLE_SCHEMA, sourceTableName));
+        createTestTable(String.format(CREATE_TABLE_SCHEMA, targetTableName));
+        populateTable(sourceTableName, DATASET, desiredTimestamp);
+        waitUntilSidecarPicksUpSchemaChange(sourceTableName.maybeQuotedKeyspace());
+        waitUntilSidecarPicksUpSchemaChange(targetTableName.maybeQuotedKeyspace());
+
+        Dataset<Row> data = bulkReaderDataFrame(sourceTableName).option("lastModifiedColumnName", "lm")
+                                                                .load();
+        assertThat(data.count()).isEqualTo(DATASET.size());
+        List<Row> rowList = data.collectAsList().stream()
+                                .sorted(Comparator.comparing(row -> row.getLong(0)))
+                                .collect(Collectors.toList());
+
+        bulkWriterDataFrameWriter(data, targetTableName).option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("lm"))
+                                                        .save();
+        validateWrites(targetTableName, rowList);
+    }
+
+    void validateWrites(QualifiedName tableName, List<Row> sourceData)
+    {
+        // build a set of entries read from Cassandra into a set
+        // the writetime function must read the timestamp specified for the test
+        // to ensure that the persisted timestamp is correct
+        String query = String.format("SELECT id, course, marks, 
WRITETIME(course) FROM %s;", tableName);
+        Set<String> actualEntries = Arrays.stream(sidecarTestContext.cassandraTestContext()
+                                                                    .cluster()
+                                                                    .coordinator(1)
+                                                                    .execute(query, ConsistencyLevel.LOCAL_QUORUM))
+                                          .map((Object[] columns) -> String.format("%s:%s:%s:%s",
+                                                                                   columns[0],
+                                                                                   columns[1],
+                                                                                   columns[2],
+                                                                                   columns[3]))
+                                          .collect(Collectors.toSet());
+
+        // Number of entries in Cassandra must match the original datasource
+        assertThat(actualEntries.size()).isEqualTo(sourceData.size());
+
+        // remove from actual entries to make sure that the data read is the same as the data written
+        sourceData.forEach(row -> {
+            Instant instant = row.getTimestamp(3).toInstant();
+            long timeInMicros = TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+            String key = String.format("%d:%s:%d:%s",
+                                       row.getLong(0),
+                                       row.getString(1),
+                                       row.getLong(2),
+                                       timeInMicros);
+            assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries")
+                                                 .isTrue();
+        });
+
+        // If this fails, it means there was more data in the database than we expected
+        assertThat(actualEntries).as("All entries are expected to be read from database")
+                                 .isEmpty();
+    }
+
+    void populateTable(QualifiedName tableName, List<String> values, long desiredTimestamp)
+    {
+        ICoordinator coordinator = sidecarTestContext.cassandraTestContext()
+                                                     .cluster()
+                                                     .getFirstRunningInstance()
+                                                     .coordinator();
+        for (int i = 0; i < values.size(); i++)
+        {
+            String value = values.get(i);
+            String query = String.format("INSERT INTO %s (id, course, marks) 
VALUES (%d,'%s',%d) USING TIMESTAMP %d",
+                                         tableName, i, "course_" + value, 80 + 
i, desiredTimestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+    }
+}
diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java
index 1349d11..7535030 100644
--- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java
+++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java
@@ -143,7 +143,7 @@ public class SchemaBuilder
     }
 
     // Update schema with the given keyspace, table and udt.
-    // It creates the cooresponding metadata and opens instances for keyspace and table, if needed.
+    // It creates the corresponding metadata and opens instances for keyspace and table, if needed.
     // At the end, it validates that the input keyspace and table both should have metadata exist and instance opened.
     private static Pair<KeyspaceMetadata, TableMetadata> updateSchema(Schema schema,
                                                                       String keyspace,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

