[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398738263

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

@@ -187,201 +209,273 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }

   test("filter pushdown - boolean") {
-    withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
-      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
-
-      checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
-      checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true)
-      checkFilterPredicate('_1 =!= true, classOf[NotEq[_]], false)
+    val data = (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))
+    import testImplicits._
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
+      withParquetDataFrame(inputDF) { implicit df =>
+        val booleanAttr = df(colName).expr
+        assert(df(colName).expr.dataType === BooleanType)
+
+        checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]],
+          Seq(Row(resultFun(true)), Row(resultFun(false))))
+
+        checkFilterPredicate(booleanAttr === true, classOf[Eq[_]], resultFun(true))
+        checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]], resultFun(true))
+        checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]], resultFun(false))
+      }
     }
   }

   test("filter pushdown - tinyint") {
-    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toByte)))) { implicit df =>
-      assert(df.schema.head.dataType === ByteType)
-      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
-
-      checkFilterPredicate('_1 === 1.toByte, classOf[Eq[_]], 1)
-      checkFilterPredicate('_1 <=> 1.toByte, classOf[Eq[_]], 1)
-      checkFilterPredicate('_1 =!= 1.toByte, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
-
-      checkFilterPredicate('_1 < 2.toByte, classOf[Lt[_]], 1)
-      checkFilterPredicate('_1 > 3.toByte, classOf[Gt[_]], 4)
-      checkFilterPredicate('_1 <= 1.toByte, classOf[LtEq[_]], 1)
-      checkFilterPredicate('_1 >= 4.toByte, classOf[GtEq[_]], 4)
-
-      checkFilterPredicate(Literal(1.toByte) === '_1, classOf[Eq[_]], 1)
-      checkFilterPredicate(Literal(1.toByte) <=> '_1, classOf[Eq[_]], 1)
-      checkFilterPredicate(Literal(2.toByte) > '_1, classOf[Lt[_]], 1)
-      checkFilterPredicate(Literal(3.toByte) < '_1, classOf[Gt[_]], 4)
-      checkFilterPredicate(Literal(1.toByte) >= '_1, classOf[LtEq[_]], 1)
-      checkFilterPredicate(Literal(4.toByte) <= '_1, classOf[GtEq[_]], 4)
-
-      checkFilterPredicate(!('_1 < 4.toByte), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 < 2.toByte || '_1 > 3.toByte,
-        classOf[Operators.Or], Seq(Row(1), Row(4)))
+    val data = (1 to 4).map(i => Tuple1(Option(i.toByte)))
+    import testImplicits._
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
+      withParquetDataFrame(inputDF) { implicit df =>
+        val tinyIntAttr = df(colName).expr
+        assert(df(colName).expr.dataType === ByteType)
+
+        checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]],
+          (1 to 4).map(i => Row.apply(resultFun(i))))
+
+        checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]], resultFun(1))
+        checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]], resultFun(1))
+        checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]],
+          (2 to 4).map(i => Row.apply(resultFun(i))))
+
+        checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]], resultFun(1))
+        checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]], resultFun(4))
+        checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]], resultFun(1))
+        checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]], resultFun(4))
+
+        checkFilterPredicate(Literal(1.toByte) === tinyIntAttr, classOf[Eq[_]], resultFun(1))
+        checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr, classOf[Eq[_]], resultFun(1))
+        checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]], resultFun(1))
+        checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]], resultFun(4))
+        checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr, classOf[LtEq[_]], resultFun(1))
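For readers following the refactor above: the real `withNestedDataFrame` lives in ParquetFilterSuite, and the sketch below is a simplified, hypothetical version of it (the two generated variants and the names "a"/"b" are invented for illustration). It shows the shape of the `(inputDF, colName, resultFun)` triple the rewritten tests destructure.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, struct}

// Simplified sketch: expand one flat column into a nested variant, handing
// each (dataframe, pushdown column name, expected-value wrapper) to the test.
def withNestedDataFrame(inputDF: DataFrame)
    (runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
  val flatName = inputDF.schema.head.name
  // Flat case: predicates push down on the column itself; values pass through.
  runTest(inputDF, flatName, identity)
  // Nested case: wrap the column as struct a { b }. Predicates now push down
  // on the nested name "a.b", and each expected leaf value must be wrapped in
  // a Row because the selected output column is the struct.
  val nestedDF = inputDF.select(struct(col(flatName).as("b")).as("a"))
  runTest(nestedDF, "a.b", (v: Any) => Row(v))
}
```

This is why the assertions read `Row(resultFun(true))`: for the flat variant `resultFun` is the identity, while for nested variants it wraps the leaf value to match the struct-shaped output rows.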
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398732474

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

@@ -502,33 +617,39 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       (false, DecimalType.MAX_PRECISION)  // binaryWriterUsingUnscaledBytes
     ).foreach { case (legacyFormat, precision) =>
       withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyFormat.toString) {
-        val schema = StructType.fromDDL(s"a decimal($precision, 2)")
         val rdd =
           spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i))))
-        val dataFrame = spark.createDataFrame(rdd, schema)
-        testDecimalPushDown(dataFrame) { implicit df =>

Review comment:
It's already removed here in this PR: https://github.com/apache/spark/pull/27728/files#diff-43b427b8b0b4b9d8dd7e4367c0526f83L157
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398318993

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

@@ -103,22 +107,42 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }

-  private def checkBinaryFilterPredicate
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
-      (implicit df: DataFrame): Unit = {
-    def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
-      assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) {
-        df.rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
-      }
+  /**
+   * Takes single level `inputDF` dataframe to generate multi-level nested
+   * dataframes as new test data.
+   */
+  private def withNestedDataFrame(inputDF: DataFrame)
+      (runTests: (DataFrame, String, Any => Any) => Unit): Unit = {
+    assert(inputDF.schema.fields.length == 1)
+    assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])

Review comment:
@cloud-fan I added schema checking in the code to avoid passing an arbitrary type of dataframe.
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398310277

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

@@ -121,43 +121,81 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }

+  /**
+   * Takes single level `inputDF` dataframe to generate multi-level nested
+   * dataframes as new test data.
+   */
+  private def withNestedDataFrame(inputDF: DataFrame)

Review comment:
Okay, this is not easy, since one of the test cases looks like

```scala
val dataFrame = spark.createDataFrame(rdd, StructType.fromDDL(s"a decimal($precision, 2)"))
withNestedDataFrame(dataFrame) { case (inputDF, pushDownColName, resultFun) =>
  withParquetDataFrame(inputDF) { implicit df =>
    val decimalAttr: Expression = df(pushDownColName).expr
    assert(df(pushDownColName).expr.dataType === DecimalType(precision, 2))
```

so the dataframe cannot be constructed directly from `withNestedDataFrame[T](data: Seq[T])`.
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398317799

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

@@ -103,22 +107,42 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }

-  private def checkBinaryFilterPredicate

Review comment:
@cloud-fan Since `checkFilterPredicate` works for binary data in the current implementation, `checkBinaryFilterPredicate` can be deleted.
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398314267

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -2049,6 +2049,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
+      .internal()
+      .doc("When true, Spark tries to push down predicates for nested columns and/or names " +
+        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
+        "while ORC only supports predicates for names containing `dots`. The other data sources " +
+        "don't support this feature yet.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
Since the filter APIs will be enhanced to support nested columns and column names containing `dots`, it will be nice to introduce this in a major release. Good idea! We can turn this feature on for specific data sources in a separate PR; this PR has already grown too big. Thanks!
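For context, this is how a user could fall back to the old behavior if a downstream DSv1 source breaks, assuming the flag ships with the key shown in the diff above (this usage is a sketch, not from the PR itself):

```scala
// Disable nested/dotted-name predicate pushdown at runtime, e.g. for a
// custom DSv1 source that cannot handle quoted multi-part column names.
// The conf key is taken verbatim from the diff in this comment.
spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.enabled", "false")

// Or at launch time:
//   spark-submit --conf spark.sql.optimizer.nestedPredicatePushdown.enabled=false ...
```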
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398312781

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

@@ -121,43 +121,81 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }

+  /**
+   * Takes single level `inputDF` dataframe to generate multi-level nested
+   * dataframes as new test data.
+   */
+  private def withNestedDataFrame(inputDF: DataFrame)

Review comment:
Instead, I added schema checking in the code.
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398304484

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

@@ -121,43 +121,81 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }

+  /**
+   * Takes single level `inputDF` dataframe to generate multi-level nested
+   * dataframes as new test data.
+   */
+  private def withNestedDataFrame(inputDF: DataFrame)

Review comment:
Sounds good. I think `private def withNestedDataFrame[T <: Product: ClassTag: TypeTag](data: Seq[T])` is good enough. I see no reason to put it in `Tuple1`.
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398302492

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala

@@ -62,13 +62,21 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
       (data: Seq[T])
       (f: String => Unit): Unit = withDataSourceFile(data)(f)

+  protected def toDF[T <: Product: ClassTag: TypeTag](data: Seq[T]): DataFrame = {

Review comment:
The error is

```scala
Error:(45, 20) in trait ParquetTest, multiple overloaded alternatives of
method withParquetDataFrame define default arguments.
private[sql] trait ParquetTest extends FileBasedDataSourceTest {
```
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398280601

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala

@@ -62,13 +62,21 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
       (data: Seq[T])
       (f: String => Unit): Unit = withDataSourceFile(data)(f)

+  protected def toDF[T <: Product: ClassTag: TypeTag](data: Seq[T]): DataFrame = {

Review comment:
I was planning to do so, but surprisingly, overloading

```scala
protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
    (data: Seq[T], testVectorized: Boolean = true)
    (f: DataFrame => Unit): Unit
```

with

```scala
protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
    (f: DataFrame => Unit): Unit
```

is not allowed in Scala, because both alternatives define default arguments.
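For anyone curious why the compiler rejects this: Scala allows at most one overloaded alternative of a method to define default arguments. A minimal reproduction, with a hypothetical trait and method names, that fails with the error quoted in the earlier comment:

```scala
trait Repro {
  // The first alternative with a default argument compiles fine on its own.
  def run[T](data: Seq[T], vectorized: Boolean = true)(f: T => Unit): Unit =
    data.foreach(f)

  // Adding a second alternative that also defines a default triggers:
  //   "in trait Repro, multiple overloaded alternatives of method run
  //    define default arguments."
  def run(df: String, vectorized: Boolean = true)(f: String => Unit): Unit =
    f(df)
}
```

Dropping the default from one alternative, or keeping a single DataFrame-taking method plus a separate `toDF` helper (the route this PR takes), sidesteps the restriction.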
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r397601474

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala

@@ -62,13 +62,21 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
       (data: Seq[T])
       (f: String => Unit): Unit = withDataSourceFile(data)(f)

+  protected def toDF[T <: Product: ClassTag: TypeTag](data: Seq[T]): DataFrame = {
+    spark.createDataFrame(data)
+  }
+
   /**
-   * Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
+   * Writes `df` dataframe to a Parquet file and reads it back as a [[DataFrame]],
    * which is then passed to `f`. The Parquet file will be deleted after `f` returns.
    */
-  protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
-      (data: Seq[T], testVectorized: Boolean = true)
-      (f: DataFrame => Unit): Unit = withDataSourceDataFrame(data, testVectorized)(f)
+  protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)

Review comment:
It's because the original test framework takes `Seq[T]`, which is hard to manipulate programmatically to create different flavors of nested data as new test cases. See https://github.com/apache/spark/pull/27728/files#diff-43b427b8b0b4b9d8dd7e4367c0526f83R128

By taking a dataframe instead, it's much easier to create new nested test data from single-level data.
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r397600752

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala

@@ -40,12 +45,32 @@ sealed abstract class Filter {
     case f: Filter => f.references
     case _ => Array.empty
   }
+
+  /**
+   * List of columns that are referenced by this filter.
+   *
+   * @return each element is a column name as an array of string multi-identifier
+   * @since 3.0.0
+   */
+  def V2references: Array[Array[String]] = {

Review comment:
Done. Thanks.
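For intuition about the `Array[Array[String]]` shape (the reviewed name `V2references` may not match what was eventually merged), here is a small sketch of how a quoted single-part reference differs from a nested two-part one. `parseMultipartIdentifier` is an existing Catalyst parser entry point; the column names are invented for illustration:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// A nested field b inside struct a is referenced as the two-part name a.b,
// while a flat column literally named "a.b" is backtick-quoted in the
// string-based references; parsing recovers the intended parts.
CatalystSqlParser.parseMultipartIdentifier("a.b")    // Seq("a", "b")
CatalystSqlParser.parseMultipartIdentifier("`a.b`")  // Seq("a.b")
```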
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r396944064

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala

@@ -32,6 +33,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
 sealed abstract class Filter {
   /**
    * List of columns that are referenced by this filter.
+   * Note that, if a column contains `dots` in name, it will be quoted to avoid confusion.

Review comment:
As @cloud-fan suggested, I added a conf to disable the new behavior, so people can still use their old DSv1 implementations while upgrading to the new behavior.
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r395819986

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala

@@ -32,6 +33,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
 sealed abstract class Filter {
   /**
    * List of columns that are referenced by this filter.
+   * Note that, if a column contains `dots` in name, it will be quoted to avoid confusion.

Review comment:
I agree that some downstream datasource implementations might not support column names containing `dots`, or nested column predicate pushdown. However, we are talking about the contract of Spark's Filter API. The translation logic for each data source should take care of this properly instead, and that's why we have this abstraction. WDYT?
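For reference, this is the quoting convention under discussion: a minimal sketch modeled on the `quoteIfNeeded` helper that appears in the OrcFilters diff later in this thread (treat it as an illustration of the contract, not the exact merged code):

```scala
// Quote a single name part only when it would otherwise be ambiguous:
// a literal dot in the name, or an embedded backtick (escaped by doubling).
def quoteIfNeeded(part: String): String =
  if (part.contains(".") || part.contains("`")) {
    s"`${part.replace("`", "``")}`"
  } else {
    part
  }

quoteIfNeeded("a")    // a
quoteIfNeeded("a.b")  // `a.b`  -- a flat column whose name contains a dot
Seq("a", "b").map(quoteIfNeeded).mkString(".")  // a.b -- nested field b of struct a
```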
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r394000137

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

@@ -652,10 +652,12 @@ object DataSourceStrategy {
  */
 object PushableColumn {
   def unapply(e: Expression): Option[String] = {
-    def helper(e: Expression) = e match {
-      case a: Attribute => Some(a.name)
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    def helper(e: Expression): Option[Seq[String]] = e match {
+      case a: Attribute => Some(Seq(a.name))
+      case s: GetStructField => helper(s.child).map(_ :+ s.childSchema(s.ordinal).name)

Review comment:
Do you mean the nested predicate pushdown? That would be easy, but `dots` is harder: if we don't add the quoting here, we have to add it before calling the ORC filter API.
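To make the recursion in the diff concrete, here is a standalone mimic (toy types, outside Spark, all names invented) of how `helper` collects name parts from nested struct accesses, and how quoting then distinguishes a nested field from a dotted flat name:

```scala
// Toy stand-ins for Catalyst's Attribute and GetStructField.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GetField(child: Expr, fieldName: String) extends Expr

// Same shape as the helper in the diff: recurse to the root attribute,
// appending field names on the way back up.
def helper(e: Expr): Option[Seq[String]] = e match {
  case Attr(name)         => Some(Seq(name))
  case GetField(c, field) => helper(c).map(_ :+ field)
}

def quote(part: String): String =
  if (part.contains(".") || part.contains("`")) s"`${part.replace("`", "``")}`" else part

helper(GetField(Attr("a"), "b")).map(_.map(quote).mkString("."))  // Some(a.b)    -- nested field
helper(Attr("a.b")).map(_.map(quote).mkString("."))               // Some(`a.b`)  -- dotted flat column
```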
[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r392507441

## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala

@@ -73,9 +74,11 @@ private[orc] object OrcFilters extends Logging {
     if (HiveUtils.isHive23) {
       DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
     } else {
-      val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+      val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
+      // TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so they are removed.

Review comment:
ORC already supports column names with `dots`. In fact, the ORC API uses the same quotation method I proposed in this PR, but we add the quotation, when the column name contains a dot, right before calling the ORC filter APIs.