maropu commented on a change in pull request #32590:
URL: https://github.com/apache/spark/pull/32590#discussion_r635668680



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -17,16 +17,30 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, 
AttributeSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 
+/**
+ * A help class that used to detect duplicate relations fast in `DeduplicateRelations`

Review comment:
      nit: This might read better as `A helper class used to ...`

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -67,61 +81,78 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
    *         all relations of the new LogicalPlan )
    */
   private def renewDuplicatedRelations(
-      existingRelations: Seq[MultiInstanceRelation],
-      plan: LogicalPlan): (LogicalPlan, Seq[MultiInstanceRelation]) = plan match {
-    case p: LogicalPlan if p.isStreaming => (plan, Nil)
+      existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
+      plan: LogicalPlan)
+    : (LogicalPlan, mutable.HashSet[ReferenceEqualPlanWrapper], Boolean) = plan match {
+    case p: LogicalPlan if p.isStreaming => (plan, mutable.HashSet.empty, false)
 
     case m: MultiInstanceRelation =>
-      if (isDuplicated(existingRelations, m)) {
+      val planWrapper = ReferenceEqualPlanWrapper(m)
+      if (isDuplicated(existingRelations, planWrapper)) {
         val newNode = m.newInstance()
         newNode.copyTagsFrom(m)
-        (newNode, Nil)
+        (newNode, mutable.HashSet.empty, true)
       } else {
-        (m, Seq(m))
+        val mWrapper = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+        mWrapper.add(planWrapper)
+        (m, mWrapper, false)
       }
 
     case plan: LogicalPlan =>
-      val relations = ArrayBuffer.empty[MultiInstanceRelation]
+      val relations = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+      var planChanged = false
       val newPlan = if (plan.children.nonEmpty) {
-        val newChildren = ArrayBuffer.empty[LogicalPlan]
+        val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
         for (c <- plan.children) {
-          val (renewed, collected) = renewDuplicatedRelations(existingRelations ++ relations, c)
+          val (renewed, collected, changed) =
+            renewDuplicatedRelations(existingRelations ++ relations, c)
           newChildren += renewed
           relations ++= collected
+          if (changed) {
+            planChanged = true
+          }
         }
 
-        if (plan.childrenResolved) {
-          val attrMap = AttributeMap(
-            plan
-              .children
-              .flatMap(_.output).zip(newChildren.flatMap(_.output))
-              .filter { case (a1, a2) => a1.exprId != a2.exprId }
-          )
-          plan.withNewChildren(newChildren.toSeq).rewriteAttrs(attrMap)
+        if (planChanged) {
+          if (plan.childrenResolved) {
+            val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
+            val attrMap = AttributeMap(
+              plan
+                .children
+                .flatMap(_.output).zip(newChildren.flatMap(_.output))
+                .filter { case (a1, a2) => a1.exprId != a2.exprId }
+            )
+            if (attrMap.isEmpty) {
+              planWithNewChildren
+            } else {
+              planWithNewChildren.rewriteAttrs(attrMap)
+            }
+          } else {
+            plan.withNewChildren(newChildren.toSeq)
+          }
         } else {
-          plan.withNewChildren(newChildren.toSeq)
+          plan
         }
       } else {
         plan
       }
 
       val planWithNewSubquery = newPlan.transformExpressions {
         case subquery: SubqueryExpression =>
-          val (renewed, collected) = renewDuplicatedRelations(
+          val (renewed, collected, changed) = renewDuplicatedRelations(
             existingRelations ++ relations, subquery.plan)
           relations ++= collected
+          if (changed) planChanged = true
           subquery.withNewPlan(renewed)
       }
-      (planWithNewSubquery, relations.toSeq)
+      (planWithNewSubquery, relations, planChanged)
   }
 
+  @inline

Review comment:
      This annotation can also reduce the running time? (The PR description says nothing about it, though)
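
For context, a minimal standalone sketch (not from the PR): in Scala 2.12/2.13, `@inline` is only a request to the optimizer, so by itself it should not change running time unless the inliner is enabled (e.g. `-opt:l:inline` together with `-opt-inline-from`). The `InlineSketch` object and the simplified `isDuplicated` body below are made up for illustration:

    object InlineSketch {
      // @inline only marks the method as an inlining candidate; the Scala
      // optimizer (when enabled) or the JIT decides whether call sites are
      // actually inlined.
      @inline
      private def isDuplicated(seen: Set[Int], key: Int): Boolean = seen.contains(key)

      def main(args: Array[String]): Unit = {
        val seen = Set(1, 2, 3)
        println(isDuplicated(seen, 2))  // true
        println(isDuplicated(seen, 5))  // false
      }
    }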

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -17,16 +17,30 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, 
AttributeSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 
+/**
+ * A help class that used to detect duplicate relations fast in `DeduplicateRelations`
+ */
+case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
+  private val _hashCode = System.identityHashCode(plan)

Review comment:
      Ah, I see. The Java library doesn't have `IdentityHashSet`, even though it has `IdentityHashMap`...
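
For reference, a small self-contained sketch (not part of the PR) contrasting the two ways to get identity-set semantics on the JVM: wrapping each element so that `hashCode`/`equals` delegate to object identity, as `ReferenceEqualPlanWrapper` does, or backing a `java.util.Set` with an `IdentityHashMap` via `Collections.newSetFromMap`. `Plan`, `RefEqualWrapper`, and `IdentitySetSketch` below are made-up names for illustration:

    import java.util.{Collections, IdentityHashMap}
    import scala.collection.mutable

    object IdentitySetSketch extends App {
      case class Plan(name: String)   // stand-in for LogicalPlan

      // Option 1: wrap elements so that a regular HashSet compares by reference.
      class RefEqualWrapper(val plan: Plan) {
        override def hashCode(): Int = System.identityHashCode(plan)
        override def equals(other: Any): Boolean = other match {
          case w: RefEqualWrapper => w.plan eq plan
          case _ => false
        }
      }

      val p1 = Plan("scan")
      val p2 = Plan("scan")           // structurally equal, but a different object

      val wrapped = new mutable.HashSet[RefEqualWrapper]()
      wrapped.add(new RefEqualWrapper(p1))
      assert(wrapped.contains(new RefEqualWrapper(p1)))   // same instance -> duplicate
      assert(!wrapped.contains(new RefEqualWrapper(p2)))  // equal value, new instance -> not

      // Option 2: no wrapper; back a java.util.Set with an IdentityHashMap.
      val identitySet: java.util.Set[Plan] =
        Collections.newSetFromMap(new IdentityHashMap[Plan, java.lang.Boolean]())
      identitySet.add(p1)
      assert(identitySet.contains(p1))
      assert(!identitySet.contains(p2))

      println("identity-set semantics hold for both approaches")
    }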




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


