Ngone51 commented on a change in pull request #32590:
URL: https://github.com/apache/spark/pull/32590#discussion_r635367649
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -67,61 +81,78 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
* all relations of the new LogicalPlan )
*/
private def renewDuplicatedRelations(
- existingRelations: Seq[MultiInstanceRelation],
- plan: LogicalPlan): (LogicalPlan, Seq[MultiInstanceRelation]) = plan
match {
- case p: LogicalPlan if p.isStreaming => (plan, Nil)
+ existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
+ plan: LogicalPlan)
+ : (LogicalPlan, mutable.HashSet[ReferenceEqualPlanWrapper], Boolean) =
plan match {
+ case p: LogicalPlan if p.isStreaming => (plan, mutable.HashSet.empty,
false)
case m: MultiInstanceRelation =>
- if (isDuplicated(existingRelations, m)) {
+ val planWrapper = ReferenceEqualPlanWrapper(m)
+ if (isDuplicated(existingRelations, planWrapper)) {
val newNode = m.newInstance()
newNode.copyTagsFrom(m)
- (newNode, Nil)
+ (newNode, mutable.HashSet.empty, true)
} else {
- (m, Seq(m))
+ val mWrapper = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+ mWrapper.add(planWrapper)
+ (m, mWrapper, false)
}
case plan: LogicalPlan =>
- val relations = ArrayBuffer.empty[MultiInstanceRelation]
+ val relations = new mutable.HashSet[ReferenceEqualPlanWrapper]()
+ var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
- val newChildren = ArrayBuffer.empty[LogicalPlan]
+ val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
for (c <- plan.children) {
- val (renewed, collected) =
renewDuplicatedRelations(existingRelations ++ relations, c)
+ val (renewed, collected, changed) =
+ renewDuplicatedRelations(existingRelations ++ relations, c)
newChildren += renewed
relations ++= collected
+ if (changed) {
+ planChanged = true
+ }
}
- if (plan.childrenResolved) {
- val attrMap = AttributeMap(
- plan
- .children
- .flatMap(_.output).zip(newChildren.flatMap(_.output))
- .filter { case (a1, a2) => a1.exprId != a2.exprId }
- )
- plan.withNewChildren(newChildren.toSeq).rewriteAttrs(attrMap)
+ if (planChanged) {
+ if (plan.childrenResolved) {
+ val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
+ val attrMap = AttributeMap(
+ plan
+ .children
+ .flatMap(_.output).zip(newChildren.flatMap(_.output))
+ .filter { case (a1, a2) => a1.exprId != a2.exprId }
+ )
+ if (attrMap.isEmpty) {
+ planWithNewChildren
+ } else {
+ planWithNewChildren.rewriteAttrs(attrMap)
+ }
+ } else {
+ plan.withNewChildren(newChildren.toSeq)
+ }
} else {
- plan.withNewChildren(newChildren.toSeq)
+ plan
}
} else {
plan
}
val planWithNewSubquery = newPlan.transformExpressions {
case subquery: SubqueryExpression =>
- val (renewed, collected) = renewDuplicatedRelations(
+ val (renewed, collected, changed) = renewDuplicatedRelations(
existingRelations ++ relations, subquery.plan)
relations ++= collected
+ if (changed) planChanged = true
subquery.withNewPlan(renewed)
}
- (planWithNewSubquery, relations.toSeq)
+ (planWithNewSubquery, relations, planChanged)
}
+ @inline
private def isDuplicated(
- existingRelations: Seq[MultiInstanceRelation],
- relation: MultiInstanceRelation): Boolean = {
- existingRelations.exists { er =>
- er.asInstanceOf[LogicalPlan].outputSet
- .intersect(relation.asInstanceOf[LogicalPlan].outputSet).nonEmpty
- }
+ existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
+ planWrapper: ReferenceEqualPlanWrapper): Boolean = {
+ existingRelations.contains(planWrapper)
Review comment:
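It's safe. A `MultiInstanceRelation` can never share only some of its
attributes with another relation. Its attributes are always generated by the
relation itself, so their `exprId`s are monotonically increasing and never
partially overlap another relation's; it either duplicates another relation's
attributes entirely or not at all.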
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]