maropu commented on a change in pull request #29485:
URL: https://github.com/apache/spark/pull/29485#discussion_r475554377



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
##########
@@ -328,27 +328,46 @@ object TypeCoercion {
    */
   object WidenSetOperationTypes extends Rule[LogicalPlan] {
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case s @ Except(left, right, isAll) if s.childrenResolved &&
-        left.output.length == right.output.length && !s.resolved =>
-        val newChildren: Seq[LogicalPlan] = 
buildNewChildrenWithWiderTypes(left :: right :: Nil)
-        assert(newChildren.length == 2)
-        Except(newChildren.head, newChildren.last, isAll)
-
-      case s @ Intersect(left, right, isAll) if s.childrenResolved &&
-        left.output.length == right.output.length && !s.resolved =>
-        val newChildren: Seq[LogicalPlan] = 
buildNewChildrenWithWiderTypes(left :: right :: Nil)
-        assert(newChildren.length == 2)
-        Intersect(newChildren.head, newChildren.last, isAll)
-
-      case s: Union if s.childrenResolved && !s.byName &&
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      val exprIdMapArray = mutable.ArrayBuffer[(ExprId, Attribute)]()
+      val newPlan = plan resolveOperatorsUp {
+        case s @ Except(left, right, isAll) if s.childrenResolved &&
+          left.output.length == right.output.length && !s.resolved =>
+          val (newChildren, newExprIds) = buildNewChildrenWithWiderTypes(left 
:: right :: Nil)
+          exprIdMapArray ++= newExprIds
+          assert(newChildren.length == 2)
+          Except(newChildren.head, newChildren.last, isAll)
+
+        case s @ Intersect(left, right, isAll) if s.childrenResolved &&
+          left.output.length == right.output.length && !s.resolved =>
+          val (newChildren, newExprIds) = buildNewChildrenWithWiderTypes(left 
:: right :: Nil)
+          exprIdMapArray ++= newExprIds
+          assert(newChildren.length == 2)
+          Intersect(newChildren.head, newChildren.last, isAll)
+
+        case s: Union if s.childrenResolved && !s.byName &&
           s.children.forall(_.output.length == s.children.head.output.length) 
&& !s.resolved =>
-        val newChildren: Seq[LogicalPlan] = 
buildNewChildrenWithWiderTypes(s.children)
-        s.copy(children = newChildren)
+          val (newChildren, newExprIds) = 
buildNewChildrenWithWiderTypes(s.children)
+          exprIdMapArray ++= newExprIds
+          s.copy(children = newChildren)
+      }
+
+      // Re-maps existing references to the new ones (exprId and dataType)
+      // for aliases added when widening columns' data types.

Review comment:
       On second thought, we still need to update the parent nodes even if we 
re-alias it. For example, in the example from the PR description:
   ```
   !Project [v#1]  <------ this project already has `AttributeReference(v, 
decimal(10, 0))#1`, so
                           we need to update the data type, too
   +- SubqueryAlias t
      +- Union
         :- Project [cast(v#1 as decimal(11,0)) AS v#1] <----- re-alias with 
exprId=#1
         :  +- Project [v#1] <----- dataType=decimal(10, 0)
         :     +- SubqueryAlias t3
         :        +- SubqueryAlias tbl
         :           +- LocalRelation [v#1]
   
         +- Project [v#2] <----- dataType=decimal(11, 0)
            +- ...
   ```
   the parent `Project` has an attribute reference with `exprId=1` and 
`dataType=decimal(10, 0)`. So, if I understand correctly, we need to update the 
data type, too. If we don't update it, the plan's structural integrity can 
break, e.g., in `PushProjectionThroughUnion`.
   ```
   -- !query
   CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v)
   -- !query schema
   struct<>
   -- !query output
   
   -- !query
   SELECT t.v FROM (
     SELECT v FROM t3
     UNION ALL
     SELECT v + v AS v FROM t3
   ) t
   -- !query schema
   struct<>
   -- !query output
   org.apache.spark.sql.catalyst.errors.package$TreeNodeException
   After applying rule 
org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion in batch 
Operator Optimization before Inferring Filters, the structural integrity of the 
plan is broken., tree:
   'Union false, false
   :- Project [v#183]
   :  +- Project [cast(v#183 as decimal(11,0)) AS v#183]
   :     +- Project [v#183]
   :        +- LocalRelation [v#183]
   +- Project [v#184]
      +- Project [v#184]
         +- Project [CheckOverflow((promote_precision(cast(v#183 as 
decimal(11,0))) + promote_precision(cast(v#183 as decimal(11,0)))), 
DecimalType(11,0), true) AS v#184]
            +- LocalRelation [v#183]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to