dongjoon-hyun commented on a change in pull request #24417: 
[SPARK-27522][SQL][TEST] Test migration from INT96 to TIMESTAMP_MICROS for 
timestamps in parquet
URL: https://github.com/apache/spark/pull/24417#discussion_r277177146
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 ##########
 @@ -881,6 +882,48 @@ class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedSQLContext
     }
   }
 
+  test("Migration from INT96 to TIMESTAMP_MICROS timestamp type") {
+    def testMigration(fromTsType: String, toTsType: String): Unit = {
+      def checkAppend(write: DataFrameWriter[_] => Unit, readback: => 
DataFrame): Unit = {
+        def data(start: Int, end: Int): Seq[Row] = (start to end).map { i =>
+          val ts = new java.sql.Timestamp(TimeUnit.SECONDS.toMillis(i))
+          ts.setNanos(123456000)
+          Row(ts)
+        }
+        val schema = new StructType().add("time", TimestampType)
+        val df1 = spark.createDataFrame(sparkContext.parallelize(data(0, 1)), 
schema)
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fromTsType) {
+          write(df1.write)
+        }
+        val df2 = spark.createDataFrame(sparkContext.parallelize(data(2, 10)), 
schema)
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> toTsType) {
+          write(df2.write.mode(SaveMode.Append))
+        }
+        Seq("true", "false").foreach { vectorized =>
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
vectorized) {
+            checkAnswer(readback, df1.unionAll(df2))
+          }
+        }
+      }
+
+      Seq(false, true).foreach { mergeSchema =>
+        withTempPath { file =>
+          checkAppend(_.parquet(file.getCanonicalPath),
+            spark.read.option("mergeSchema", 
mergeSchema).parquet(file.getCanonicalPath))
+        }
+
+        withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> 
mergeSchema.toString) {
+          val tableName = "parquet_timestamp_migration"
+          withTable(tableName) {
+            checkAppend(_.saveAsTable(tableName), spark.table(tableName))
+          }
+        }
+      }
+    }
+
+    testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
+    testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
 
 Review comment:
   Hi, @MaxGekk .
   Should we also test `TIMESTAMP_MILLIS` here for completeness, since there are 
three output timestamp types?
   ```scala
     object ParquetOutputTimestampType extends Enumeration {
       val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
     }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to