This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 724d6a8  [SPARK-36645][SQL][FOLLOWUP] Disable min/max push down for Parquet Binary
724d6a8 is described below

commit 724d6a83df0aa7927cbaa205fcd7c48f544432bc
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Fri Oct 22 13:28:54 2021 +0900

    [SPARK-36645][SQL][FOLLOWUP] Disable min/max push down for Parquet Binary
    
    ### What changes were proposed in this pull request?
    Disable min/max push down for Parquet Binary
    
    ### Why are the changes needed?
    Parquet Binary min/max statistics can be truncated, so we may get wrong results if we rely on them.
    
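    For illustration, a minimal sketch of the failure mode with hypothetical values (not taken from this patch). Parquet may truncate long binary min/max statistics, so the stored max is only an upper bound, not necessarily a value that occurs in the data:
    
    ```scala
    // Hypothetical truncated statistic for a binary/string column whose real
    // maximum is "Spark SQL": truncate to 7 bytes ("Spark S"), then bump the
    // last byte so the statistic stays an upper bound ("Spark T").
    val actualMax = "Spark SQL"
    val truncatedMaxStat = "Spark T"
    
    assert(truncatedMaxStat.compareTo(actualMax) > 0) // still a valid bound for filtering
    assert(truncatedMaxStat != actualMax)             // but wrong as a MAX() answer
    ```
    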
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Modified existing tests.
    
    Closes #34346 from huaxingao/disableBinary.
    
    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../v2/parquet/ParquetScanBuilder.scala            | 12 ++--
 .../parquet/ParquetAggregatePushDownSuite.scala    | 69 +++++++++++-----------
 2 files changed, 41 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index c579867..113438a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, Spark
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class ParquetScanBuilder(
@@ -114,11 +114,15 @@ case class ParquetScanBuilder(
         // not push down complex type
         // not push down Timestamp because INT96 sort order is undefined,
         // Parquet doesn't return statistics for INT96
-        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
-          false
-        case _ =>
+        // not push down Parquet Binary because min/max could be truncated
+        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
+        // could be Spark StringType, BinaryType or DecimalType
+        case BooleanType | ByteType | ShortType | IntegerType
+             | LongType | FloatType | DoubleType | DateType =>
           finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
           true
+        case _ =>
+          false
       }
     }
 
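
For context, a minimal standalone sketch of the allowlist this hunk produces (the helper name `supportsMinMaxPushDown` is invented here for illustration, not part of the patch):

```scala
import org.apache.spark.sql.types._

// Hypothetical restatement of the match in ParquetScanBuilder above:
// min/max pushdown is now an allowlist of fixed-width types whose Parquet
// statistics are exact values.
def supportsMinMaxPushDown(dt: DataType): Boolean = dt match {
  case BooleanType | ByteType | ShortType | IntegerType
       | LongType | FloatType | DoubleType | DateType => true
  // Complex types, INT96 timestamps, and Binary-backed types
  // (StringType, BinaryType, DecimalType) fall through and are rejected.
  case _ => false
}

assert(supportsMinMaxPushDown(IntegerType))
assert(!supportsMinMaxPushDown(StringType))    // Parquet Binary: stats may be truncated
assert(!supportsMinMaxPushDown(TimestampType)) // INT96: sort order undefined
```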
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index c795bd9..0ae95db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -361,89 +361,86 @@ abstract class ParquetAggregatePushDownSuite
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
             vectorizedReaderEnabledKey -> testVectorizedReader) {
 
-            val testMinWithTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
+            val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
               "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
               "min(DoubleCol), min(DecimalCol), min(DateCol), min(TimestampCol) FROM test")
 
             // INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
             // so aggregates are not pushed down
-            testMinWithTS.queryExecution.optimizedPlan.collect {
+            // In addition, Parquet Binary min/max could be truncated, so we disable aggregate
+            // push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
+            testMinWithAllTypes.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
                   "PushedAggregation: []"
-                checkKeywordsExistsInExplain(testMinWithTS, expected_plan_fragment)
+                checkKeywordsExistsInExplain(testMinWithAllTypes, expected_plan_fragment)
             }
 
-            checkAnswer(testMinWithTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
-              2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
-              ("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))
+            checkAnswer(testMinWithAllTypes, Seq(Row("a string", false, 1.toByte,
+              "Parquet".getBytes, 2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D,
+              1.23457, ("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))
 
-            val testMinWithOutTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
-              "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
-              "min(DoubleCol), min(DecimalCol), min(DateCol) FROM test")
+            val testMinWithOutTSAndBinary = sql("SELECT min(BooleanCol), min(ByteCol), " +
+              "min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
+              "min(DoubleCol), min(DateCol) FROM test")
 
-            testMinWithOutTS.queryExecution.optimizedPlan.collect {
+            testMinWithOutTSAndBinary.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
-                  "PushedAggregation: [MIN(StringCol), " +
-                    "MIN(BooleanCol), " +
+                  "PushedAggregation: [MIN(BooleanCol), " +
                     "MIN(ByteCol), " +
-                    "MIN(BinaryCol), " +
                     "MIN(ShortCol), " +
                     "MIN(IntegerCol), " +
                     "MIN(LongCol), " +
                     "MIN(FloatCol), " +
                     "MIN(DoubleCol), " +
-                    "MIN(DecimalCol), " +
                     "MIN(DateCol)]"
-                checkKeywordsExistsInExplain(testMinWithOutTS, expected_plan_fragment)
+                checkKeywordsExistsInExplain(testMinWithOutTSAndBinary, expected_plan_fragment)
             }
 
-            checkAnswer(testMinWithOutTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
-              2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
-              ("2004-06-19").date)))
+            checkAnswer(testMinWithOutTSAndBinary, Seq(Row(false, 1.toByte,
+              2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, ("2004-06-19").date)))
 
-            val testMaxWithTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
-              "max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
-              "max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) FROM test")
+            val testMaxWithAllTypes = sql("SELECT max(StringCol), max(BooleanCol), " +
+              "max(ByteCol), max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), " +
+              "max(FloatCol), max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) " +
+              "FROM test")
 
             // INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
             // so aggregates are not pushed down
-            testMaxWithTS.queryExecution.optimizedPlan.collect {
+            // In addition, Parquet Binary min/max could be truncated, so we disable aggregate
+            // push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
+            testMaxWithAllTypes.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
                   "PushedAggregation: []"
-                checkKeywordsExistsInExplain(testMaxWithTS, expected_plan_fragment)
+                checkKeywordsExistsInExplain(testMaxWithAllTypes, expected_plan_fragment)
             }
 
-            checkAnswer(testMaxWithTS, Seq(Row("test string", true, 16.toByte,
+            checkAnswer(testMaxWithAllTypes, Seq(Row("test string", true, 16.toByte,
               "Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D,
               12345.678, ("2021-01-01").date, ("2021-01-01 23:50:59.123").ts)))
 
-            val testMaxWithoutTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
-              "max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
-              "max(DoubleCol), max(DecimalCol), max(DateCol) FROM test")
+            val testMaxWithoutTSAndBinary = sql("SELECT max(BooleanCol), max(ByteCol), " +
+              "max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
+              "max(DoubleCol), max(DateCol) FROM test")
 
-            testMaxWithoutTS.queryExecution.optimizedPlan.collect {
+            testMaxWithoutTSAndBinary.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
-                  "PushedAggregation: [MAX(StringCol), " +
-                    "MAX(BooleanCol), " +
+                  "PushedAggregation: [MAX(BooleanCol), " +
                     "MAX(ByteCol), " +
-                    "MAX(BinaryCol), " +
                     "MAX(ShortCol), " +
                     "MAX(IntegerCol), " +
                     "MAX(LongCol), " +
                     "MAX(FloatCol), " +
                     "MAX(DoubleCol), " +
-                    "MAX(DecimalCol), " +
                     "MAX(DateCol)]"
-                checkKeywordsExistsInExplain(testMaxWithoutTS, expected_plan_fragment)
+                checkKeywordsExistsInExplain(testMaxWithoutTSAndBinary, expected_plan_fragment)
             }
 
-            checkAnswer(testMaxWithoutTS, Seq(Row("test string", true, 16.toByte,
-              "Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D,
-              12345.678, ("2021-01-01").date)))
+            checkAnswer(testMaxWithoutTSAndBinary, Seq(Row(true, 16.toByte,
+              222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D, ("2021-01-01").date)))
 
             val testCount = sql("SELECT count(StringCol), count(BooleanCol)," +
               " count(ByteCol), count(BinaryCol), count(ShortCol), 
count(IntegerCol)," +

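For anyone reproducing the new behavior locally, a minimal sketch (assuming a SparkSession `spark`, the DSv2 Parquet reader, and a Parquet table `test` shaped like the suite above; the config key is the one the suite itself sets):

```scala
import org.apache.spark.sql.internal.SQLConf

// Enable aggregate pushdown, as the suite does via withSQLConf.
spark.conf.set(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key, "true")

// A Binary-backed column in the aggregate list blocks pushdown entirely:
// the explain output should show "PushedAggregation: []".
spark.sql("SELECT min(BinaryCol), min(IntegerCol) FROM test").explain()

// With only fixed-width columns, pushdown survives:
// expect "PushedAggregation: [MIN(IntegerCol)]" in the explain output.
spark.sql("SELECT min(IntegerCol) FROM test").explain()
```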