This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 724d6a8  [SPARK-36645][SQL][FOLLOWUP] Disable min/max push down for Parquet Binary
724d6a8 is described below

commit 724d6a83df0aa7927cbaa205fcd7c48f544432bc
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Fri Oct 22 13:28:54 2021 +0900

    [SPARK-36645][SQL][FOLLOWUP] Disable min/max push down for Parquet Binary

    ### What changes were proposed in this pull request?
    Disable min/max push down for Parquet Binary.

    ### Why are the changes needed?
    Parquet Binary min/max could be truncated, so we may get a wrong result if we
    rely on the Parquet Binary min/max statistics.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Modified existing tests.

    Closes #34346 from huaxingao/disableBinary.

    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../v2/parquet/ParquetScanBuilder.scala            | 12 ++--
 .../parquet/ParquetAggregatePushDownSuite.scala    | 69 +++++++++++-----------
 2 files changed, 41 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index c579867..113438a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, Spark
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 case class ParquetScanBuilder(
@@ -114,11 +114,15 @@ case class ParquetScanBuilder(
         // not push down complex type
         // not push down Timestamp because INT96 sort order is undefined,
         // Parquet doesn't return statistics for INT96
-        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
-          false
-        case _ =>
+        // not push down Parquet Binary because min/max could be truncated
+        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
+        // could be Spark StringType, BinaryType or DecimalType
+        case BooleanType | ByteType | ShortType | IntegerType
+          | LongType | FloatType | DoubleType | DateType =>
           finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
           true
+        case _ =>
+          false
       }
     }
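The net effect of the hunk above is to replace the old deny-list (complex types and
Timestamp) with an allow-list of fixed-width types whose Parquet min/max statistics
cannot be truncated. A minimal standalone sketch of that check, for readers who want
to experiment with it outside the planner (the object and method names here are
illustrative, not part of the patch):

    import org.apache.spark.sql.types._

    object MinMaxPushDownCheck {
      // Only fixed-width primitive types (plus DateType, stored as INT32) have
      // Parquet min/max statistics that are guaranteed not to be truncated.
      // Binary-backed types (StringType, BinaryType, DecimalType) and INT96
      // timestamps are deliberately excluded.
      def supportsMinMax(dataType: DataType): Boolean = dataType match {
        case BooleanType | ByteType | ShortType | IntegerType
           | LongType | FloatType | DoubleType | DateType => true
        case _ => false
      }
    }

Since StringType, BinaryType and DecimalType can all be backed by Parquet Binary, a
query such as SELECT min(StringCol) FROM test now falls through to the default case
and the aggregate is evaluated by Spark instead of being pushed into the scan.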
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index c795bd9..0ae95db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -361,89 +361,86 @@ abstract class ParquetAggregatePushDownSuite
     withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
       vectorizedReaderEnabledKey -> testVectorizedReader) {
-      val testMinWithTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
+      val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
         "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
         "min(DoubleCol), min(DecimalCol), min(DateCol), min(TimestampCol) FROM test")

       // INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
       // so aggregates are not pushed down
-      testMinWithTS.queryExecution.optimizedPlan.collect {
+      // In addition, Parquet Binary min/max could be truncated, so we disable aggregate
+      // push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
+      testMinWithAllTypes.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
             "PushedAggregation: []"
-          checkKeywordsExistsInExplain(testMinWithTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMinWithAllTypes, expected_plan_fragment)
       }

-      checkAnswer(testMinWithTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
-        2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
-        ("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))
+      checkAnswer(testMinWithAllTypes, Seq(Row("a string", false, 1.toByte,
+        "Parquet".getBytes, 2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D,
+        1.23457, ("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))

-      val testMinWithOutTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
-        "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
-        "min(DoubleCol), min(DecimalCol), min(DateCol) FROM test")
+      val testMinWithOutTSAndBinary = sql("SELECT min(BooleanCol), min(ByteCol), " +
+        "min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
+        "min(DoubleCol), min(DateCol) FROM test")

-      testMinWithOutTS.queryExecution.optimizedPlan.collect {
+      testMinWithOutTSAndBinary.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
-            "PushedAggregation: [MIN(StringCol), " +
-              "MIN(BooleanCol), " +
+            "PushedAggregation: [MIN(BooleanCol), " +
               "MIN(ByteCol), " +
-              "MIN(BinaryCol), " +
               "MIN(ShortCol), " +
               "MIN(IntegerCol), " +
               "MIN(LongCol), " +
               "MIN(FloatCol), " +
               "MIN(DoubleCol), " +
-              "MIN(DecimalCol), " +
               "MIN(DateCol)]"
-          checkKeywordsExistsInExplain(testMinWithOutTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMinWithOutTSAndBinary, expected_plan_fragment)
       }

-      checkAnswer(testMinWithOutTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
-        2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
-        ("2004-06-19").date)))
+      checkAnswer(testMinWithOutTSAndBinary, Seq(Row(false, 1.toByte,
+        2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, ("2004-06-19").date)))

-      val testMaxWithTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
-        "max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
-        "max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) FROM test")
+      val testMaxWithAllTypes = sql("SELECT max(StringCol), max(BooleanCol), " +
+        "max(ByteCol), max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), " +
+        "max(FloatCol), max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) " +
+        "FROM test")

       // INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
       // so aggregates are not pushed down
-      testMaxWithTS.queryExecution.optimizedPlan.collect {
+      // In addition, Parquet Binary min/max could be truncated, so we disable aggregate
+      // push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
+      testMaxWithAllTypes.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
             "PushedAggregation: []"
-          checkKeywordsExistsInExplain(testMaxWithTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMaxWithAllTypes, expected_plan_fragment)
       }

-      checkAnswer(testMaxWithTS, Seq(Row("test string", true, 16.toByte,
+      checkAnswer(testMaxWithAllTypes, Seq(Row("test string", true, 16.toByte,
         "Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat,
         0.85D, 12345.678, ("2021-01-01").date, ("2021-01-01 23:50:59.123").ts)))

-      val testMaxWithoutTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
-        "max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
-        "max(DoubleCol), max(DecimalCol), max(DateCol) FROM test")
+      val testMaxWithoutTSAndBinary = sql("SELECT max(BooleanCol), max(ByteCol), " +
+        "max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
+        "max(DoubleCol), max(DateCol) FROM test")

-      testMaxWithoutTS.queryExecution.optimizedPlan.collect {
+      testMaxWithoutTSAndBinary.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
-            "PushedAggregation: [MAX(StringCol), " +
-              "MAX(BooleanCol), " +
+            "PushedAggregation: [MAX(BooleanCol), " +
               "MAX(ByteCol), " +
-              "MAX(BinaryCol), " +
               "MAX(ShortCol), " +
               "MAX(IntegerCol), " +
               "MAX(LongCol), " +
               "MAX(FloatCol), " +
               "MAX(DoubleCol), " +
-              "MAX(DecimalCol), " +
               "MAX(DateCol)]"
-          checkKeywordsExistsInExplain(testMaxWithoutTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMaxWithoutTSAndBinary, expected_plan_fragment)
       }

-      checkAnswer(testMaxWithoutTS, Seq(Row("test string", true, 16.toByte,
-        "Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D,
-        12345.678, ("2021-01-01").date)))
+      checkAnswer(testMaxWithoutTSAndBinary, Seq(Row(true, 16.toByte,
+        222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D, ("2021-01-01").date)))

       val testCount = sql("SELECT count(StringCol), count(BooleanCol)," +
         " count(ByteCol), count(BinaryCol), count(ShortCol), count(IntegerCol)," +
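For anyone who wants to reproduce what the updated tests assert, the pushdown is
visible in the EXPLAIN output. A rough spark-shell sketch, assuming a Spark build
with this change; the path, view name and column names are made up for the example,
and the two config keys are assumed to correspond to
SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED and the V1 source list:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      // enable aggregate push down and force the DSv2 Parquet reader,
      // since aggregate push down only applies on the DSv2 path
      .config("spark.sql.parquet.aggregatePushdown", "true")
      .config("spark.sql.sources.useV1SourceList", "")
      .getOrCreate()
    import spark.implicits._

    // A tiny Parquet table with one fixed-width and one Binary-backed column.
    Seq((1L, "a"), (2L, "b")).toDF("longCol", "stringCol")
      .write.mode("overwrite").parquet("/tmp/agg_pushdown_demo")
    spark.read.parquet("/tmp/agg_pushdown_demo").createOrReplaceTempView("t")

    // Fixed-width column: the plan should show "PushedAggregation: [MIN(longCol)]".
    spark.sql("SELECT min(longCol) FROM t").explain()

    // String column is Parquet Binary: after this change the plan should show
    // "PushedAggregation: []" and the min is computed by Spark instead.
    spark.sql("SELECT min(stringCol) FROM t").explain()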
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org