maropu commented on a change in pull request #29585:
URL: https://github.com/apache/spark/pull/29585#discussion_r484201997
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -136,56 +137,59 @@ object Analyzer {
*/
   def rewritePlan(plan: LogicalPlan, rewritePlanMap: Map[LogicalPlan, LogicalPlan])
     : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
-    if (plan.resolved) {
-      val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
-      val newChildren = plan.children.map { child =>
-        // If not, we'd rewrite child plan recursively until we find the
-        // conflict node or reach the leaf node.
-        val (newChild, childAttrMapping) = rewritePlan(child, rewritePlanMap)
-        attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
-          // `attrMapping` is not only used to replace the attributes of the current `plan`,
-          // but also to be propagated to the parent plans of the current `plan`. Therefore,
-          // the `oldAttr` must be part of either `plan.references` (so that it can be used to
-          // replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
-          // used by those parent plans).
+    val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+    val newChildren = plan.children.map { child =>
+      // If not, we'd rewrite child plan recursively until we find the
+      // conflict node or reach the leaf node.
+      val (newChild, childAttrMapping) = rewritePlan(child, rewritePlanMap)
+      attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
+        // `attrMapping` is not only used to replace the attributes of the current `plan`,
+        // but also to be propagated to the parent plans of the current `plan`. Therefore,
+        // the `oldAttr` must be part of either `plan.references` (so that it can be used to
+        // replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
+        // used by those parent plans).
+        if (plan.resolved) {
           (plan.outputSet ++ plan.references).contains(oldAttr)
+        } else {
+          plan.references.filter(_.resolved).contains(oldAttr)
         }
-        newChild
       }
+      newChild
+    }

-      val newPlan = if (rewritePlanMap.contains(plan)) {
-        rewritePlanMap(plan).withNewChildren(newChildren)
-      } else {
-        plan.withNewChildren(newChildren)
-      }
+    val newPlan = if (rewritePlanMap.contains(plan)) {
+      rewritePlanMap(plan).withNewChildren(newChildren)
+    } else {
+      plan.withNewChildren(newChildren)
+    }

-      assert(!attrMapping.groupBy(_._1.exprId)
-        .exists(_._2.map(_._2.exprId).distinct.length > 1),
-        "Found duplicate rewrite attributes")
+    assert(!attrMapping.groupBy(_._1.exprId)
+      .exists(_._2.map(_._2.exprId).distinct.length > 1),
+      "Found duplicate rewrite attributes")

-      val attributeRewrites = AttributeMap(attrMapping)
-      // Using attrMapping from the children plans to rewrite their parent node.
-      // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
-      val p = newPlan.transformExpressions {
-        case a: Attribute =>
-          updateAttr(a, attributeRewrites)
-        case s: SubqueryExpression =>
-          s.withNewPlan(updateOuterReferencesInSubquery(s.plan, attributeRewrites))
-      }
+    val attributeRewrites = AttributeMap(attrMapping)
+    // Using attrMapping from the children plans to rewrite their parent node.
+    // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
+    val p = newPlan.transformExpressions {
+      case a: Attribute =>
+        updateAttr(a, attributeRewrites)
+      case s: SubqueryExpression =>
+        s.withNewPlan(updateOuterReferencesInSubquery(s.plan, attributeRewrites))
+    }
+    if (plan.resolved) {
       attrMapping ++= plan.output.zip(p.output)
         .filter { case (a1, a2) => a1.exprId != a2.exprId }
-      p -> attrMapping
-    } else {
-      // Just passes through unresolved nodes
-      plan.mapChildren {
Review comment:
NOTE: I applied this band-aid fix to solve the issue pointed out in
https://github.com/apache/spark/pull/29643#discussion_r483161865, but that
issue should be fixed properly in #29643. So, after that PR is merged, I will
remove this change.
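
For readers following the thread, here is a minimal standalone sketch of the filter
predicate this hunk introduces. The `Attr` and `Plan` case classes and the
`keepMapping` helper below are simplified stand-ins assumed only for illustration;
they are not the Catalyst `Attribute`/`LogicalPlan` classes.

```scala
// Standalone sketch (not the actual Catalyst code) of the resolved/unresolved
// filtering rule added in this diff.
object KeepMappingSketch extends App {
  case class Attr(name: String, exprId: Long, resolved: Boolean = true)
  case class Plan(resolved: Boolean, outputSet: Set[Attr], references: Set[Attr])

  // Should a child's (oldAttr -> newAttr) mapping be propagated through the
  // current plan?
  def keepMapping(plan: Plan, oldAttr: Attr): Boolean =
    if (plan.resolved) {
      // Resolved plan: oldAttr must be usable either by this plan itself
      // (references) or by its parent plans (outputSet).
      (plan.outputSet ++ plan.references).contains(oldAttr)
    } else {
      // Unresolved plan: its output is not trustworthy yet, so only consult
      // the references that are themselves already resolved.
      plan.references.filter(_.resolved).contains(oldAttr)
    }

  val a = Attr("a", exprId = 1L)
  val b = Attr("b", exprId = 2L, resolved = false)
  val unresolved = Plan(resolved = false, outputSet = Set.empty, references = Set(a, b))

  assert(keepMapping(unresolved, a))   // kept: `a` is a resolved reference
  assert(!keepMapping(unresolved, b))  // dropped: `b` is not resolved yet
}
```

The point of the change, as I read the hunk, is that an unresolved plan cannot rely
on its `outputSet`, so only the already-resolved references are consulted when
deciding which child mappings to keep.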