Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/20476#discussion_r165437683
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
---
@@ -81,35 +81,34 @@ object PushDownOperatorsToDataSource extends
Rule[LogicalPlan] with PredicateHel
// TODO: add more push down rules.
- // TODO: nested fields pruning
- def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent:
Seq[Attribute]): Unit = {
- plan match {
- case Project(projectList, child) =>
- val required =
projectList.filter(requiredByParent.contains).flatMap(_.references)
- pushDownRequiredColumns(child, required)
-
- case Filter(condition, child) =>
- val required = requiredByParent ++ condition.references
- pushDownRequiredColumns(child, required)
-
- case DataSourceV2Relation(fullOutput, reader) => reader match {
- case r: SupportsPushDownRequiredColumns =>
- // Match original case of attributes.
- val attrMap = AttributeMap(fullOutput.zip(fullOutput))
- val requiredColumns = requiredByParent.map(attrMap)
- r.pruneColumns(requiredColumns.toStructType)
- case _ =>
- }
+ pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+ // After column pruning, we may have redundant PROJECT nodes in the
query plan, remove them.
+ RemoveRedundantProject(filterPushed)
+ }
+
+ // TODO: nested fields pruning
+ private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent:
AttributeSet): Unit = {
+ plan match {
+ case Project(projectList, child) =>
+ val required = projectList.flatMap(_.references)
+ pushDownRequiredColumns(child, AttributeSet(required))
+
+ case Filter(condition, child) =>
+ val required = requiredByParent ++ condition.references
+ pushDownRequiredColumns(child, required)
- // TODO: there may be more operators can be used to calculate
required columns, we can add
- // more and more in the future.
- case _ => plan.children.foreach(child =>
pushDownRequiredColumns(child, child.output))
+ case relation: DataSourceV2Relation => relation.reader match {
+ case reader: SupportsPushDownRequiredColumns =>
+ val requiredColumns =
relation.output.filter(requiredByParent.contains)
+ reader.pruneColumns(requiredColumns.toStructType)
+
+ case _ =>
}
- }
- pushDownRequiredColumns(filterPushed, filterPushed.output)
- // After column pruning, we may have redundant PROJECT nodes in the
query plan, remove them.
- RemoveRedundantProject(filterPushed)
+ // TODO: there may be more operators can be used to calculate
required columns, we can add
+ // more and more in the future.
--- End diff --
Nit: suggest rewording the TODO to `there may be more operators that can be used to calculate the
required columns. We can add more and more in the future.`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]