maropu commented on a change in pull request #28366:
URL: https://github.com/apache/spark/pull/28366#discussion_r417678104



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns 
and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both 
optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The 
other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully 
qualified data source " +
+        "implementation class names for which Spark tries to push down 
predicates for nested " +
+        "columns and or names containing `dots` to data sources. Currently, 
Parquet implements " +
+        "both optimizations while ORC only supports predicates for names 
containing `dots`. The " +
+        "other data sources don't support this feature yet.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(true)
+      .stringConf

Review comment:
       Do we need `.transform(_.toUpperCase(Locale.ROOT))`? Also, could we validate the input with `checkValues`? Btw, does this feature also cover custom data sources, not just the prebuilt ones (parquet, orc, ...)?
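       For illustration, a minimal sketch of how the builder could apply both (hypothetical, not this PR's code; `transform` and `checkValue` are the usual SQLConf builder hooks, and the valid-source set and default below are assumptions — lower-casing is used to match the lookup in `DataSourceUtils`):
   ```
   .stringConf
   // Normalize case once so lookup sites don't have to.
   .transform(_.toLowerCase(Locale.ROOT))
   // `checkValues` matches the whole string, so for a comma-separated list a
   // per-entry predicate via `checkValue` seems like a better fit.
   .checkValue(
     _.split(",").map(_.trim).forall(Set("parquet", "orc").contains),
     "The value must be a comma-separated list of 'parquet' and/or 'orc'.")
   .createWithDefault("parquet,orc")
   ```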

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns 
and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both 
optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The 
other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully 
qualified data source " +
+        "implementation class names for which Spark tries to push down 
predicates for nested " +
+        "columns and or names containing `dots` to data sources. Currently, 
Parquet implements " +

Review comment:
       nit: `and or` -> `and`?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns 
and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both 
optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The 
other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")

Review comment:
       nit: How about `v1sourceList` -> `supportedV1Sources`?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns 
and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both 
optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The 
other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully 
qualified data source " +
+        "implementation class names for which Spark tries to push down 
predicates for nested " +
+        "columns and or names containing `dots` to data sources. Currently, 
Parquet implements " +
+        "both optimizations while ORC only supports predicates for names 
containing `dots`. The " +
+        "other data sources don't support this feature yet.")

Review comment:
       How about listing the valid set of sources, e.g. `The value can be 'parquet', 'orc', .... The default value is 'parquet,orc'.`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -68,6 +71,20 @@ object DataSourceUtils {
   private[sql] def isDataFile(fileName: String) =
     !(fileName.startsWith("_") || fileName.startsWith("."))
 
+  /**
+   * Returns if the given relation's V1 datasource provider supports nested predicate pushdown.
+   */
+  private[sql] def supportNestedPredicatePushdown(relation: BaseRelation): Boolean =
+    relation match {
+      case hs: HadoopFsRelation =>
+        val supportedDatasources =
+          SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST)
+            .toLowerCase(Locale.ROOT)
+            .split(",").map(_.trim)

Review comment:
       Could we use `Utils.stringToSeq`?
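       For reference, a sketch of that change (assuming `org.apache.spark.util.Utils` is importable here):
   ```
   import org.apache.spark.util.Utils

   // Utils.stringToSeq splits on ',', trims each entry, and drops empty
   // strings, so the manual split/trim chain collapses into a single call.
   val supportedDatasources = Utils.stringToSeq(
     SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST)
       .toLowerCase(Locale.ROOT))
   ```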

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -529,11 +532,14 @@ object DataSourceStrategy {
    * @param translatedFilterToExpr An optional map from leaf node filter expressions to its
    *                               translated [[Filter]]. The map is used for rebuilding
    *                               [[Expression]] from [[Filter]].
+   * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled. Default is
+   *                                       disabled.

Review comment:
       What does `Default is disabled` mean? Should we add a default value to the argument instead, like `nestedPredicatePushdownEnabled: Boolean = false`?
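       For illustration, the shape that suggestion would take (a sketch only; the visibility, other parameters, and return type are abbreviated from the diff and scaladoc, not confirmed):
   ```
   // Sketch: encode the default in the signature instead of in prose.
   protected[sql] def translateFilterWithMapping(
       predicate: Expression,
       translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
       nestedPredicatePushdownEnabled: Boolean = false): Option[sources.Filter]
   ```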

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -547,21 +553,31 @@ object DataSourceStrategy {
         // Pushing one leg of AND down is only safe to do at the top level.
         // You can see ParquetFilters' createFilter for more details.
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.And(leftFilter, rightFilter)
 
       case expressions.Or(left, right) =>
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.Or(leftFilter, rightFilter)
 
       case expressions.Not(child) =>
-        translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not)
+        translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          .map(sources.Not)
 
       case other =>
-        val filter = translateLeafNodeFilter(other)
+        val pushableColumn = if (nestedPredicatePushdownEnabled) {
+          PushableColumnAndNestedColumn
+        } else {
+          PushableColumnWithoutNestedColumn
+        }

Review comment:
       How about moving this check to the `PushableColumn` object?
   ```
   object PushableColumn {
     def apply(nestedPredicatePushdownEnabled: Boolean) = {
       if (nestedPredicatePushdownEnabled) {
         PushableColumnAndNestedColumn
       } else {
         PushableColumnWithoutNestedColumn
       }
     }
   }
   ```
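       With a factory like that, the call site above reduces to:
   ```
   val pushableColumn = PushableColumn(nestedPredicatePushdownEnabled)
   ```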

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
##########
@@ -178,8 +178,11 @@ object FileSourceStrategy extends Strategy with Logging {
       // Partition keys are not available in the statistics of the files.
       val dataFilters =
         normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty)
-      logInfo(s"Pushed Filters: " +
-        s"${dataFilters.flatMap(DataSourceStrategy.translateFilter).mkString(",")}")
+      val supportNestedPredicatePushdown =
+        DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
+      val pushedFilters = dataFilters
+        .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+      logInfo(s"Pushed Filters: " + s"${pushedFilters.mkString(",")}")

Review comment:
       nit: `logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")`



