cloud-fan commented on a change in pull request #32590:
URL: https://github.com/apache/spark/pull/32590#discussion_r635759716
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -67,61 +81,78 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
* all relations of the new LogicalPlan )
*/
private def renewDuplicatedRelations(
- existingRelations: Seq[MultiInstanceRelation],
- plan: LogicalPlan): (LogicalPlan, Seq[MultiInstanceRelation]) = plan match {
- case p: LogicalPlan if p.isStreaming => (plan, Nil)
+ existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
+ plan: LogicalPlan)
+ : (LogicalPlan, mutable.HashSet[ReferenceEqualPlanWrapper], Boolean) = plan match {
+ case p: LogicalPlan if p.isStreaming => (plan, mutable.HashSet.empty, false)
case m: MultiInstanceRelation =>
- if (isDuplicated(existingRelations, m)) {
+ val planWrapper = ReferenceEqualPlanWrapper(m)
+ if (isDuplicated(existingRelations, planWrapper)) {
val newNode = m.newInstance()
newNode.copyTagsFrom(m)
- (newNode, Nil)
+ (newNode, mutable.HashSet.empty, true)
} else {
- (m, Seq(m))
+ val mWrapper = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+ mWrapper.add(planWrapper)
+ (m, mWrapper, false)
}
case plan: LogicalPlan =>
- val relations = ArrayBuffer.empty[MultiInstanceRelation]
+ val relations = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+ var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
- val newChildren = ArrayBuffer.empty[LogicalPlan]
+ val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
for (c <- plan.children) {
- val (renewed, collected) = renewDuplicatedRelations(existingRelations ++ relations, c)
+ val (renewed, collected, changed) =
+ renewDuplicatedRelations(existingRelations ++ relations, c)
newChildren += renewed
relations ++= collected
+ if (changed) {
+ planChanged = true
+ }
}
- if (plan.childrenResolved) {
- val attrMap = AttributeMap(
- plan
- .children
- .flatMap(_.output).zip(newChildren.flatMap(_.output))
- .filter { case (a1, a2) => a1.exprId != a2.exprId }
- )
- plan.withNewChildren(newChildren.toSeq).rewriteAttrs(attrMap)
+ if (planChanged) {
+ if (plan.childrenResolved) {
+ val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
+ val attrMap = AttributeMap(
+ plan
+ .children
+ .flatMap(_.output).zip(newChildren.flatMap(_.output))
+ .filter { case (a1, a2) => a1.exprId != a2.exprId }
+ )
+ if (attrMap.isEmpty) {
+ planWithNewChildren
+ } else {
+ planWithNewChildren.rewriteAttrs(attrMap)
+ }
+ } else {
+ plan.withNewChildren(newChildren.toSeq)
+ }
} else {
- plan.withNewChildren(newChildren.toSeq)
+ plan
}
} else {
plan
}
val planWithNewSubquery = newPlan.transformExpressions {
case subquery: SubqueryExpression =>
- val (renewed, collected) = renewDuplicatedRelations(
+ val (renewed, collected, changed) = renewDuplicatedRelations(
existingRelations ++ relations, subquery.plan)
relations ++= collected
+ if (changed) planChanged = true
subquery.withNewPlan(renewed)
}
- (planWithNewSubquery, relations.toSeq)
+ (planWithNewSubquery, relations, planChanged)
}
+ @inline
Review comment:
Instead of calling this function, can we manually inline it and write
`existingRelations.contains(planWrapper)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]