GitHub user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19769#discussion_r151635947
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala ---
@@ -87,4 +96,113 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
           Row(Seq(2, 3))))
       }
     }
+
+  val ImpalaFile = "test-data/impala_timestamp.parq"
+  test("parquet timestamp conversion") {
+    // Make a table with one parquet file written by impala, and one parquet file written
+    // by spark. We should only adjust the timestamps in the impala file, and only if the
+    // conf is set.
+
+    // Here are the timestamps in the impala file, as they were saved by impala.
+    val impalaFileData =
+      Seq(
+        "2001-01-01 01:01:01",
+        "2002-02-02 02:02:02",
+        "2003-03-03 03:03:03"
+      ).map { s => java.sql.Timestamp.valueOf(s) }
+    val impalaFile = Thread.currentThread().getContextClassLoader.getResource(ImpalaFile)
+      .toURI.getPath
+    withTempPath { tableDir =>
+      val ts = Seq(
+        "2004-04-04 04:04:04",
+        "2005-05-05 05:05:05",
+        "2006-06-06 06:06:06"
+      ).map { s => java.sql.Timestamp.valueOf(s) }
+      val s = spark
+      import s.implicits._
+      // match the column names of the file from impala
+      val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts")
+      val schema = df.schema
+      df.write.parquet(tableDir.getAbsolutePath)
+      FileUtils.copyFile(new File(impalaFile), new File(tableDir, "part-00001.parq"))
+
+      Seq(false, true).foreach { applyConversion =>
+        Seq(false, true).foreach { vectorized =>
+          withSQLConf(
+            (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, applyConversion.toString()),
+            (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
+          ) {
+            val read = spark.read.parquet(tableDir.getAbsolutePath).collect()
+            assert(read.size === 6)
+            // If we apply the conversion, we'll get the "right" values, as saved by impala
+            // in the original file. Otherwise, they're off by the local timezone offset,
+            // set to America/Los_Angeles in tests.
+            val impalaExpectations = if (applyConversion) {
+              impalaFileData
+            } else {
+              impalaFileData.map { ts =>
+                DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
+                  DateTimeUtils.fromJavaTimestamp(ts),
+                  TimeZone.getTimeZone("UTC"),
+                  TimeZone.getDefault()))
+              }
+            }
+            val fullExpectations = (ts ++ impalaExpectations).map {
+              _.toString()
+            }.sorted.toArray
+            val actual = read.map {
+              _.getTimestamp(0).toString()
+            }.sorted
+            withClue(s"applyConversion = $applyConversion; vectorized = $vectorized") {
+              assert(fullExpectations === actual)
+
+              // Now test that the behavior is still correct even with a filter which could
+              // get pushed down into parquet. We don't need extra handling for pushed-down
+              // predicates because (a) in ParquetFilters, we ignore TimestampType and
+              // (b) parquet does not read statistics from int96 fields, as they are
+              // unsigned. See
+              // scalastyle:off line.size.limit
+              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
+              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
+              // scalastyle:on line.size.limit
+              //
+              // Just to be defensive in case anything ever changes in parquet, this test
+              // checks the assumption on column stats, and also the end-to-end behavior.
+
+              val hadoopConf = sparkContext.hadoopConfiguration
+              val fs = FileSystem.get(hadoopConf)
+              val parts = fs.listStatus(new Path(tableDir.getAbsolutePath), new PathFilter {
+                override def accept(path: Path): Boolean = !path.getName.startsWith("_")
+              })
+              // Grab the metadata from the parquet file. The next section of asserts just
+              // makes sure the test is configured correctly.
+              assert(parts.size == 2)
+              parts.map { part =>
+                val oneFooter =
--- End diff ---
I just found it shorter. I am fine.
---
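
A note on the timezone arithmetic in the diff above: with conversion disabled, the test expects each Impala timestamp shifted by the session timezone offset. A minimal standalone sketch of that adjustment, reusing the same DateTimeUtils helpers the test calls (the sample value is illustrative, not taken from the PR):

    import java.util.TimeZone
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Reinterpret a UTC-normalized int96 value in the JVM-default zone,
    // mirroring how the test builds its "no conversion" expectations.
    def shiftToDefaultZone(ts: java.sql.Timestamp): java.sql.Timestamp = {
      DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
        DateTimeUtils.fromJavaTimestamp(ts),
        TimeZone.getTimeZone("UTC"),
        TimeZone.getDefault()))
    }

    // With the test's America/Los_Angeles default zone (UTC-8 in January),
    // 2001-01-01 01:01:01 as saved by impala reads back as 2000-12-31 17:01:01.
    shiftToDefaultZone(java.sql.Timestamp.valueOf("2001-01-01 01:01:01"))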
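
On the pushdown comment in the diff: a timestamp predicate never prunes row groups here, both because ParquetFilters skips TimestampType and because parquet-mr does not read int96 statistics (see the two links quoted above). A hedged sketch of the end-to-end shape such a check could take, assuming it runs inside the test's scope (tableDir and the implicits import), with an illustrative cutoff value:

    // A filter that, for a pushdown-eligible type, could prune row groups.
    // For int96 timestamps no statistics exist, so every row group is read
    // and Spark evaluates the predicate itself; results stay consistent
    // with the unfiltered read.
    val cutoff = java.sql.Timestamp.valueOf("2004-01-01 00:00:00")
    val filtered = spark.read.parquet(tableDir.getAbsolutePath)
      .where($"ts" < cutoff)
      .collect()
    // Only the three impala rows precede the cutoff, with or without
    // conversion, since the timezone shift is only a matter of hours.
    assert(filtered.length === 3)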
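
The diff is truncated at `val oneFooter =`. Purely as illustration of the footer inspection it leads into (not the PR's actual continuation), reading one file's metadata with parquet-mr could look like this, assuming ParquetFileReader.readFooter and the surrounding test scope (hadoopConf, parts):

    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    // Read a single file's footer without any row-group filtering.
    val oneFooter = ParquetFileReader.readFooter(
      hadoopConf, parts.head.getPath, ParquetMetadataConverter.NO_FILTER)
    // Per-column chunk metadata lives on each row group; per the links in
    // the diff, parquet-mr skips min/max statistics for int96 columns,
    // which is what makes ignoring pushdown safe.
    val columnsMeta = oneFooter.getBlocks().get(0).getColumns()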