[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-22 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458959593



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)
+  } else {
+val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+val newPlan = plan.mapChildren { child =>
+  // If not, we'd rewrite child plan recursively until we find the
+  // conflict node or reach the leaf node.
+  val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+  // Only return rewrite attributes which could be used by the parent node.
+  // Otherwise, it could introduce duplicate rewrite attributes. For example,
+  // for the following plan, if we don't filter the `childAttrMapping`,
+  // the node `SubqueryAlias b` will return the rewrite attribute [kind#220 -> kind#228]
+  // (which is from the conflict plan `Project [id#218, foo AS kind#228]`), and the node
+  // `SubqueryAlias c` will return the rewrite attribute [kind#220 -> kind#229] (which
+  // is from the conflict plan `Project [id#227, foo AS kind#229]`). As a result, the top
+  // Join will have duplicated rewrite attributes.
+  //
+  // The problem is, the plan `Join Inner, (kind#229 = kind#223)` shouldn't keep returning
+  // the rewrite attribute [kind#220 -> kind#229] to its parent node `Project [id#227]`, as
+  // it doesn't really need it.
+  //
+  // Join Inner, (id#218 = id#227)
+  // :- SubqueryAlias b
+  // :  +- Project [id#218, foo AS kind#228]
+  // :     +- SubqueryAlias a
+  // :        +- Project [1 AS id#218]
+  // :           +- OneRowRelation
+  // +- SubqueryAlias c
+  //    +- Project [id#227]
+  //       +- Join Inner, (kind#229 = kind#223)
+  //          :- SubqueryAlias l
+  //          :  +- SubqueryAlias b
+  //          :     +- Project [id#227, foo AS kind#229]
+  //          :        +- SubqueryAlias a
+  //          :           +- Project [1 AS id#227]
+  //          :              +- OneRowRelation
+  //          +- SubqueryAlias r
+  //             +- SubqueryAlias b
+  //                +- Project [id#224, foo AS kind#223]
+  //                   +- SubqueryAlias a
+  //                      +- Project [1 AS id#224]
+  //                         +- OneRowRelation
+  attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
+    (plan.references ++ plan.outputSet ++ plan.producedAttributes).contains(oldAttr)

Review comment:
   and it seems we only need `plan.outputSet`: if an attribute is not output by the current plan, it's useless to propagate it to the parent nodes.
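
   For illustration, a minimal, self-contained sketch of that idea (plain Scala; `Attr`,
   `Node`, and the sample values below are made up, not Catalyst classes): the parent can
   only refer to attributes the current node outputs, so mappings whose old attribute is
   not in `outputSet` need not be propagated.

      object PropagateOnlyOutputSet extends App {
        case class Attr(name: String, id: Int)
        case class Node(outputSet: Set[Attr])

        // Rewrite mappings bubbled up from a child: old attribute -> freshly generated one.
        val childAttrMapping = Seq(
          Attr("a", 1) -> Attr("a", 2),
          Attr("b", 3) -> Attr("b", 4))

        // Keep only mappings whose old attribute is still visible in this node's output;
        // the parent can never reference anything else.
        def propagate(plan: Node, mapping: Seq[(Attr, Attr)]): Seq[(Attr, Attr)] =
          mapping.filter { case (oldAttr, _) => plan.outputSet.contains(oldAttr) }

        val plan = Node(outputSet = Set(Attr("a", 1)))
        // b#3 is not output by `plan`, so its mapping is dropped.
        assert(propagate(plan, childAttrMapping) == Seq(Attr("a", 1) -> Attr("a", 2)))
      }

   Under this simplified model, `plan.references` and `plan.producedAttributes` in the
   current filter would be redundant, which is what this comment and the later question
   about `producedAttributes` suggest.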





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-22 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458960395



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)
+  } else {
+val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+val newPlan = plan.mapChildren { child =>
+  // If not, we'd rewrite child plan recursively until we find the
+  // conflict node or reach the leaf node.
+  val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+  // Only return rewrite attributes which could be used by the parent node.
+  // Otherwise, it could introduce duplicate rewrite attributes. For example,
+  // for the following plan, if we don't filter the `childAttrMapping`,
+  // the node `SubqueryAlias b` will return the rewrite attribute [kind#220 -> kind#228]
+  // (which is from the conflict plan `Project [id#218, foo AS kind#228]`), and the node
+  // `SubqueryAlias c` will return the rewrite attribute [kind#220 -> kind#229] (which
+  // is from the conflict plan `Project [id#227, foo AS kind#229]`). As a result, the top
+  // Join will have duplicated rewrite attributes.
+  //
+  // The problem is, the plan `Join Inner, (kind#229 = kind#223)` shouldn't keep returning
+  // the rewrite attribute [kind#220 -> kind#229] to its parent node `Project [id#227]`, as
+  // it doesn't really need it.
+  //
+  // Join Inner, (id#218 = id#227)
+  // :- SubqueryAlias b
+  // :  +- Project [id#218, foo AS kind#228]
+  // :     +- SubqueryAlias a
+  // :        +- Project [1 AS id#218]
+  // :           +- OneRowRelation
+  // +- SubqueryAlias c
+  //    +- Project [id#227]
+  //       +- Join Inner, (kind#229 = kind#223)
+  //          :- SubqueryAlias l
+  //          :  +- SubqueryAlias b
+  //          :     +- Project [id#227, foo AS kind#229]
+  //          :        +- SubqueryAlias a
+  //          :           +- Project [1 AS id#227]
+  //          :              +- OneRowRelation
+  //          +- SubqueryAlias r
+  //             +- SubqueryAlias b
+  //                +- Project [id#224, foo AS kind#223]
+  //                   +- SubqueryAlias a
+  //                      +- Project [1 AS id#224]
+  //                         +- OneRowRelation
+  attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
+    (plan.references ++ plan.outputSet ++ plan.producedAttributes).contains(oldAttr)

Review comment:
   The above is probably good enough as the comment here, instead of introducing a very complicated example.











[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-22 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458959593



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)
+  } else {
+val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+val newPlan = plan.mapChildren { child =>
+  // If not, we'd rewrite child plan recursively until we find the
+  // conflict node or reach the leaf node.
+  val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+  // Only return rewrite attributes which could be used by the parent node.
+  // Otherwise, it could introduce duplicate rewrite attributes. For example,
+  // for the following plan, if we don't filter the `childAttrMapping`,
+  // the node `SubqueryAlias b` will return the rewrite attribute [kind#220 -> kind#228]
+  // (which is from the conflict plan `Project [id#218, foo AS kind#228]`), and the node
+  // `SubqueryAlias c` will return the rewrite attribute [kind#220 -> kind#229] (which
+  // is from the conflict plan `Project [id#227, foo AS kind#229]`). As a result, the top
+  // Join will have duplicated rewrite attributes.
+  //
+  // The problem is, the plan `Join Inner, (kind#229 = kind#223)` shouldn't keep returning
+  // the rewrite attribute [kind#220 -> kind#229] to its parent node `Project [id#227]`, as
+  // it doesn't really need it.
+  //
+  // Join Inner, (id#218 = id#227)
+  // :- SubqueryAlias b
+  // :  +- Project [id#218, foo AS kind#228]
+  // :     +- SubqueryAlias a
+  // :        +- Project [1 AS id#218]
+  // :           +- OneRowRelation
+  // +- SubqueryAlias c
+  //    +- Project [id#227]
+  //       +- Join Inner, (kind#229 = kind#223)
+  //          :- SubqueryAlias l
+  //          :  +- SubqueryAlias b
+  //          :     +- Project [id#227, foo AS kind#229]
+  //          :        +- SubqueryAlias a
+  //          :           +- Project [1 AS id#227]
+  //          :              +- OneRowRelation
+  //          +- SubqueryAlias r
+  //             +- SubqueryAlias b
+  //                +- Project [id#224, foo AS kind#223]
+  //                   +- SubqueryAlias a
+  //                      +- Project [1 AS id#224]
+  //                         +- OneRowRelation
+  attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
+    (plan.references ++ plan.outputSet ++ plan.producedAttributes).contains(oldAttr)

Review comment:
   and it seems we only need `plan.outputSet`: if an attribute is not output by the current plan, it's useless to propagate it to the parent nodes, as they are not going to have this attribute.

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+

[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-22 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458958886



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)
+  } else {
+val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+val newPlan = plan.mapChildren { child =>
+  // If not, we'd rewrite child plan recursively until we find the
+  // conflict node or reach the leaf node.
+  val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+  // Only return rewrite attributes which could be used by the parent node.
+  // Otherwise, it could introduce duplicate rewrite attributes. For example,
+  // for the following plan, if we don't filter the `childAttrMapping`,
+  // the node `SubqueryAlias b` will return the rewrite attribute [kind#220 -> kind#228]
+  // (which is from the conflict plan `Project [id#218, foo AS kind#228]`), and the node
+  // `SubqueryAlias c` will return the rewrite attribute [kind#220 -> kind#229] (which
+  // is from the conflict plan `Project [id#227, foo AS kind#229]`). As a result, the top
+  // Join will have duplicated rewrite attributes.
+  //
+  // The problem is, the plan `Join Inner, (kind#229 = kind#223)` shouldn't keep returning
+  // the rewrite attribute [kind#220 -> kind#229] to its parent node `Project [id#227]`, as
+  // it doesn't really need it.
+  //
+  // Join Inner, (id#218 = id#227)
+  // :- SubqueryAlias b
+  // :  +- Project [id#218, foo AS kind#228]
+  // :     +- SubqueryAlias a
+  // :        +- Project [1 AS id#218]
+  // :           +- OneRowRelation
+  // +- SubqueryAlias c
+  //    +- Project [id#227]
+  //       +- Join Inner, (kind#229 = kind#223)
+  //          :- SubqueryAlias l
+  //          :  +- SubqueryAlias b
+  //          :     +- Project [id#227, foo AS kind#229]
+  //          :        +- SubqueryAlias a
+  //          :           +- Project [1 AS id#227]
+  //          :              +- OneRowRelation
+  //          +- SubqueryAlias r
+  //             +- SubqueryAlias b
+  //                +- Project [id#224, foo AS kind#223]
+  //                   +- SubqueryAlias a
+  //                      +- Project [1 AS id#224]
+  //                         +- OneRowRelation
+  attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
+    (plan.references ++ plan.outputSet ++ plan.producedAttributes).contains(oldAttr)

Review comment:
   why do we need `plan.producedAttributes`?








[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-22 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458958501



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)
+  } else {
+val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+val newPlan = plan.mapChildren { child =>
+  // If not, we'd rewrite child plan recursively until we find the
+  // conflict node or reach the leaf node.
+  val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+  // Only return rewrite attributes which could be used by the parent node.
+  // Otherwise, it could introduce duplicate rewrite attributes. For example,
+  // for the following plan, if we don't filter the `childAttrMapping`,
+  // the node `SubqueryAlias b` will return the rewrite attribute [kind#220 -> kind#228]

Review comment:
   And can we use attr ids `1, 2, 3, 4, ...` instead?
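
   For reference, one possible renumbering of the plan tree shown in the quoted comment
   (the full tree appears in the earlier messages above; purely illustrative, using
   id#218 -> id#1, kind#228 -> kind#2, id#227 -> id#3, kind#229 -> kind#4,
   kind#223 -> kind#5, id#224 -> id#6, with the conflicting kind#220 becoming e.g. kind#7):

      // Join Inner, (id#1 = id#3)
      // :- SubqueryAlias b
      // :  +- Project [id#1, foo AS kind#2]
      // :     +- SubqueryAlias a
      // :        +- Project [1 AS id#1]
      // :           +- OneRowRelation
      // +- SubqueryAlias c
      //    +- Project [id#3]
      //       +- Join Inner, (kind#4 = kind#5)
      //          :- SubqueryAlias l
      //          :  +- SubqueryAlias b
      //          :     +- Project [id#3, foo AS kind#4]
      //          :        +- SubqueryAlias a
      //          :           +- Project [1 AS id#3]
      //          :              +- OneRowRelation
      //          +- SubqueryAlias r
      //             +- SubqueryAlias b
      //                +- Project [id#6, foo AS kind#5]
      //                   +- SubqueryAlias a
      //                      +- Project [1 AS id#6]
      //                         +- OneRowRelation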








[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-22 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458958229



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)
+  } else {
+val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+val newPlan = plan.mapChildren { child =>
+  // If not, we'd rewrite child plan recursively until we find the
+  // conflict node or reach the leaf node.
+  val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+  // Only return rewrite attributes which could be used by the parent node.
+  // Otherwise, it could introduce duplicate rewrite attributes. For example,
+  // for the following plan, if we don't filter the `childAttrMapping`,
+  // the node `SubqueryAlias b` will return the rewrite attribute [kind#220 -> kind#228]

Review comment:
   You didn't explain `kind#220` ahead. We should probably explain what's in the `conflictPlanMap` at the beginning of the comment.

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,79 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)
+  } else {
+val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+val newPlan = plan.mapChildren { child =>
+  // If not, we'd rewrite child plan recursively until we find the
+  // conflict node or reach the leaf node.
+  val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+  // Only return rewrite attributes which could be used by the parent node.
+  // Otherwise, it could introduce duplicate rewrite attributes. For example,
+  // for the following plan, if we don't filter the `childAttrMapping`,
+  // the node `SubqueryAlias b` will return the rewrite attribute [kind#220 -> kind#228]

Review comment:
   You didn't explain `kind#220` ahead. We should probably explain what's in the `conflictPlanMap` at the beginning of the comment.
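
   A hedged sketch of the kind of preamble being asked for (the old/new pairs below are
   inferred from the attribute ids used later in the comment, so treat them as
   illustrative rather than the PR's final wording):

      // `conflictPlanMap` maps each plan fragment on the right side whose output
      // attributes clash with the left side to a re-created copy with fresh exprIds,
      // e.g. (old -> new):
      //   Project [id#218, foo AS kind#220]  ->  Project [id#218, foo AS kind#228]
      //   Project [id#227, foo AS kind#220]  ->  Project [id#227, foo AS kind#229]
      // Every ancestor of a replaced fragment must then rewrite the old attribute
      // (kind#220) to the corresponding new one (kind#228 or kind#229).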






[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-22 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458625525



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,42 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)

Review comment:
   ah i see, nvm.








[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-21 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r458004948



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1250,42 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+  if (conflictPlanMap.contains(plan)) {
+// If the plan is the one that conflicts with the left one, we'd
+// just replace it with the new plan and collect the rewrite
+// attributes for the parent node.
+val newRelation = conflictPlanMap(plan)
+newRelation -> plan.output.zip(newRelation.output)

Review comment:
   nit: `conflictPlanMap(plan) -> plan.output.zip(newRelation.output)`








[GitHub] [spark] cloud-fan commented on a change in pull request #29166: [SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

2020-07-20 Thread GitBox


cloud-fan commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r457836362



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -1237,20 +1249,44 @@ class Analyzer(
   if (conflictPlans.isEmpty) {
 right
   } else {
-val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-  case (oldRelation, newRelation) => 
oldRelation.output.zip(newRelation.output)})
-val conflictPlanMap = conflictPlans.toMap
-// transformDown so that we can replace all the old Relations in one 
turn due to
-// the reason that `conflictPlans` are also collected in pre-order.
-right transformDown {
-  case r => conflictPlanMap.getOrElse(r, r)
-} transformUp {
-  case other => other transformExpressions {
+rewritePlan(right, conflictPlans.toMap)._1
+  }
+}
+
+private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])

Review comment:
   This rewriting is much more surgical than before, +1




