maropu commented on a change in pull request #28366:
URL: https://github.com/apache/spark/pull/28366#discussion_r417678104
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully qualified data source " +
+        "implementation class names for which Spark tries to push down predicates for nested " +
+        "columns and or names containing `dots` to data sources. Currently, Parquet implements " +
+        "both optimizations while ORC only supports predicates for names containing `dots`. The " +
+        "other data sources don't support this feature yet.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(true)
+      .stringConf
Review comment:
Do we need `.transform(_.toUpperCase(Locale.ROOT))` here? Also, could we validate the input with `checkValues`? By the way, does this feature cover custom data sources other than the prebuilt ones (parquet, orc, ...)?
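For example, just a sketch of what the validation could look like (I used `toLowerCase` to match the lookup in `DataSourceUtils` below; the fixed allowed set is illustrative, since the conf is also documented to accept fully qualified class names, so a strict whitelist may be too tight):
```
// Sketch only: normalize case, then reject unknown entries.
// The allowed set here is an assumption; fully qualified class names
// would need a looser predicate than a fixed whitelist.
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValue(
  _.split(",").map(_.trim).forall(Set("parquet", "orc").contains),
  "The value must be a comma-separated list of supported source names.")
.createWithDefault("parquet,orc")
```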
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully qualified data source " +
+        "implementation class names for which Spark tries to push down predicates for nested " +
+        "columns and or names containing `dots` to data sources. Currently, Parquet implements " +
Review comment:
nit: `and or` -> `and`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
Review comment:
nit: How about `v1sourceList` -> `supportedV1Sources`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2063,16 +2063,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully qualified data source " +
+        "implementation class names for which Spark tries to push down predicates for nested " +
+        "columns and or names containing `dots` to data sources. Currently, Parquet implements " +
+        "both optimizations while ORC only supports predicates for names containing `dots`. The " +
+        "other data sources don't support this feature yet.")
Review comment:
How about listing the valid set of sources, e.g. `The value can be 'parquet', 'orc', .... The default value is 'parquet,orc'.`?
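For example, the revised doc text could read like this (just one possible wording, reusing the existing phrasing, with the `and or` nit applied):
```
.doc("A comma-separated list of data source short names or fully qualified data source " +
  "implementation class names for which Spark tries to push down predicates for nested " +
  "columns and names containing `dots` to data sources. The value can be 'parquet' or " +
  "'orc'. The default value is 'parquet,orc'.")
```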
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -68,6 +71,20 @@ object DataSourceUtils {
   private[sql] def isDataFile(fileName: String) =
     !(fileName.startsWith("_") || fileName.startsWith("."))
+  /**
+   * Returns if the given relation's V1 datasource provider supports nested predicate pushdown.
+   */
+  private[sql] def supportNestedPredicatePushdown(relation: BaseRelation): Boolean =
+    relation match {
+      case hs: HadoopFsRelation =>
+        val supportedDatasources =
+          SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST)
+            .toLowerCase(Locale.ROOT)
+            .split(",").map(_.trim)
Review comment:
Could we use `Utils.stringToSeq`?
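For example (a sketch assuming `org.apache.spark.util.Utils` is usable from here; `stringToSeq` splits on commas, trims, and drops empty entries, returning a `Seq[String]` rather than an `Array[String]`):
```
// Sketch: Utils.stringToSeq replaces the manual split/trim and also
// filters out empty entries from stray commas.
val supportedDatasources = Utils.stringToSeq(
  SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST)
    .toLowerCase(Locale.ROOT))
```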
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -529,11 +532,14 @@ object DataSourceStrategy {
    * @param translatedFilterToExpr An optional map from leaf node filter expressions to its
    *                               translated [[Filter]]. The map is used for rebuilding
    *                               [[Expression]] from [[Filter]].
+   * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled. Default is
+   *                                       disabled.
Review comment:
What does `Default is disabled` mean? Should we add a default value to the argument instead, e.g. `nestedPredicatePushdownEnabled: Boolean = false`?
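Something like this (a sketch; the other parameters and types are copied from this PR's signature as I read it, so treat them as assumptions):
```
// Sketch: only the new trailing default changes.
protected[sql] def translateFilterWithMapping(
    predicate: Expression,
    translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
    nestedPredicatePushdownEnabled: Boolean = false): Option[Filter]
```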
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -547,21 +553,31 @@ object DataSourceStrategy {
         // Pushing one leg of AND down is only safe to do at the top level.
         // You can see ParquetFilters' createFilter for more details.
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.And(leftFilter, rightFilter)
       case expressions.Or(left, right) =>
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.Or(leftFilter, rightFilter)
       case expressions.Not(child) =>
-        translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not)
+        translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          .map(sources.Not)
       case other =>
-        val filter = translateLeafNodeFilter(other)
+        val pushableColumn = if (nestedPredicatePushdownEnabled) {
+          PushableColumnAndNestedColumn
+        } else {
+          PushableColumnWithoutNestedColumn
+        }
Review comment:
How about moving this check to the `PushableColumn` object?
```
object PushableColumn {
  def apply(nestedPredicatePushdownEnabled: Boolean) = {
    if (nestedPredicatePushdownEnabled) {
      PushableColumnAndNestedColumn
    } else {
      PushableColumnWithoutNestedColumn
    }
  }
}
```
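Then the call site above could collapse to a one-liner, e.g. (assuming `translateLeafNodeFilter` ends up taking the column matcher as a second argument, which this diff doesn't show):
```
case other =>
  val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
```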
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
##########
@@ -178,8 +178,11 @@ object FileSourceStrategy extends Strategy with Logging {
         // Partition keys are not available in the statistics of the files.
         val dataFilters =
           normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty)
-        logInfo(s"Pushed Filters: " +
-          s"${dataFilters.flatMap(DataSourceStrategy.translateFilter).mkString(",")}")
+        val supportNestedPredicatePushdown =
+          DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
+        val pushedFilters = dataFilters
+          .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+        logInfo(s"Pushed Filters: " + s"${pushedFilters.mkString(",")}")
Review comment:
nit: `logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")`