Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19769#discussion_r151634730
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
---
@@ -87,4 +95,107 @@ class ParquetInteroperabilitySuite extends
ParquetCompatibilityTest with SharedS
Row(Seq(2, 3))))
}
}
+
+ test("parquet timestamp conversion") {
+ // Make a table with one parquet file written by impala, and one
parquet file written by spark.
+ // We should only adjust the timestamps in the impala file, and only
if the conf is set
+ val impalaFile = "test-data/impala_timestamp.parq"
+
+ // here are the timestamps in the impala file, as they were saved by
impala
+ val impalaFileData =
+ Seq(
+ "2001-01-01 01:01:01",
+ "2002-02-02 02:02:02",
+ "2003-03-03 03:03:03"
+ ).map { s => java.sql.Timestamp.valueOf(s) }
+ val impalaPath =
Thread.currentThread().getContextClassLoader.getResource(impalaFile)
+ .toURI.getPath
+ withTempPath { tableDir =>
+ val ts = Seq(
+ "2004-04-04 04:04:04",
+ "2005-05-05 05:05:05",
+ "2006-06-06 06:06:06"
+ ).map { s => java.sql.Timestamp.valueOf(s) }
+ import testImplicits._
+ // match the column names of the file from impala
+ val df =
spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts")
+ df.write.parquet(tableDir.getAbsolutePath)
+ FileUtils.copyFile(new File(impalaPath), new File(tableDir,
"part-00001.parq"))
+
+ Seq(false, true).foreach { int96TimestampConversion =>
+ Seq(false, true).foreach { vectorized =>
+ withSQLConf(
+ (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key,
int96TimestampConversion.toString()),
+ (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key,
vectorized.toString())
+ ) {
+ val readBack =
spark.read.parquet(tableDir.getAbsolutePath).collect()
+ assert(readBack.size === 6)
+ // if we apply the conversion, we'll get the "right" values,
as saved by impala in the
+ // original file. Otherwise, they're off by the local
timezone offset, set to
+ // America/Los_Angeles in tests
+ val impalaExpectations = if (int96TimestampConversion) {
+ impalaFileData
+ } else {
+ impalaFileData.map { ts =>
+ DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
+ DateTimeUtils.fromJavaTimestamp(ts),
+ DateTimeUtils.TimeZoneUTC,
+ DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone)))
+ }
+ }
+ val fullExpectations = (ts ++
impalaExpectations).map(_.toString).sorted.toArray
+ val actual = readBack.map(_.getTimestamp(0).toString).sorted
+ withClue(s"applyConversion = $int96TimestampConversion;
vectorized = $vectorized") {
+ assert(fullExpectations === actual)
+
+ // Now test that the behavior is still correct even with a
filter which could get
+ // pushed down into parquet. We don't need extra handling
for pushed down
+ // predicates because (a) in ParquetFilters, we ignore
TimestampType and (b) parquet
+ // does not read statistics from int96 fields, as they are
unsigned. See
+ // scalastyle:off line.size.limit
+ //
https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
+ //
https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
+ // scalastyle:on line.size.limit
+ //
+ // Just to be defensive in case anything ever changes in
parquet, this test checks
+ // the assumption on column stats, and also the end-to-end
behavior.
+
+ val hadoopConf = sparkContext.hadoopConfiguration
+ val fs = FileSystem.get(hadoopConf)
+ val parts = fs.listStatus(new
Path(tableDir.getAbsolutePath), new PathFilter {
+ override def accept(path: Path): Boolean =
!path.getName.startsWith("_")
+ })
+ // grab the meta data from the parquet file. The next
section of asserts just make
+ // sure the test is configured correctly.
+ assert(parts.size == 2)
+ parts.map { part =>
--- End diff --
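`map` -> `foreach`: judging by the comment just above, this closure is run only for its assertions, so the collection that `map` builds appears to be discarded. `foreach` states that side-effect-only intent directly.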
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]