This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new e82fcea CASSANDRA-19223: Column type mapping error for timestamp type during bulk writes e82fcea is described below commit e82fceaecfe5ea04ac3ddff92be5a6a41456333c Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Thu Dec 21 11:42:14 2023 -0800 CASSANDRA-19223: Column type mapping error for timestamp type during bulk writes Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19223 --- CHANGES.txt | 1 + .../spark/bulkwriter/SqlToCqlTypeConverter.java | 82 ++++++++++-- .../cassandra/spark/bulkwriter/TableSchema.java | 4 +- .../cassandra/spark/data/CassandraDataLayer.java | 2 +- .../testing/CassandraSidecarTestContext.java | 18 ++- .../SharedClusterSparkIntegrationTestBase.java | 66 ++-------- .../analytics/SparkIntegrationTestBase.java | 103 +++++++++++++++ .../apache/cassandra/analytics/SparkTestUtils.java | 130 +++++++++++++++++++ .../analytics/TimestampIntegrationTest.java | 139 +++++++++++++++++++++ .../cassandra/spark/reader/SchemaBuilder.java | 2 +- 10 files changed, 475 insertions(+), 72 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c66bea0..d04b0ff 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Column type mapping error for timestamp type during bulk writes (CASSANDRA-19223) * Speed up integration tests (CASSANDRA-19251) * Make bulk writer resilient to cluster resize events (CASSANDRA-18852) * Remove write option VALIDATE_SSTABLES to enforce validation (CASSANDRA-19199) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java index 5749a07..a19ca21 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java @@ -25,6 +25,7 @@ import java.math.BigInteger; import java.net.InetAddress; import java.nio.ByteBuffer; import java.sql.Timestamp; +import java.time.Instant; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -85,6 +86,8 @@ public final class SqlToCqlTypeConverter implements Serializable private static final BigDecimalConverter BIG_DECIMAL_CONVERTER = new BigDecimalConverter(); private static final IntegerConverter INTEGER_CONVERTER = new IntegerConverter(); private static final TimestampConverter TIMESTAMP_CONVERTER = new TimestampConverter(); + private static final MicroSecondsTimestampConverter MICROSECONDS_TIMESTAMP_CONVERTER = + new MicroSecondsTimestampConverter(); private static final TimeConverter TIME_CONVERTER = new TimeConverter(); private static final UUIDConverter UUID_CONVERTER = new UUIDConverter(); private static final BigIntegerConverter BIG_INTEGER_CONVERTER = new BigIntegerConverter(); @@ -171,14 +174,27 @@ public final class SqlToCqlTypeConverter implements Serializable } } - public static Converter<?> getIntegerConverter() + public static Converter<?> integerConverter() { return INTEGER_CONVERTER; } - public static Converter<?> getLongConverter() + public static Converter<?> microsecondsTimestampConverter() { - return LONG_CONVERTER; + return MICROSECONDS_TIMESTAMP_CONVERTER; + } + + static boolean canConvertToLong(Object object) + { + return object instanceof Long + || object instanceof Integer + || object instanceof Short + || object instanceof Byte; + } + + static long convertToLong(Object object) + { + return ((Number) object).longValue(); } private static Converter<?> determineCustomConvert(CqlField.CqlCustom customType) @@ -233,12 +249,9 @@ public final class SqlToCqlTypeConverter implements Serializable @Override public Long convertInternal(Object object) { - if (object instanceof Long - || object 
instanceof Integer - || object instanceof Short - || object instanceof Byte) + if (canConvertToLong(object)) { - return ((Number) object).longValue(); + return convertToLong(object); } else { @@ -358,8 +371,8 @@ public final class SqlToCqlTypeConverter implements Serializable public Integer convertInternal(Object object) { if (object instanceof Integer - || object instanceof Short - || object instanceof Byte) + || object instanceof Short + || object instanceof Byte) { return ((Number) object).intValue(); } @@ -414,8 +427,57 @@ public final class SqlToCqlTypeConverter implements Serializable } } + @SuppressWarnings("serial") + static class MicroSecondsTimestampConverter extends Converter<Long> + { + /** + * Returns the time since epoch (January 1, 1970) in microseconds, as specified by + * <a + * href="https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlInsert.html#cqlInsert__timestamp-value"> + * the documentation</a>. + * + * @param object the input object to convert + * @return the time since epoch in microseconds + * @throws RuntimeException when the object cannot be converted to timestamp + */ + @Override + public Long convertInternal(Object object) throws RuntimeException + { + if (object instanceof Date) + { + Instant instant = ((Date) object).toInstant(); + return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); + } + else if (canConvertToLong(object)) + { + return convertToLong(object); + } + else + { + throw new RuntimeException("Unsupported conversion for TIMESTAMP from " + object.getClass().getTypeName()); + } + } + + @Override + public String toString() + { + return "Timestamp"; + } + } + static class TimestampConverter extends NullableConverter<Date> { + /** + * Returns a Date representing the number of milliseconds since the standard base time known as the epoch + * (January 1 1970 at 00:00:00 GMT), as specified by <a + * 
href="https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/timestamp_type_r.html">the + * documentation</a>. + * + * @param object the input object to convert + * @return the Date + * @throws RuntimeException when the object cannot be converted to timestamp + */ @Override public Date convertInternal(Object object) throws RuntimeException { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java index 72358eb..8170440 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java @@ -136,11 +136,11 @@ public class TableSchema implements Serializable .map(fieldName -> { if (fieldName.equals(ttlOption.columnName())) { - return SqlToCqlTypeConverter.getIntegerConverter(); + return SqlToCqlTypeConverter.integerConverter(); } if (fieldName.equals(timestampOption.columnName())) { - return SqlToCqlTypeConverter.getLongConverter(); + return SqlToCqlTypeConverter.microsecondsTimestampConverter(); } CqlField.CqlType cqlType = tableInfo.getColumnType(fieldName); return SqlToCqlTypeConverter.getConverter(cqlType); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index d761cfc..675cd1f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -215,7 +215,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} table={} dc={}", snapshotName, keyspace, table, datacenter); - // Load 
cluster config from Discovery + // Load cluster config from options clusterConfig = initializeClusterConfig(options); initInstanceMap(); diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java index 15f8d12..02e4ea7 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.testing; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -225,12 +226,21 @@ public class CassandraSidecarTestContext implements AutoCloseable for (int i = 0; i < configs.size(); i++) { IInstanceConfig config = configs.get(i); - String hostName = JMXUtil.getJmxHost(config); + String ipAddress = JMXUtil.getJmxHost(config); + String hostName; + try + { + hostName = dnsResolver.reverseResolve(ipAddress); + } + catch (UnknownHostException e) + { + hostName = ipAddress; + } int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042); // The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized. // In these cases, we want to wait longer than the default retry/delay settings to connect. 
JmxClient jmxClient = JmxClient.builder() - .host(hostName) + .host(ipAddress) .port(config.jmxPort()) .connectionMaxRetries(20) .connectionRetryDelayMillis(1000L) @@ -252,11 +262,11 @@ public class CassandraSidecarTestContext implements AutoCloseable jmxClient, new DriverUtils(), "1.0-TEST", - hostName, + ipAddress, nativeTransportPort); metadata.add(InstanceMetadataImpl.builder() .id(i + 1) - .host(config.broadcastAddress().getAddress().getHostAddress()) + .host(hostName) .port(nativeTransportPort) .dataDirs(Arrays.asList(dataDirectories)) .stagingDir(stagingDir) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java index 4712b2c..699f705 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java @@ -20,7 +20,6 @@ package org.apache.cassandra.analytics; import java.net.UnknownHostException; -import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -30,19 +29,19 @@ import org.junit.jupiter.api.extension.ExtendWith; import io.vertx.junit5.VertxExtension; import org.apache.cassandra.distributed.shared.JMXUtil; -import org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase; import org.apache.cassandra.sidecar.testing.QualifiedName; -import org.apache.cassandra.spark.KryoRegister; -import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; +import org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase; import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.DataFrameWriter; import 
org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; +import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkReaderDataFrame; +import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkWriterDataFrameWriter; +import static org.apache.cassandra.analytics.SparkTestUtils.defaultSparkConf; + /** * Extends functionality from {@link SharedClusterIntegrationTestBase} and provides additional functionality for running * Spark integration tests. @@ -63,32 +62,11 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste */ protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName) { - SparkConf sparkConf = getOrCreateSparkConf(); - SparkSession spark = getOrCreateSparkSession(); - SQLContext sql = spark.sqlContext(); - SparkContext sc = spark.sparkContext(); - - int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1); - int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", - sparkConf.getInt("spark.executor.instances", 1)); - int numCores = coresPerExecutor * numExecutors; - - return sql.read() - .format("org.apache.cassandra.spark.sparksql.CassandraDataSource") - .option("sidecar_instances", sidecarInstancesOption()) - .option("keyspace", tableName.keyspace()) // unquoted - .option("table", tableName.table()) // unquoted - .option("DC", "datacenter1") - .option("snapshotName", UUID.randomUUID().toString()) - .option("createSnapshot", "true") - // Shutdown hooks are called after the job ends, and in the case of integration tests - // the sidecar is already shut down before this. Since the cluster will be torn - // down anyway, the integration job skips clearing snapshots. 
- .option("clearSnapshot", "false") - .option("defaultParallelism", sc.defaultParallelism()) - .option("numCores", numCores) - .option("sizing", "default") - .option("sidecar_port", server.actualPort()); + return defaultBulkReaderDataFrame(getOrCreateSparkConf(), + getOrCreateSparkSession(), + tableName, + sidecarInstancesOption(), + server.actualPort()); } /** @@ -101,16 +79,7 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste */ protected DataFrameWriter<Row> bulkWriterDataFrameWriter(Dataset<Row> df, QualifiedName tableName) { - return df.write() - .format("org.apache.cassandra.spark.sparksql.CassandraDataSink") - .option("sidecar_instances", sidecarInstancesOption()) - .option("keyspace", tableName.keyspace()) - .option("table", tableName.table()) - .option("local_dc", "datacenter1") - .option("bulk_writer_cl", "LOCAL_QUORUM") - .option("number_splits", "-1") - .option("sidecar_port", server.actualPort()) - .mode("append"); + return defaultBulkWriterDataFrameWriter(df, tableName, sidecarInstancesOption(), server.actualPort()); } /** @@ -137,18 +106,7 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste { if (sparkConf == null) { - sparkConf = new SparkConf() - .setAppName("Integration test Spark Cassandra Bulk Analytics Job") - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - // Spark is not case-sensitive by default, but we want to make it case-sensitive for - // the quoted identifiers tests where we test mixed case - .set("spark.sql.caseSensitive", "True") - .set("spark.master", "local[8,4]") - .set("spark.cassandra_analytics.sidecar.request.retries", "5") - .set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", "500") - .set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds", "500"); - BulkSparkConf.setupSparkConf(sparkConf, true); - KryoRegister.setup(sparkConf); + sparkConf = defaultSparkConf(); } return sparkConf; } diff 
--git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java new file mode 100644 index 0000000..40f57e2 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.analytics; + +import java.util.stream.Collectors; + +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkReaderDataFrame; +import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkWriterDataFrameWriter; +import static org.apache.cassandra.analytics.SparkTestUtils.defaultSparkConf; + +/** + * Extends the {@link org.apache.cassandra.sidecar.testing.IntegrationTestBase} with Spark functionality for + * test classes. + */ +public class SparkIntegrationTestBase extends IntegrationTestBase +{ + protected SparkConf sparkConf; + protected SparkSession sparkSession; + + /** + * A preconfigured {@link DataFrameReader} with pre-populated required options that can be overridden + * with additional options for every specific test. + * + * @param tableName the qualified name for the Cassandra table + * @return a {@link DataFrameReader} for Cassandra bulk reads + */ + protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName) + { + String sidecarInstances = sidecarTestContext.instancesConfig() + .instances() + .stream().map(f -> f.host()) + .collect(Collectors.joining(",")); + return defaultBulkReaderDataFrame(getOrCreateSparkConf(), + getOrCreateSparkSession(), + tableName, + sidecarInstances, + server.actualPort()); + } + + /** + * A preconfigured {@link DataFrameWriter} with pre-populated required options that can be overridden + * with additional options for every specific test. 
+ * + * @param df the source dataframe to write + * @param tableName the qualified name for the Cassandra table + * @return a {@link DataFrameWriter} for Cassandra bulk writes + */ + protected DataFrameWriter<Row> bulkWriterDataFrameWriter(Dataset<Row> df, QualifiedName tableName) + { + String sidecarInstances = sidecarTestContext.instancesConfig() + .instances() + .stream().map(f -> f.host()) + .collect(Collectors.joining(",")); + return defaultBulkWriterDataFrameWriter(df, tableName, sidecarInstances, server.actualPort()); + } + + protected SparkConf getOrCreateSparkConf() + { + if (sparkConf == null) + { + sparkConf = defaultSparkConf(); + } + return sparkConf; + } + + protected SparkSession getOrCreateSparkSession() + { + if (sparkSession == null) + { + sparkSession = SparkSession + .builder() + .config(getOrCreateSparkConf()) + .getOrCreate(); + } + return sparkSession; + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java new file mode 100644 index 0000000..8e719f0 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.UUID; + +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.KryoRegister; +import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; + +/** + * Helper methods for Spark tests + */ +public final class SparkTestUtils +{ + private SparkTestUtils() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + /** + * Returns a {@link DataFrameReader} with default options for performing a bulk read test, including + * required parameters. 
+ * + * @param sparkConf the spark configuration to use + * @param spark the spark session to use + * @param tableName the qualified name of the table + * @param sidecarInstancesOption the comma-separated list of sidecar instances + * @param sidecarPort the sidecar port + * @return a {@link DataFrameReader} with default options for performing a bulk read test + */ + public static DataFrameReader defaultBulkReaderDataFrame(SparkConf sparkConf, + SparkSession spark, + QualifiedName tableName, + String sidecarInstancesOption, + int sidecarPort) + { + SQLContext sql = spark.sqlContext(); + SparkContext sc = spark.sparkContext(); + + int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1); + int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", sparkConf.getInt("spark.executor.instances", 1)); + int numCores = coresPerExecutor * numExecutors; + + return sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource") + .option("sidecar_instances", sidecarInstancesOption) + .option("keyspace", tableName.keyspace()) // unquoted + .option("table", tableName.table()) // unquoted + .option("DC", "datacenter1") + .option("snapshotName", UUID.randomUUID().toString()) + .option("createSnapshot", "true") + // Shutdown hooks are called after the job ends, and in the case of integration tests + // the sidecar is already shut down before this. Since the cluster will be torn + // down anyway, the integration job skips clearing snapshots. + .option("clearSnapshot", "false") + .option("defaultParallelism", sc.defaultParallelism()) + .option("numCores", numCores) + .option("sizing", "default") + .option("sidecar_port", sidecarPort); + } + + /** + * Returns a {@link DataFrameWriter<Row>} with default options for performing a bulk write test, including + * required parameters. 
+ * + * @param df the source data frame + * @param tableName the qualified name of the table + * @param sidecarInstancesOption the comma-separated list of sidecar instances + * @param sidecarPort the sidecar port + * @return a {@link DataFrameWriter<Row>} with default options for performing a bulk write test + */ + public static DataFrameWriter<Row> defaultBulkWriterDataFrameWriter(Dataset<Row> df, + QualifiedName tableName, + String sidecarInstancesOption, + int sidecarPort) + { + return df.write() + .format("org.apache.cassandra.spark.sparksql.CassandraDataSink") + .option("sidecar_instances", sidecarInstancesOption) + .option("keyspace", tableName.keyspace()) + .option("table", tableName.table()) + .option("local_dc", "datacenter1") + .option("bulk_writer_cl", "LOCAL_QUORUM") + .option("number_splits", "-1") + .option("sidecar_port", sidecarPort) + .mode("append"); + } + + public static SparkConf defaultSparkConf() + { + SparkConf sparkConf = new SparkConf() + .setAppName("Integration test Spark Cassandra Bulk Analytics Job") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + // Spark is not case-sensitive by default, but we want to make it case-sensitive for + // the quoted identifiers tests where we test mixed case + .set("spark.sql.caseSensitive", "True") + .set("spark.master", "local[8,4]") + .set("spark.cassandra_analytics.sidecar.request.retries", "5") + .set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", "500") + .set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds", "500"); + BulkSparkConf.setupSparkConf(sparkConf, true); + KryoRegister.setup(sparkConf); + return sparkConf; + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TimestampIntegrationTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TimestampIntegrationTest.java new file mode 100644 index 0000000..57aeb9d --- /dev/null +++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TimestampIntegrationTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.junit5.VertxExtension; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.bulkwriter.TimestampOption; +import org.apache.cassandra.spark.bulkwriter.WriterOptions; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + 
+/** + * Integration test for the Cassandra timestamps + */ +@ExtendWith(VertxExtension.class) +class TimestampIntegrationTest extends SparkIntegrationTestBase +{ + public static final String CREATE_TABLE_SCHEMA = "CREATE TABLE IF NOT EXISTS %s " + + "(id BIGINT PRIMARY KEY, course TEXT, marks BIGINT);"; + public static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g"); + + /** + * Reads from source table with timestamps, and then persists the read data to the target + * table using the timestamp as input + */ + @CassandraIntegrationTest + void testReadingAndWritingTimestamp() + { + long desiredTimestamp = 1432815430948567L; + QualifiedName sourceTableName = uniqueTestTableFullName(TEST_KEYSPACE, "source_tbl"); + QualifiedName targetTableName = uniqueTestTableFullName(TEST_KEYSPACE, "target_tbl"); + + createTestKeyspace(sourceTableName.maybeQuotedKeyspace(), ImmutableMap.of("datacenter1", 1)); + createTestTable(String.format(CREATE_TABLE_SCHEMA, sourceTableName)); + createTestTable(String.format(CREATE_TABLE_SCHEMA, targetTableName)); + populateTable(sourceTableName, DATASET, desiredTimestamp); + waitUntilSidecarPicksUpSchemaChange(sourceTableName.maybeQuotedKeyspace()); + waitUntilSidecarPicksUpSchemaChange(targetTableName.maybeQuotedKeyspace()); + + Dataset<Row> data = bulkReaderDataFrame(sourceTableName).option("lastModifiedColumnName", "lm") + .load(); + assertThat(data.count()).isEqualTo(DATASET.size()); + List<Row> rowList = data.collectAsList().stream() + .sorted(Comparator.comparing(row -> row.getLong(0))) + .collect(Collectors.toList()); + + bulkWriterDataFrameWriter(data, targetTableName).option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("lm")) + .save(); + validateWrites(targetTableName, rowList); + } + + void validateWrites(QualifiedName tableName, List<Row> sourceData) + { + // build a set of the entries read from Cassandra + // the writetime function must read the timestamp specified for the test + // to
ensure that the persisted timestamp is correct + String query = String.format("SELECT id, course, marks, WRITETIME(course) FROM %s;", tableName); + Set<String> actualEntries = Arrays.stream(sidecarTestContext.cassandraTestContext() + .cluster() + .coordinator(1) + .execute(query, ConsistencyLevel.LOCAL_QUORUM)) + .map((Object[] columns) -> String.format("%s:%s:%s:%s", + columns[0], + columns[1], + columns[2], + columns[3])) + .collect(Collectors.toSet()); + + // Number of entries in Cassandra must match the original datasource + assertThat(actualEntries.size()).isEqualTo(sourceData.size()); + + // remove from actual entries to make sure that the data read is the same as the data written + sourceData.forEach(row -> { + Instant instant = row.getTimestamp(3).toInstant(); + long timeInMicros = TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); + String key = String.format("%d:%s:%d:%s", + row.getLong(0), + row.getString(1), + row.getLong(2), + timeInMicros); + assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries") + .isTrue(); + }); + + // If this fails, it means there was more data in the database than we expected + assertThat(actualEntries).as("All entries are expected to be read from database") + .isEmpty(); + } + + void populateTable(QualifiedName tableName, List<String> values, long desiredTimestamp) + { + ICoordinator coordinator = sidecarTestContext.cassandraTestContext() + .cluster() + .getFirstRunningInstance() + .coordinator(); + for (int i = 0; i < values.size(); i++) + { + String value = values.get(i); + String query = String.format("INSERT INTO %s (id, course, marks) VALUES (%d,'%s',%d) USING TIMESTAMP %d", + tableName, i, "course_" + value, 80 + i, desiredTimestamp); + coordinator.execute(query, ConsistencyLevel.ALL); + } + } +} diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java index 1349d11..7535030 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java @@ -143,7 +143,7 @@ public class SchemaBuilder } // Update schema with the given keyspace, table and udt. - // It creates the cooresponding metadata and opens instances for keyspace and table, if needed. + // It creates the corresponding metadata and opens instances for keyspace and table, if needed. // At the end, it validates that the input keyspace and table both should have metadata exist and instance opened. private static Pair<KeyspaceMetadata, TableMetadata> updateSchema(Schema schema, String keyspace, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org