peter-toth commented on code in PR #46183:
URL: https://github.com/apache/spark/pull/46183#discussion_r1579492849


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -38,28 +38,29 @@ case class RelationWrapper(cls: Class[_], outputAttrIds: Seq[Long])
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
-    if (newPlan.find(p => p.resolved && p.missingInput.nonEmpty).isDefined) {
-      // Wait for `ResolveMissingReferences` to resolve missing attributes first
-      return newPlan
-    }
+
+    def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty)
+
     newPlan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
       ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
-      case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
+      case j @ Join(left, right, _, _, _) if !j.duplicateResolved && noMissingInput(right) =>
         j.copy(right = dedupRight(left, right))
       // Resolve duplicate output for LateralJoin.
-      case j @ LateralJoin(left, right, _, _) if right.resolved && !j.duplicateResolved =>
+      case j @ LateralJoin(left, right, _, _)
+          if right.resolved && !j.duplicateResolved && noMissingInput(right.plan) =>
         j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
       // Resolve duplicate output for AsOfJoin.
-      case j @ AsOfJoin(left, right, _, _, _, _, _) if !j.duplicateResolved =>
+      case j @ AsOfJoin(left, right, _, _, _, _, _)
+          if !j.duplicateResolved && noMissingInput(right) =>
         j.copy(right = dedupRight(left, right))
       // intersect/except will be rewritten to join at the beginning of optimizer. Here we need to
       // deduplicate the right side plan, so that we won't produce an invalid self-join later.
-      case i @ Intersect(left, right, _) if !i.duplicateResolved =>
+      case i @ Intersect(left, right, _) if !i.duplicateResolved && noMissingInput(right) =>
         i.copy(right = dedupRight(left, right))
-      case e @ Except(left, right, _) if !e.duplicateResolved =>
+      case e @ Except(left, right, _) if !e.duplicateResolved && noMissingInput(right) =>
         e.copy(right = dedupRight(left, right))
       // Only after we finish by-name resolution for Union
       case u: Union if !u.byName && !u.duplicateResolved =>
Review Comment:
   This is because the `Union` deduplication is different from the others: here we just insert additional `Project`s into the 2nd+ union legs, which doesn't alter the attributes of a leg under the inserted `Project`.
   But in the other cases deduplication is done by `dedupRight()`, which can alter the attributes in a leg, possibly under a missing attribute...
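
   To make the guard concrete, here is a self-contained toy model of the idea (the `Attr`/`Node` types are hypothetical stand-ins, not Catalyst's API; in Spark, `missingInput` is roughly the set of attributes a node references but none of its children produce):
   ```scala
   // Toy model (hypothetical types, not Spark's API) of why deduplicating
   // under a node with missing input is unsafe.
   final case class Attr(name: String, id: Long)

   final case class Node(produces: Set[Attr], references: Set[Attr], child: Option[Node]) {
     // Analogue of `LogicalPlan.missingInput`: referenced but not produced below.
     def missingInput: Set[Attr] =
       references -- child.map(_.produces).getOrElse(Set.empty[Attr])
     def exists(f: Node => Boolean): Boolean = f(this) || child.exists(_.exists(f))
   }

   // Same shape as the helper introduced in the diff above.
   def noMissingInput(p: Node): Boolean = !p.exists(_.missingInput.nonEmpty)

   // A Filter references name#25 that its Aggregate child does not produce yet;
   // `ResolveMissingReferences` is expected to add it later.
   val id24   = Attr("id", 24L)
   val name25 = Attr("name", 25L)
   val agg    = Node(produces = Set(id24), references = Set(id24), child = None)
   val filter = Node(produces = Set(id24), references = Set(name25), child = Some(agg))

   // The guard blocks deduplication here: if `dedupRight` re-mapped id#24 and
   // name#25 to fresh IDs under `filter`, the dangling name#25 reference could
   // never be matched up again.
   assert(!noMissingInput(filter))
   ```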
   
   Here is a `Union` deduplication where only a `Project [id#24 AS id#29]` is inserted, which has no effect on the missing `name#25`:
   ```
   === Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===

   Before:
   Distinct
   +- Union false, false
      :- Project [id#24]
      :  +- SubqueryAlias l1
      :     +- SubqueryAlias spark_catalog.default.v1
      :        +- View (`default`.`v1`, [id#24,name#25])
      :           +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
      :              +- Project [id#20, name#21]
      :                 +- SubqueryAlias spark_catalog.default.t
      :                    +- Relation default.t[id#20,name#21] parquet
      +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
         +- Aggregate [id#24], [id#24]
            +- SubqueryAlias spark_catalog.default.v1
               +- View (`default`.`v1`, [id#24,name#25])
                  +- Project [cast(id#26 as int) AS id#24, cast(name#27 as string) AS name#25]
                     +- Project [id#26, name#27]
                        +- SubqueryAlias spark_catalog.default.t
                           +- Relation default.t[id#26,name#27] parquet

   After:
   Distinct
   +- Union false, false
      :- Project [id#24]
      :  +- SubqueryAlias l1
      :     +- SubqueryAlias spark_catalog.default.v1
      :        +- View (`default`.`v1`, [id#24,name#25])
      :           +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
      :              +- Project [id#20, name#21]
      :                 +- SubqueryAlias spark_catalog.default.t
      :                    +- Relation default.t[id#20,name#21] parquet
      +- Project [id#24 AS id#29]
         +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
            +- Aggregate [id#24], [id#24]
               +- SubqueryAlias spark_catalog.default.v1
                  +- View (`default`.`v1`, [id#24,name#25])
                     +- Project [cast(id#26 as int) AS id#24, cast(name#27 as string) AS name#25]
                        +- Project [id#26, name#27]
                           +- SubqueryAlias spark_catalog.default.t
                              +- Relation default.t[id#26,name#27] parquet
   ```
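
   For context, a query shape that could produce a plan like the one above (a hypothetical reproduction inferred from the plan output; `spark` is assumed to be a `SparkSession`, and the PR's actual test query may differ):
   ```scala
   // Hypothetical setup inferred from the plan output above.
   spark.sql("CREATE TABLE IF NOT EXISTS t (id INT, name STRING) USING parquet")
   spark.sql("CREATE OR REPLACE VIEW v1 AS SELECT id, name FROM t")

   // UNION (distinct) analyzes to Distinct over Union; the HAVING over the
   // aggregate is what leaves the unresolved `!Filter` that still needs the
   // missing `name` attribute.
   spark.sql(
     """SELECT id FROM v1
       |UNION
       |SELECT id FROM v1 GROUP BY id HAVING count(DISTINCT name) > 1""".stripMargin)
   ```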
   
   And here is how a `Join` deduplication would look without the `&& noMissingInput(right)` guard. The missing `name#25` can no longer be resolved:
   ```
   === Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===

   Before:
   'Aggregate ['l1.name, 'l1.id], ['l1.id]
   +- 'Join Inner, ('l1.id = 'l2.id)
      :- SubqueryAlias l1
      :  +- SubqueryAlias spark_catalog.default.v1
      :     +- View (`default`.`v1`, [id#24,name#25])
      :        +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
      :           +- Project [id#20, name#21]
      :              +- SubqueryAlias spark_catalog.default.t
      :                 +- Relation default.t[id#20,name#21] parquet
      +- SubqueryAlias l2
         +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
            +- Aggregate [id#24], [id#24]
               +- SubqueryAlias spark_catalog.default.v1
                  +- View (`default`.`v1`, [id#24,name#25])
                     +- Project [cast(id#26 as int) AS id#24, cast(name#27 as string) AS name#25]
                        +- Project [id#26, name#27]
                           +- SubqueryAlias spark_catalog.default.t
                              +- Relation default.t[id#26,name#27] parquet

   After:
   'Aggregate ['l1.name, 'l1.id], ['l1.id]
   +- 'Join Inner, ('l1.id = 'l2.id)
      :- SubqueryAlias l1
      :  +- SubqueryAlias spark_catalog.default.v1
      :     +- View (`default`.`v1`, [id#24,name#25])
      :        +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
      :           +- Project [id#20, name#21]
      :              +- SubqueryAlias spark_catalog.default.t
      :                 +- Relation default.t[id#20,name#21] parquet
      +- SubqueryAlias l2
         +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
            +- Aggregate [id#29], [id#29]
               +- SubqueryAlias spark_catalog.default.v1
                  +- View (`default`.`v1`, [id#29,name#30])
                     +- Project [cast(id#26 as int) AS id#29, cast(name#27 as string) AS name#30]
                        +- Project [id#26, name#27]
                           +- SubqueryAlias spark_catalog.default.t
                              +- Relation default.t[id#26,name#27] parquet
   ```
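
   An analogous, equally hypothetical query shape for the `Join` case (inferred from the plan above, not taken from the PR's tests):
   ```scala
   // Without the guard, dedupRight re-maps the attributes under l2's `!Filter`
   // (id#24 -> id#29, name#25 -> name#30) while the filter itself still
   // references name#25, so ResolveMissingReferences can no longer find a
   // child that could supply the missing attribute.
   spark.sql(
     """SELECT l1.id
       |FROM v1 l1
       |JOIN (SELECT id FROM v1 GROUP BY id HAVING count(DISTINCT name) > 1) l2
       |  ON l1.id = l2.id
       |GROUP BY l1.name, l1.id""".stripMargin)
   ```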
   


