This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 57849a5  [SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet
57849a5 is described below

commit 57849a54da39c337fc7209404815e8980e83e03b
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Thu Jul 15 22:21:57 2021 +0300

    [SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to propagate the SQL config
    `spark.sql.parquet.datetimeRebaseModeInRead` and/or the Parquet option
    `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses
    this setting when converting date/timestamp instances from datasource
    filters into the values pushed via `FilterApi` to the `parquet-column` lib.
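    
    A condensed sketch of the propagation, taken from the `ParquetFileFormat`
    change in the diff below (`datetimeRebaseModeInRead` holds the value of the
    config/option):
    ```scala
    // Resolve the per-file rebase mode from the footer metadata and the read
    // setting, then hand it to ParquetFilters so that filter values are
    // converted in the same calendar as the stored data.
    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
      footerFileMetaData.getKeyValueMetaData.get,
      datetimeRebaseModeInRead)
    val parquetFilters = new ParquetFilters(
      parquetSchema, pushDownDate, pushDownTimestamp, pushDownDecimal,
      pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive,
      datetimeRebaseMode)
    ```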
    
    Before the changes, date/timestamp filter values expressed as
    days/microseconds/milliseconds were interpreted as offsets in the Proleptic
    Gregorian calendar and pushed to the parquet library as is. That works fine
    if the date/timestamp values in the parquet files were saved in the
    `CORRECTED` mode, but in the `LEGACY` mode the filter values could fail to
    match the actual stored values.
    
    After the changes, the date/timestamp values of filters pushed down to the
    parquet lib, such as `FilterApi.eq(col1, -719162)`, are rebased according
    to the rebase settings. For example, if the rebase mode is `CORRECTED`,
    **-719162** is pushed down as is, but if the rebase mode is `LEGACY`, the
    number of days is rebased to **-719164**. For more context, the PR
    description https://github.com/apache/spark/pull/28067 shows the diffs
    between the two calendars.
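    
    A minimal sketch of that rebase, using the helpers that `ParquetFilters`
    imports in the diff below (**-719162** is `0001-01-01` as days since the
    epoch `1970-01-01`):
    ```scala
    import java.time.LocalDate
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianDays

    // 0001-01-01 as days since the epoch in the Proleptic Gregorian calendar
    val gregorianDays = DateTimeUtils.localDateToDays(LocalDate.of(1, 1, 1)) // -719162
    // The same local date as a day count in the legacy hybrid Julian calendar
    val julianDays = rebaseGregorianToJulianDays(gregorianDays)              // -719164
    ```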
    
    ### Why are the changes needed?
    The changes fix the bug demonstrated by the following example from SPARK-36034:
    ```python
    >>> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
    >>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
    >>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
    +----+
    |date|
    +----+
    +----+
    ```
    The result should contain the date value `0001-01-01`: under the `LEGACY`
    write mode the file stores the date as the Julian-calendar day count
    **-719164**, while the pushed-down filter looks for **-719162**, so the
    row is incorrectly filtered out.
    
    ### Does this PR introduce _any_ user-facing change?
    In some sense, yes. Query results can differ in some cases. For the
    example above:
    ```scala
    scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
    scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
    scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
    +----------+
    |date      |
    +----------+
    |0001-01-01|
    +----------+
    ```
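    
    For reference, the read-side mode can be supplied either via the SQL config
    `spark.sql.parquet.datetimeRebaseModeInRead` or per read via the Parquet
    option `datetimeRebaseMode` named above; a hedged sketch (per-file rebase
    metadata written by Spark, when present in the footer, still takes
    precedence in `DataSourceUtils.datetimeRebaseMode`):
    ```scala
    // Applies when the file footer carries no Spark rebase metadata,
    // e.g. files written by external tools ("legacy_dir" is a hypothetical path).
    scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")
    scala> spark.read.option("datetimeRebaseMode", "LEGACY").parquet("legacy_dir").show(false)
    ```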
    
    ### How was this patch tested?
    By running the modified test suite `ParquetFilterSuite`:
    ```
    $ build/sbt "test:testOnly *ParquetV1FilterSuite"
    $ build/sbt "test:testOnly *ParquetV2FilterSuite"
    ```
    
    Closes #33347 from MaxGekk/fix-parquet-ts-filter-pushdown.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
    (cherry picked from commit b09b7f7cc024d3054debd7bdb51caec3b53764d7)
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  17 ++-
 .../datasources/parquet/ParquetFilters.scala       |  29 +++-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |  17 ++-
 .../v2/parquet/ParquetScanBuilder.scala            |  14 +-
 .../datasources/parquet/ParquetFilterSuite.scala   | 164 +++++++++++----------
 5 files changed, 145 insertions(+), 96 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 48e2e6e..ee229a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -266,11 +266,21 @@ class ParquetFileFormat
 
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-          pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        val parquetFilters = new ParquetFilters(
+          parquetSchema,
+          pushDownDate,
+          pushDownTimestamp,
+          pushDownDecimal,
+          pushDownStringStartWith,
+          pushDownInFilterThreshold,
+          isCaseSensitive,
+          datetimeRebaseMode)
         filters
          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -296,9 +306,6 @@ class ParquetFileFormat
           None
         }
 
-      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-        footerFileMetaData.getKeyValueMetaData.get,
-        datetimeRebaseModeInRead)
       val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         int96RebaseModeInRead)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 9999df7..5afb736 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -48,7 +50,8 @@ class ParquetFilters(
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
-    caseSensitive: Boolean) {
+    caseSensitive: Boolean,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
   // A map which contains parquet field name and data type, if predicate push down applies.
   //
   // Each key in `nameToParquetField` represents a column; `dots` are used as separators for
@@ -129,14 +132,26 @@ class ParquetFilters(
   private val ParquetTimestampMillisType =
     ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0)
 
-  private def dateToDays(date: Any): Int = date match {
-    case d: Date => DateTimeUtils.fromJavaDate(d)
-    case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+  private def dateToDays(date: Any): Int = {
+    val gregorianDays = date match {
+      case d: Date => DateTimeUtils.fromJavaDate(d)
+      case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
+      case _ => gregorianDays
+    }
   }
 
-  private def timestampToMicros(v: Any): JLong = v match {
-    case i: Instant => DateTimeUtils.instantToMicros(i)
-    case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+  private def timestampToMicros(v: Any): JLong = {
+    val gregorianMicros = v match {
+      case i: Instant => DateTimeUtils.instantToMicros(i)
+      case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
+      case _ => gregorianMicros
+    }
   }
 
   private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 7807604..058669b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -133,11 +133,21 @@ case class ParquetPartitionReaderFactory(
 
     lazy val footerFileMetaData =
       ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (enableParquetFilterPushDown) {
       val parquetSchema = footerFileMetaData.getSchema
-      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseMode)
       filters
        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -170,9 +180,6 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
     }
-    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-      footerFileMetaData.getKeyValueMetaData.get,
-      datetimeRebaseModeInRead)
     val int96RebaseMode = DataSourceUtils.int96RebaseMode(
       footerFileMetaData.getKeyValueMetaData.get,
       int96RebaseModeInRead)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 4405383..4b3f4e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -51,8 +52,17 @@ case class ParquetScanBuilder(
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
       new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema())
-    val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-      pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+    val parquetFilters = new ParquetFilters(
+      parquetSchema,
+      pushDownDate,
+      pushDownTimestamp,
+      pushDownDecimal,
+      pushDownStringStartWith,
+      pushDownInFilterThreshold,
+      isCaseSensitive,
+      // The rebase mode doesn't matter here because the filters are used to determine
+      // whether they are convertible.
+      LegacyBehaviorPolicy.CORRECTED)
     parquetFilters.convertibleFilters(this.filters).toArray
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 230d547..8354158 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -45,7 +45,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
@@ -73,11 +75,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   protected def createParquetFilters(
       schema: MessageType,
-      caseSensitive: Option[Boolean] = None): ParquetFilters =
+      caseSensitive: Option[Boolean] = None,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
+    ): ParquetFilters =
     new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
       conf.parquetFilterPushDownInFilterThreshold,
-      caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
+      caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
+      datetimeRebaseMode)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -587,71 +592,75 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       def date: Date = Date.valueOf(s)
     }
 
-    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
+    val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21")
     import testImplicits._
 
     Seq(false, true).foreach { java8Api =>
-      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
-        val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
-        withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
-          implicit val df: DataFrame = inputDF
-
-          def resultFun(dateStr: String): Any = {
-            val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
-            fun(parsed)
-          }
-
-          val dateAttr: Expression = df(colName).expr
-          assert(df(colName).expr.dataType === DateType)
-
-          checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-          checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
-            data.map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
-            Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(
-            dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
-            classOf[Operators.Or],
-            Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) {
+          val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
+          withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
+            implicit val df: DataFrame = inputDF
+
+            def resultFun(dateStr: String): Any = {
+              val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
+              fun(parsed)
+            }
 
-          Seq(3, 20).foreach { threshold =>
-            withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
-              checkFilterPredicate(
-                In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
-                  "2018-03-22".date).map(Literal.apply)),
-                if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
-                Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
-                  Row(resultFun("2018-03-21"))))
+            val dateAttr: Expression = df(colName).expr
+            assert(df(colName).expr.dataType === DateType)
+
+            checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+            checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
+              data.map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]],
+              Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(
+              dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
+              classOf[Operators.Or],
+              Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21"))))
+
+            Seq(3, 20).foreach { threshold =>
+              withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
+                checkFilterPredicate(
+                  In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
+                    "2018-03-22".date).map(Literal.apply)),
+                  if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
+                  Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
+                    Row(resultFun("2018-03-21"))))
+              }
             }
           }
         }
@@ -661,35 +670,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - timestamp") {
     Seq(true, false).foreach { java8Api =>
-      withSQLConf(
-        SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
-        SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED",
-        SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
         val millisData = Seq(
           "1000-06-14 08:28:53.123",
           "1582-06-15 08:28:53.001",
           "1900-06-16 08:28:53.0",
           "2018-06-17 08:28:53.999")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) {
           testTimestampPushdown(millisData, java8Api)
         }
 
-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
         val microsData = Seq(
           "1000-06-14 08:28:53.123456",
           "1582-06-15 08:28:53.123456",
           "1900-06-16 08:28:53.123456",
           "2018-06-17 08:28:53.123456")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) {
           testTimestampPushdown(microsData, java8Api)
         }
 
-        // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.INT96.toString) {
+        // INT96 doesn't support pushdown
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) {
           import testImplicits._
           withTempPath { file =>
             millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
