[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-26 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398738263
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ##
 @@ -187,201 +209,273 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("filter pushdown - boolean") {
 -    withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
 -      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
 -      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
 -
 -      checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
 -      checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true)
 -      checkFilterPredicate('_1 =!= true, classOf[NotEq[_]], false)
 +    val data = (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))
 +    import testImplicits._
 +    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
 +      withParquetDataFrame(inputDF) { implicit df =>
 +        val booleanAttr = df(colName).expr
 +        assert(df(colName).expr.dataType === BooleanType)
 +
 +        checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
 +        checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]],
 +          Seq(Row(resultFun(true)), Row(resultFun(false))))
 +
 +        checkFilterPredicate(booleanAttr === true, classOf[Eq[_]], resultFun(true))
 +        checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]], resultFun(true))
 +        checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]], resultFun(false))
 +      }
     }
   }
 
   test("filter pushdown - tinyint") {
 -    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toByte)))) { implicit df =>
 -      assert(df.schema.head.dataType === ByteType)
 -      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
 -      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
 -
 -      checkFilterPredicate('_1 === 1.toByte, classOf[Eq[_]], 1)
 -      checkFilterPredicate('_1 <=> 1.toByte, classOf[Eq[_]], 1)
 -      checkFilterPredicate('_1 =!= 1.toByte, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
 -
 -      checkFilterPredicate('_1 < 2.toByte, classOf[Lt[_]], 1)
 -      checkFilterPredicate('_1 > 3.toByte, classOf[Gt[_]], 4)
 -      checkFilterPredicate('_1 <= 1.toByte, classOf[LtEq[_]], 1)
 -      checkFilterPredicate('_1 >= 4.toByte, classOf[GtEq[_]], 4)
 -
 -      checkFilterPredicate(Literal(1.toByte) === '_1, classOf[Eq[_]], 1)
 -      checkFilterPredicate(Literal(1.toByte) <=> '_1, classOf[Eq[_]], 1)
 -      checkFilterPredicate(Literal(2.toByte) > '_1, classOf[Lt[_]], 1)
 -      checkFilterPredicate(Literal(3.toByte) < '_1, classOf[Gt[_]], 4)
 -      checkFilterPredicate(Literal(1.toByte) >= '_1, classOf[LtEq[_]], 1)
 -      checkFilterPredicate(Literal(4.toByte) <= '_1, classOf[GtEq[_]], 4)
 -
 -      checkFilterPredicate(!('_1 < 4.toByte), classOf[GtEq[_]], 4)
 -      checkFilterPredicate('_1 < 2.toByte || '_1 > 3.toByte,
 -        classOf[Operators.Or], Seq(Row(1), Row(4)))
 +    val data = (1 to 4).map(i => Tuple1(Option(i.toByte)))
 +    import testImplicits._
 +    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
 +      withParquetDataFrame(inputDF) { implicit df =>
 +        val tinyIntAttr = df(colName).expr
 +        assert(df(colName).expr.dataType === ByteType)
 +
 +        checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
 +        checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]],
 +          (1 to 4).map(i => Row.apply(resultFun(i))))
 +
 +        checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]], resultFun(1))
 +        checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]], resultFun(1))
 +        checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]],
 +          (2 to 4).map(i => Row.apply(resultFun(i))))
 +
 +        checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]], resultFun(1))
 +        checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]], resultFun(4))
 +        checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]], resultFun(1))
 +        checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]], resultFun(4))
 +
 +        checkFilterPredicate(Literal(1.toByte) === tinyIntAttr, classOf[Eq[_]], resultFun(1))
 +        checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr, classOf[Eq[_]], resultFun(1))
 +        checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]], resultFun(1))
 +        checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]], resultFun(4))
 +        checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr, classOf[LtEq[_]], resultFun(1))
 +

[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-26 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398732474
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ##
 @@ -502,33 +617,39 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       (false, DecimalType.MAX_PRECISION) // binaryWriterUsingUnscaledBytes
     ).foreach { case (legacyFormat, precision) =>
       withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyFormat.toString) {
 -        val schema = StructType.fromDDL(s"a decimal($precision, 2)")
         val rdd =
           spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i))))
 -        val dataFrame = spark.createDataFrame(rdd, schema)
 -        testDecimalPushDown(dataFrame) { implicit df =>
 
 Review comment:
   It's already removed here in this PR, 
https://github.com/apache/spark/pull/27728/files#diff-43b427b8b0b4b9d8dd7e4367c0526f83L157





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398318993
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ##
 @@ -103,22 +107,42 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
 -  private def checkBinaryFilterPredicate
 -      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
 -      (implicit df: DataFrame): Unit = {
 -    def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
 -      assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) {
 -        df.rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
 -      }
 +  /**
 +   * Takes single level `inputDF` dataframe to generate multi-level nested
 +   * dataframes as new test data.
 +   */
 +  private def withNestedDataFrame(inputDF: DataFrame)
 +      (runTests: (DataFrame, String, Any => Any) => Unit): Unit = {
 +    assert(inputDF.schema.fields.length == 1)
 +    assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
 
 Review comment:
   @cloud-fan I added schema checking in the code to avoid passing an arbitrary type of dataframe.





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398310277
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ##
 @@ -121,43 +121,81 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
 +  /**
 +   * Takes single level `inputDF` dataframe to generate multi-level nested
 +   * dataframes as new test data.
 +   */
 +  private def withNestedDataFrame(inputDF: DataFrame)
 
 Review comment:
   Okay, this is not easy, since one of the test cases looks like
   ```scala
   val dataFrame = spark.createDataFrame(rdd, StructType.fromDDL(s"a decimal($precision, 2)"))
   withNestedDataFrame(dataFrame) { case (inputDF, pushDownColName, resultFun) =>
     withParquetDataFrame(inputDF) { implicit df =>
       val decimalAttr: Expression = df(pushDownColName).expr
       assert(df(pushDownColName).expr.dataType === DecimalType(precision, 2))
   ```
   so the dataframe cannot be constructed directly from `withNestedDataFrame[T](data: Seq[T])`.





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398317799
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ##
 @@ -103,22 +107,42 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
 -  private def checkBinaryFilterPredicate
 
 Review comment:
   @cloud-fan 
   Since `checkFilterPredicate` works for binary data in the current 
implementation, `checkBinaryFilterPredicate` can be deleted.





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398314267
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -2049,6 +2049,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
 +  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
 +    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
 +      .internal()
 +      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
 +        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
 +        "while ORC only supports predicates for names containing `dots`. The other data sources" +
 +        "don't support this feature yet.")
 +      .version("3.0.0")
 +      .booleanConf
 +      .createWithDefault(true)
 
 Review comment:
   Since the filter APIs will be enhanced to support nested columns and column names containing `dots`, it will be nice to introduce this in a major release.
   
   It's a good idea! We can turn this feature on for specific data sources in a separate PR; this PR has already grown too big.
   
   Thanks!
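
   For reference, a hedged sketch of how a user could opt out of the new behavior with the config added in this diff (the key is taken verbatim from the diff above; whether a given source needs this is situational):
   ```scala
   // Fall back to the old DSv1 filter behavior if a source cannot yet handle
   // quoted dotted names or nested paths.
   spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.enabled", "false")
   ```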





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398312781
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ##
 @@ -121,43 +121,81 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
 +  /**
 +   * Takes single level `inputDF` dataframe to generate multi-level nested
 +   * dataframes as new test data.
 +   */
 +  private def withNestedDataFrame(inputDF: DataFrame)
 
 Review comment:
   Instead, I added schema checking in the code.





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398304484
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ##
 @@ -121,43 +121,81 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
 +  /**
 +   * Takes single level `inputDF` dataframe to generate multi-level nested
 +   * dataframes as new test data.
 +   */
 +  private def withNestedDataFrame(inputDF: DataFrame)
 
 Review comment:
   Sounds good.
   
   I think `private def withNestedDataFrame[T <: Product: ClassTag: TypeTag](data: Seq[T])` is good enough. I see no reason to wrap it in `Tuple1`.





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398302492
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
 ##
 @@ -62,13 +62,21 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
       (data: Seq[T])
       (f: String => Unit): Unit = withDataSourceFile(data)(f)
 
 +  protected def toDF[T <: Product: ClassTag: TypeTag](data: Seq[T]): DataFrame = {
 
 Review comment:
   The error is
   ```scala
   Error:(45, 20) in trait ParquetTest, multiple overloaded alternatives of method withParquetDataFrame define default arguments.
   private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   ```
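
   A minimal standalone reproduction of the Scala restriction behind that error (illustrative names, not the real trait):
   ```scala
   trait Repro {
     // Two overloads that both declare default arguments in the same scope:
     def run[T](data: Seq[T], vectorized: Boolean = true)(f: T => Unit): Unit = ()
     def run(df: String, vectorized: Boolean = true)(f: String => Unit): Unit = ()
     // error: in trait Repro, multiple overloaded alternatives of method run
     // define default arguments.
   }
   ```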





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-25 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r398280601
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
 ##
 @@ -62,13 +62,21 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
       (data: Seq[T])
       (f: String => Unit): Unit = withDataSourceFile(data)(f)
 
 +  protected def toDF[T <: Product: ClassTag: TypeTag](data: Seq[T]): DataFrame = {
 
 Review comment:
   I was thinking of doing so, but surprisingly, overloading
   ```scala
   protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
       (data: Seq[T], testVectorized: Boolean = true)
       (f: DataFrame => Unit): Unit
   ```
   and
   ```scala
   protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
       (f: DataFrame => Unit): Unit
   ```
   is not allowed in Scala: two overloads may not both declare default arguments.
   





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-24 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r397601474
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
 ##
 @@ -62,13 +62,21 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
       (data: Seq[T])
       (f: String => Unit): Unit = withDataSourceFile(data)(f)
 
 +  protected def toDF[T <: Product: ClassTag: TypeTag](data: Seq[T]): DataFrame = {
 +    spark.createDataFrame(data)
 +  }
 +
    /**
 -   * Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
 +   * Writes `df` dataframe to a Parquet file and reads it back as a [[DataFrame]],
     * which is then passed to `f`. The Parquet file will be deleted after `f` returns.
     */
 -  protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
 -      (data: Seq[T], testVectorized: Boolean = true)
 -      (f: DataFrame => Unit): Unit = withDataSourceDataFrame(data, testVectorized)(f)
 +  protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
 
 Review comment:
   It's because the original test framework takes a `Seq[T]`, which is very hard to manipulate programmatically to create different flavors of nested data as new test cases. See
   https://github.com/apache/spark/pull/27728/files#diff-43b427b8b0b4b9d8dd7e4367c0526f83R128
   
   By taking a dataframe instead, it's much easier to create new nested test data from the single-level data.
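
   A minimal sketch of that idea, assuming a hypothetical input column `_1` and struct name `a` (not the PR's exact names): once the test data is a DataFrame, a nested variant is one `select` away.
   ```scala
   import org.apache.spark.sql.functions.struct

   // Wrap the single-level column into a struct so the same predicate can be
   // exercised against the nested path "a._1".
   val nestedDF = inputDF.select(struct(inputDF("_1")).alias("a"))
   ```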





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-24 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r397600752
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
 ##
 @@ -40,12 +45,32 @@ sealed abstract class Filter {
     case f: Filter => f.references
     case _ => Array.empty
   }
 +
 +  /**
 +   * List of columns that are referenced by this filter.
 +   *
 +   * @return each element is a column name as an array of string multi-identifier
 +   * @since 3.0.0
 +   */
 +  def V2references: Array[Array[String]] = {
 
 Review comment:
   done. thanks.





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-24 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r396944064
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
 ##
 @@ -32,6 +33,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
 sealed abstract class Filter {
   /**
    * List of columns that are referenced by this filter.
 +   * Note that, if a column contains `dots` in name, it will be quoted to avoid confusion.
 
 Review comment:
   As @cloud-fan suggested, I added a conf to disable the new behavior, so people can keep using their old DSv1 implementations while they upgrade to the new behavior.





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-20 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r395819986
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
 ##
 @@ -32,6 +33,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
 sealed abstract class Filter {
   /**
    * List of columns that are referenced by this filter.
 +   * Note that, if a column contains `dots` in name, it will be quoted to avoid confusion.
 
 Review comment:
   I agree that some downstream data source implementations might not support column names containing `dots` or nested column predicate pushdown. However, we are talking about the contract of Spark's Filter API. The translation logic for each data source should take care of this properly instead, and that's why we have this abstraction. WDYT?
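
   A hedged illustration of that contract, constructing `org.apache.spark.sql.sources` filters by hand (in practice the quoting is added by Spark when it translates Catalyst predicates, not by the data source):
   ```scala
   import org.apache.spark.sql.sources.EqualTo

   // A top-level column literally named "a.b" arrives quoted, while field b
   // nested inside struct a keeps the plain dotted path, so a source can tell
   // the two cases apart from `references` alone.
   val onDottedColumn = EqualTo("`a.b`", 1)
   val onNestedField = EqualTo("a.b", 1)
   assert(onDottedColumn.references.sameElements(Array("`a.b`")))
   assert(onNestedField.references.sameElements(Array("a.b")))
   ```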





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-17 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r394000137
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -652,10 +652,12 @@ object DataSourceStrategy {
  */
 object PushableColumn {
   def unapply(e: Expression): Option[String] = {
 -    def helper(e: Expression) = e match {
 -      case a: Attribute => Some(a.name)
 +    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 +    def helper(e: Expression): Option[Seq[String]] = e match {
 +      case a: Attribute => Some(Seq(a.name))
 +      case s: GetStructField => helper(s.child).map(_ :+ s.childSchema(s.ordinal).name)
 
 Review comment:
   Do you mean the nested predicate pushdown? That part would be easy. For `dots` it's harder: if we don't add the quoting here, we have to add it right before calling the ORC filter API.
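
   A self-contained toy version of the recursion in the diff above (toy `Expr` types, not Catalyst's): each struct-field layer appends one name part, so a reference to nested field a.b.c flattens to Seq("a", "b", "c"), which can then be quoted part by part.
   ```scala
   sealed trait Expr
   case class Attr(name: String) extends Expr
   case class GetField(child: Expr, name: String) extends Expr

   def nameParts(e: Expr): Option[Seq[String]] = e match {
     case Attr(n) => Some(Seq(n))                    // base case: top-level column
     case GetField(c, n) => nameParts(c).map(_ :+ n) // append the struct field name
   }

   assert(nameParts(GetField(GetField(Attr("a"), "b"), "c")).contains(Seq("a", "b", "c")))
   ```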





[GitHub] [spark] dbtsai commented on a change in pull request #27728: [SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet

2020-03-13 Thread GitBox
dbtsai commented on a change in pull request #27728: 
[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested 
Column Predicate Pushdown for Parquet
URL: https://github.com/apache/spark/pull/27728#discussion_r392507441
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
 ##
 @@ -73,9 +74,11 @@ private[orc] object OrcFilters extends Logging {
     if (HiveUtils.isHive23) {
       DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
     } else {
 -      val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
 +      val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
 +      // TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so they are removed.
 
 Review comment:
   ORC already supports column names with `dots`. In fact, the ORC API uses the same quotation method I proposed in this PR, but there we add the quotation, when the column name contains a dot, right before calling the ORC filter APIs.
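
   A minimal sketch of that quoting rule (not the exact Spark helper): backtick-quote a name only when it contains a dot or a backtick, escaping embedded backticks, so the column named "a.b" stays distinguishable from the nested path a -> b.
   ```scala
   def quoteIfNeeded(part: String): String =
     if (part.contains(".") || part.contains("`")) {
       s"`${part.replace("`", "``")}`"
     } else {
       part
     }

   assert(quoteIfNeeded("col") == "col")
   assert(quoteIfNeeded("a.b") == "`a.b`")
   ```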

