viirya commented on a change in pull request #29950:
URL: https://github.com/apache/spark/pull/29950#discussion_r503665670



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -760,12 +756,43 @@ object CollapseProject extends Rule[LogicalPlan] {
       s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, p2.projectList)))
   }
 
+  private def collapseProjects(plan: LogicalPlan): LogicalPlan = plan match {
+    case p1 @ Project(_, p2: Project) =>
+      val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject
+
+      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+          getLargestNumOfCommonOutput(p1.projectList, p2.projectList) > maxCommonExprs) {
+        p1
+      } else {
+        collapseProjects(
+          p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList)))
+      }
+    case _ => plan
+  }
+
   private def collectAliases(projectList: Seq[NamedExpression]): AttributeMap[Alias] = {
     AttributeMap(projectList.collect {
       case a: Alias => a.toAttribute -> a
     })
   }
 
+  // Counts for the largest times common outputs from lower operator are used in upper operators.
+  private def getLargestNumOfCommonOutput(

Review comment:
       The two places look similar, but their parameters differ slightly.
We could make them share the same code, but it is only a few lines and the
refactoring would require more changes, so it does not seem worth it to me.






