MaxGekk commented on a change in pull request #28137:
URL: https://github.com/apache/spark/pull/28137#discussion_r411897679
##########
File path: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
##########
@@ -1531,15 +1530,63 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
   test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
     withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+      // test reading the existing 2.4 files and 3.0 newly written files together.
+      withTempPath { path =>
+        val path2_4 = getResourceAvroFilePath("before_1582_date_v2_4.avro")
+        val path3_0 = path.getCanonicalPath
+        val dateStr = "1001-01-01"
+        Seq(dateStr).toDF("str").select($"str".cast("date").as("date"))
Review comment:
If you test these things together, I would write the 3.0 files with rebasing turned on as well.
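For example, something like the sketch below, reusing `dateStr`/`path3_0` from the test above. It assumes a write-side counterpart of the read config exists; `SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE` is my guess at the name, not something confirmed in this diff:

```scala
// Write the 3.0 file with rebasing enabled too, so the combined read below
// covers both the 2.4 file and a 3.0 file written in legacy (rebased) form.
// (Config name assumed to mirror the read-side LEGACY_AVRO_REBASE_DATETIME_IN_READ.)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
  Seq(dateStr).toDF("str").select($"str".cast("date").as("date"))
    .write.format("avro").save(path3_0)
}
```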
##########
File path: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
##########
@@ -1531,15 +1530,63 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
   test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
     withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+      // test reading the existing 2.4 files and 3.0 newly written files together.
+      withTempPath { path =>
+        val path2_4 = getResourceAvroFilePath("before_1582_date_v2_4.avro")
+        val path3_0 = path.getCanonicalPath
+        val dateStr = "1001-01-01"
+        Seq(dateStr).toDF("str").select($"str".cast("date").as("date"))
+          .write.format("avro").save(path3_0)
+        checkAnswer(
+          spark.read.format("avro").load(path2_4, path3_0),
+          Seq(
+            Row(java.sql.Date.valueOf(dateStr)),
+            Row(java.sql.Date.valueOf(dateStr))))
+      }
+
+      withTempPath { path =>
+        val path2_4 = getResourceAvroFilePath("before_1582_ts_micros_v2_4.avro")
+        val path3_0 = path.getCanonicalPath
+        val avroSchema =
+          """
+            |{
+            |  "type" : "record",
+            |  "name" : "test_schema",
+            |  "fields" : [
+            |    {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}}
+            |  ]
+            |}""".stripMargin
+        val tsStr = "1001-01-01 01:02:03.123456"
+        Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+          .write.format("avro").option("avroSchema", avroSchema).save(path3_0)
+        checkAnswer(
+          spark.read.format("avro").load(path2_4, path3_0),
+          Seq(
+            Row(java.sql.Timestamp.valueOf(tsStr)),
+            Row(java.sql.Timestamp.valueOf(tsStr))))
+      }
+
+      withTempPath { path =>
Review comment:
It seems it is possible to write a generic function which takes as parameters: `before_1582_ts_millis_v2_4.avro`, `timestamp-millis`, and `1001-01-01 01:02:03.124`.
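A minimal sketch of what I mean (the helper name and parameter shape are only a suggestion; it reuses the suite's `withTempPath`, `getResourceAvroFilePath`, and `checkAnswer` helpers):

```scala
// Hypothetical helper that folds the per-logical-type timestamp tests into one.
def checkReadMixedTsFiles(fileName: String, logicalType: String, tsStr: String): Unit = {
  withTempPath { path =>
    val path2_4 = getResourceAvroFilePath(fileName)
    val path3_0 = path.getCanonicalPath
    val avroSchema =
      s"""
        |{
        |  "type" : "record",
        |  "name" : "test_schema",
        |  "fields" : [
        |    {"name": "ts", "type": {"type": "long", "logicalType": "$logicalType"}}
        |  ]
        |}""".stripMargin
    // Write a 3.0 file with the requested logical type, then read it together
    // with the 2.4 file and expect the same timestamp from both rows.
    Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
      .write.format("avro").option("avroSchema", avroSchema).save(path3_0)
    checkAnswer(
      spark.read.format("avro").load(path2_4, path3_0),
      Seq(Row(java.sql.Timestamp.valueOf(tsStr)), Row(java.sql.Timestamp.valueOf(tsStr))))
  }
}

checkReadMixedTsFiles("before_1582_ts_micros_v2_4.avro", "timestamp-micros",
  "1001-01-01 01:02:03.123456")
checkReadMixedTsFiles("before_1582_ts_millis_v2_4.avro", "timestamp-millis",
  "1001-01-01 01:02:03.124")
```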
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
##########
@@ -31,16 +31,20 @@ import org.apache.spark.sql.types.StructType
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
  * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
+ * @param convertTz the optional time zone to convert to for int96 data
Review comment:
`to for` reads awkwardly; could some of these words be removed?
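For example, one possible rewording (just a suggestion, not wording from the PR):

```scala
 * @param convertTz the optional time zone to which int96 data should be converted
```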
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/package.scala
##########
@@ -54,4 +54,10 @@ package object sql {
    * Note that Hive table property `spark.sql.create.version` also has Spark version.
    */
   private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
+
+  /**
+   * Parquet/Avro file metadata key to indicate that the file was written with legacy datetime
+   * values.
+   */
+  private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDatetime"
Review comment:
Not important, but we use `DateTime` in other places. How about `org.apache.spark.legacyDateTime`?
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
##########
@@ -885,16 +886,55 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     Seq(false, true).foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
         withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") {
-          checkAnswer(
-            readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"),
-            Row(java.sql.Date.valueOf("1001-01-01")))
-          checkAnswer(readResourceParquetFile(
-            "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"),
-            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-          checkAnswer(readResourceParquetFile(
-            "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"),
-            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123")))
+          // test reading the existing 2.4 files and 3.0 newly written files together.
+          withTempPath { path =>
+            val path2_4 = getResourceParquetFilePath(
+              "test-data/before_1582_date_v2_4.snappy.parquet")
+            val path3_0 = path.getCanonicalPath
+            val dateStr = "1001-01-01"
+            Seq(dateStr).toDF("str").select($"str".cast("date").as("date"))
+              .write.parquet(path3_0)
+            checkAnswer(
+              spark.read.format("parquet").load(path2_4, path3_0),
+              Seq(
+                Row(java.sql.Date.valueOf(dateStr)),
+                Row(java.sql.Date.valueOf(dateStr))))
+          }
+
+          withTempPath { path =>
+            withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") {
+              val path2_4 = getResourceParquetFilePath(
+                "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet")
+              val path3_0 = path.getCanonicalPath
+              val tsStr = "1001-01-01 01:02:03.123456"
+              Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+                .write.parquet(path3_0)
+              checkAnswer(
+                spark.read.format("parquet").load(path2_4, path3_0),
+                Seq(
+                  Row(java.sql.Timestamp.valueOf(tsStr)),
+                  Row(java.sql.Timestamp.valueOf(tsStr))))
+            }
+          }
+
+          withTempPath { path =>
Review comment:
The same comment as for Avro: I think we could fold these tests into a generic helper too.
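E.g. a sketch of a Parquet-side helper analogous to the Avro one (the name and parameter shape are only a suggestion):

```scala
// Hypothetical helper: parameterizes the 2.4 resource file, the Parquet output
// timestamp type used for the 3.0 write, and the expected timestamp string.
def checkReadMixedTsFiles(fileName: String, outputTsType: String, tsStr: String): Unit = {
  withTempPath { path =>
    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTsType) {
      val path2_4 = getResourceParquetFilePath(s"test-data/$fileName")
      val path3_0 = path.getCanonicalPath
      Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
        .write.parquet(path3_0)
      checkAnswer(
        spark.read.format("parquet").load(path2_4, path3_0),
        Seq(Row(java.sql.Timestamp.valueOf(tsStr)), Row(java.sql.Timestamp.valueOf(tsStr))))
    }
  }
}

checkReadMixedTsFiles("before_1582_timestamp_micros_v2_4.snappy.parquet",
  "TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456")
checkReadMixedTsFiles("before_1582_timestamp_millis_v2_4.snappy.parquet",
  "TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123")
```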