viirya commented on a change in pull request #30659:
URL: https://github.com/apache/spark/pull/30659#discussion_r541532973
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
##########
@@ -61,15 +63,24 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
val keepOrdering = a.aggregateExpressions
.exists(ae => ae.mode.equals(Final) || ae.mode.equals(PartialMerge))
a.mapChildren(removeProject(_, keepOrdering))
- // GenerateExec requires column ordering since it binds input rows directly with its
- // requiredChildOutput without using child's output schema.
- case g: GenerateExec => g.mapChildren(removeProject(_, true))
- // JoinExec ordering requirement will inherit from its parent. If there is no ProjectExec in
- // its ancestors, JoinExec should require output columns to be ordered.
- case o => o.mapChildren(removeProject(_, requireOrdering))
+ case p if canPassThrough(p) => p.mapChildren(removeProject(_, requireOrdering))
+ case o => o.mapChildren(removeProject(_, requireOrdering = true))
Review comment:
Or maybe:
```scala
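private def getOrderingRequirement(plan: SparkPlan, requireAtParents: Boolean): Boolean =
  plan match {
    // These operators inherit the ordering requirement from their parents.
    case _: FilterExec | _: BaseJoinExec | _: WindowExec | _: ExpandExec => requireAtParents
    // Every other operator requires its children's output columns to be ordered.
    case _ => true
  }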
```
```scala
case o =>
  o.mapChildren(removeProject(_, requireOrdering = getOrderingRequirement(o, requireOrdering)))
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
##########
Review comment:
Seems we can combine the two cases?
```scala
case o =>
  // See if we should pass ordering requirement from the parents.
  val required = if (canPassThrough(o)) {
    requireOrdering
  } else {
    true
  }
  o.mapChildren(removeProject(_, requireOrdering = required))
```
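For illustration only, here is a minimal self-contained sketch of the two suggestions above, showing that they compute the same ordering requirement for an operator's children. The `Plan`, `Scan`, `Filter`, `Join` and `Generate` types are hypothetical toy stand-ins, not the real Spark classes, and the set of pass-through operators is reduced to two for brevity. `canPassThrough` is assumed to behave like the helper named in the diff.

```scala
// Toy stand-ins for physical operators (hypothetical, NOT Spark's SparkPlan hierarchy).
sealed trait Plan { def children: Seq[Plan] }
case class Scan(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class Filter(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case class Join(left: Plan, right: Plan) extends Plan {
  val children: Seq[Plan] = Seq(left, right)
}
case class Generate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

object OrderingRequirementSketch {
  // Assumed behaviour of the helper from the diff: true for operators that
  // simply forward the parent's ordering requirement to their children.
  def canPassThrough(plan: Plan): Boolean = plan match {
    case _: Filter | _: Join => true
    case _ => false
  }

  // First suggestion: a dedicated helper that pattern-matches on the operator.
  def getOrderingRequirement(plan: Plan, requireAtParents: Boolean): Boolean =
    plan match {
      case _: Filter | _: Join => requireAtParents
      case _ => true
    }

  // Second suggestion: a single case that branches on canPassThrough.
  def combinedRequirement(plan: Plan, requireOrdering: Boolean): Boolean =
    if (canPassThrough(plan)) requireOrdering else true

  def main(args: Array[String]): Unit = {
    val plans = Seq(Filter(Scan("t")), Join(Scan("a"), Scan("b")), Generate(Scan("t")))
    // Both variants should agree for every operator and parent requirement.
    for (p <- plans; fromParent <- Seq(true, false)) {
      assert(getOrderingRequirement(p, fromParent) == combinedRequirement(p, fromParent))
    }
    println("both variants agree on the sample plans")
  }
}
```

In this sketch either form can feed the `requireOrdering` argument when recursing into children. The choice between them is mostly about whether the pass-through set should live in a reusable predicate or in the match itself.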