maropu commented on a change in pull request #32590:
URL: https://github.com/apache/spark/pull/32590#discussion_r635668680
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -17,16 +17,30 @@
package org.apache.spark.sql.catalyst.analysis
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+/**
+ * A help class that used to detect duplicate relations fast in `DeduplicateRelations`
Review comment:
nit: That might be `A helper class used to...`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -67,61 +81,78 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
* all relations of the new LogicalPlan )
*/
private def renewDuplicatedRelations(
- existingRelations: Seq[MultiInstanceRelation],
- plan: LogicalPlan): (LogicalPlan, Seq[MultiInstanceRelation]) = plan match {
- case p: LogicalPlan if p.isStreaming => (plan, Nil)
+ existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
+ plan: LogicalPlan)
+ : (LogicalPlan, mutable.HashSet[ReferenceEqualPlanWrapper], Boolean) = plan match {
+ case p: LogicalPlan if p.isStreaming => (plan, mutable.HashSet.empty, false)
case m: MultiInstanceRelation =>
- if (isDuplicated(existingRelations, m)) {
+ val planWrapper = ReferenceEqualPlanWrapper(m)
+ if (isDuplicated(existingRelations, planWrapper)) {
val newNode = m.newInstance()
newNode.copyTagsFrom(m)
- (newNode, Nil)
+ (newNode, mutable.HashSet.empty, true)
} else {
- (m, Seq(m))
+ val mWrapper = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+ mWrapper.add(planWrapper)
+ (m, mWrapper, false)
}
case plan: LogicalPlan =>
- val relations = ArrayBuffer.empty[MultiInstanceRelation]
+ val relations = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+ var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
- val newChildren = ArrayBuffer.empty[LogicalPlan]
+ val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
for (c <- plan.children) {
- val (renewed, collected) = renewDuplicatedRelations(existingRelations ++ relations, c)
+ val (renewed, collected, changed) =
+ renewDuplicatedRelations(existingRelations ++ relations, c)
newChildren += renewed
relations ++= collected
+ if (changed) {
+ planChanged = true
+ }
}
- if (plan.childrenResolved) {
- val attrMap = AttributeMap(
- plan
- .children
- .flatMap(_.output).zip(newChildren.flatMap(_.output))
- .filter { case (a1, a2) => a1.exprId != a2.exprId }
- )
- plan.withNewChildren(newChildren.toSeq).rewriteAttrs(attrMap)
+ if (planChanged) {
+ if (plan.childrenResolved) {
+ val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
+ val attrMap = AttributeMap(
+ plan
+ .children
+ .flatMap(_.output).zip(newChildren.flatMap(_.output))
+ .filter { case (a1, a2) => a1.exprId != a2.exprId }
+ )
+ if (attrMap.isEmpty) {
+ planWithNewChildren
+ } else {
+ planWithNewChildren.rewriteAttrs(attrMap)
+ }
+ } else {
+ plan.withNewChildren(newChildren.toSeq)
+ }
} else {
- plan.withNewChildren(newChildren.toSeq)
+ plan
}
} else {
plan
}
val planWithNewSubquery = newPlan.transformExpressions {
case subquery: SubqueryExpression =>
- val (renewed, collected) = renewDuplicatedRelations(
+ val (renewed, collected, changed) = renewDuplicatedRelations(
existingRelations ++ relations, subquery.plan)
relations ++= collected
+ if (changed) planChanged = true
subquery.withNewPlan(renewed)
}
- (planWithNewSubquery, relations.toSeq)
+ (planWithNewSubquery, relations, planChanged)
}
+ @inline
Review comment:
Can this annotation also reduce the running time? (The PR description says nothing about it, though.)
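
For reference on the question above: in Scala 2, `@inline` is only a hint; the compiler's inliner acts on it only when the optimizer is enabled via the `-opt` compiler flags, otherwise the annotation does not change the generated bytecode. A rough, self-contained sketch (hypothetical names, not the PR's actual helper) of how the hint is typically attached:

```scala
object InlineHintSketch {
  // `@inline` marks the method as a candidate for the Scala 2 inliner.
  // It only takes effect when the build enables the optimizer (the -opt
  // compiler flags); otherwise it is a no-op at the bytecode level.
  @inline
  private def isDuplicated(seen: Set[Int], id: Int): Boolean = seen.contains(id)

  // Small caller so the sketch is runnable on its own.
  def countDuplicates(ids: Seq[Int]): Int = {
    var seen = Set.empty[Int]
    var dups = 0
    ids.foreach { id =>
      if (isDuplicated(seen, id)) dups += 1 else seen += id
    }
    dups
  }
}
```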
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -17,16 +17,30 @@
package org.apache.spark.sql.catalyst.analysis
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+/**
+ * A help class that used to detect duplicate relations fast in `DeduplicateRelations`
+ */
+case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
+ private val _hashCode = System.identityHashCode(plan)
Review comment:
Ah, I see. The Java library doesn't have `IdentityHashSet`, even though it has `IdentityHashMap`...
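
For what it's worth, a common JDK workaround for the missing `IdentityHashSet` is `Collections.newSetFromMap` over an `IdentityHashMap`; the wrapper in this PR gets the same reference-equality semantics by pairing `System.identityHashCode` with `eq`. A rough sketch of both routes (illustrative names only, not the PR's code):

```scala
import java.util.{Collections, IdentityHashMap}

import scala.collection.mutable

object IdentitySetSketch {
  // JDK route: a Set backed by an IdentityHashMap, so membership is
  // decided by reference equality rather than equals/hashCode.
  def identitySet[A <: AnyRef](): java.util.Set[A] =
    Collections.newSetFromMap(new IdentityHashMap[A, java.lang.Boolean]())

  // Wrapper route, sketched from the diff above: identityHashCode plus
  // `eq` make two wrappers equal only when they wrap the same instance.
  final class IdentityWrapper(val value: AnyRef) {
    override def hashCode(): Int = System.identityHashCode(value)
    override def equals(other: Any): Boolean = other match {
      case that: IdentityWrapper => this.value eq that.value
      case _ => false
    }
  }

  // Usage mirroring the PR: a mutable.HashSet of wrappers behaves like an
  // identity set over the wrapped objects.
  def demo(): Boolean = {
    val a = new Object
    val set = mutable.HashSet(new IdentityWrapper(a))
    set.contains(new IdentityWrapper(a)) && !set.contains(new IdentityWrapper(new Object))
  }
}
```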