Ngone51 commented on a change in pull request #32590:
URL: https://github.com/apache/spark/pull/32590#discussion_r635896472



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -67,61 +81,78 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
    *         all relations of the new LogicalPlan )
    */
   private def renewDuplicatedRelations(
-      existingRelations: Seq[MultiInstanceRelation],
-      plan: LogicalPlan): (LogicalPlan, Seq[MultiInstanceRelation]) = plan match {
-    case p: LogicalPlan if p.isStreaming => (plan, Nil)
+      existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
+      plan: LogicalPlan)
+    : (LogicalPlan, mutable.HashSet[ReferenceEqualPlanWrapper], Boolean) = plan match {
+    case p: LogicalPlan if p.isStreaming => (plan, mutable.HashSet.empty, false)
 
     case m: MultiInstanceRelation =>
-      if (isDuplicated(existingRelations, m)) {
+      val planWrapper = ReferenceEqualPlanWrapper(m)
+      if (isDuplicated(existingRelations, planWrapper)) {
         val newNode = m.newInstance()
         newNode.copyTagsFrom(m)
-        (newNode, Nil)
+        (newNode, mutable.HashSet.empty, true)
       } else {
-        (m, Seq(m))
+        val mWrapper = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+        mWrapper.add(planWrapper)
+        (m, mWrapper, false)
       }
 
     case plan: LogicalPlan =>
-      val relations = ArrayBuffer.empty[MultiInstanceRelation]
+      val relations = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+      var planChanged = false
       val newPlan = if (plan.children.nonEmpty) {
-        val newChildren = ArrayBuffer.empty[LogicalPlan]
+        val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
         for (c <- plan.children) {
-          val (renewed, collected) = renewDuplicatedRelations(existingRelations ++ relations, c)
+          val (renewed, collected, changed) =
+            renewDuplicatedRelations(existingRelations ++ relations, c)
           newChildren += renewed
           relations ++= collected
+          if (changed) {
+            planChanged = true
+          }
         }
 
-        if (plan.childrenResolved) {
-          val attrMap = AttributeMap(
-            plan
-              .children
-              .flatMap(_.output).zip(newChildren.flatMap(_.output))
-              .filter { case (a1, a2) => a1.exprId != a2.exprId }
-          )
-          plan.withNewChildren(newChildren.toSeq).rewriteAttrs(attrMap)
+        if (planChanged) {
+          if (plan.childrenResolved) {
+            val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
+            val attrMap = AttributeMap(
+              plan
+                .children
+                .flatMap(_.output).zip(newChildren.flatMap(_.output))
+                .filter { case (a1, a2) => a1.exprId != a2.exprId }
+            )
+            if (attrMap.isEmpty) {
+              planWithNewChildren
+            } else {
+              planWithNewChildren.rewriteAttrs(attrMap)
+            }
+          } else {
+            plan.withNewChildren(newChildren.toSeq)
+          }
         } else {
-          plan.withNewChildren(newChildren.toSeq)
+          plan
         }
       } else {
         plan
       }
 
       val planWithNewSubquery = newPlan.transformExpressions {
         case subquery: SubqueryExpression =>
-          val (renewed, collected) = renewDuplicatedRelations(
+          val (renewed, collected, changed) = renewDuplicatedRelations(
             existingRelations ++ relations, subquery.plan)
           relations ++= collected
+          if (changed) planChanged = true
           subquery.withNewPlan(renewed)
       }
-      (planWithNewSubquery, relations.toSeq)
+      (planWithNewSubquery, relations, planChanged)
   }
 
+  @inline

Review comment:
       > Can this annotation also reduce the running time? (The PR description doesn't mention it, though.)
   
   Not really. It's actually just a change based on experience.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to