cloud-fan commented on a change in pull request #35214:
URL: https://github.com/apache/spark/pull/35214#discussion_r789694623



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1322,6 +1322,25 @@ object CombineUnions extends Rule[LogicalPlan] {
         case Union(children, byName, allowMissingCol)
             if byName == topByName && allowMissingCol == topAllowMissingCol =>
           stack.pushAll(children.reverse)
+        // Push down projection and then push pushed plan to Stack if there is 
a Project.
+        case Project(projectList, Distinct(u @ Union(children, byName, 
allowMissingCol)))
+            if projectList.forall(_.deterministic) && children.nonEmpty &&
+              flattenDistinct && byName == topByName && allowMissingCol == 
topAllowMissingCol =>
+          val newChildren = 
PushProjectionThroughUnion.pushProjectionThroughUnion(projectList, u)
+            .map(CollapseProject(_))
+          stack.pushAll(newChildren.reverse)
+        case Project(projectList, Deduplicate(keys: Seq[Attribute], u: Union))
+            if projectList.forall(_.deterministic) && flattenDistinct && 
u.byName == topByName &&
+              u.allowMissingCol == topAllowMissingCol && AttributeSet(keys) == 
u.outputSet =>
+          val newChildren = 
PushProjectionThroughUnion.pushProjectionThroughUnion(projectList, u)
+            .map(CollapseProject(_))
+          stack.pushAll(newChildren.reverse)
+        case Project(projectList, u @ Union(children, byName, allowMissingCol))
+            if projectList.forall(_.deterministic) && children.nonEmpty &&
+              byName == topByName && allowMissingCol == topAllowMissingCol =>
+          val newChildren = 
PushProjectionThroughUnion.pushProjectionThroughUnion(projectList, u)
+            .map(CollapseProject(_))

Review comment:
       I think it's a bit overkill to run the entire `CollapseProject` rule. It 
also makes it harder to understand the algorithm here. It's actually a simple 
algorithm: When we hit `Project Chain(Union(A, B))`, we put `Project Chain(A)` 
and `Project Chain(B)` back to the stack and repeat this process. In the end, 
we will union a bunch of plans with a very deep `Project` chain. To be more 
efficient, we should collapse projects during this process.
   
   The code can be
   ```
   stack.pop() match {
     case p1 @ Project(_, p2: Project) if 
CollapseProject.canCollapseExpressions(p1, p2) =>
       stack.push(p2.copy(projectList = 
CollapseProject.buildCleanedProjectList(p1.projectList, p2.projectList)))
     case Union(children, ...) => stack.pushAll(children.reverse)
     case Project(list, Union(children, ...)) if list.forall(_.deterministic) =>
       stack.pushAll(children.reverse.map(Project(list, _)))
     ... // handle distinct and deduplicate
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to