Github user gatorsmile commented on the pull request:

    https://github.com/apache/spark/pull/9385#issuecomment-155677775
  
    Hi, @marmbrus 
    
    After digging the root reason why Expand cases failed, I found we  still 
need a deeper clean of subquery after elimination. 
    
    Let me use the following example to explain what happened in Expand. This 
query works well if we do not compare the qualifiers when comparing two 
AttributeReferences. I think this is a bug if merging 
https://github.com/apache/spark/pull/9216, right? 
    
    ```scala
    val sqlDF = sql("select a, b, sum(a) from mytable group by a, b with 
rollup").explain(true)
    ```
    
    Before subquery elimination, the subquery name "mytable" is shown in all 
the two upper layers (Aggregate and Expand). 
    ```scala
    Aggregate [a#2,b#3,grouping__id#5], [a#2,b#3,sum(cast(a#2 as bigint)) AS 
_c2#4L]
     Expand [0,1,3], [a#2,b#3], grouping__id#5
      Subquery mytable
       Project [_1#0 AS a#2,_2#1 AS b#3]
        LocalRelation [_1#0,_2#1], [[1,2],[2,4]]
    ```
    
    After subquery elimination, the subquery name "mytable" is not removed in 
these two upper layers. 
    ```scala
    Aggregate [a#2,b#3,grouping__id#5], [a#2,b#3,sum(cast(a#2 as bigint)) AS 
_c2#4L]
     Expand [0,1,3], [a#2,b#3], grouping__id#5
      Project [_1#0 AS a#2,_2#1 AS b#3]
       LocalRelation [_1#0,_2#1], [[1,2],[2,4]]
    ```
    
    In SparkStrategies, we create an array of Projections for the child 
projection of Expand.
    
    ```scala
          case e @ logical.Expand(_, _, _, child) =>
            execution.Expand(e.projections, e.output, planLater(child)) :: Nil
    ```
    
    `e.projections` calls the function `expand()`. Inside the function 
`expand()`, I do not think we should use `semanticEquals` there. 
    
    Let me post the incorrect physical plan
    
    ```scala
    TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 
as bigint)),mode=Final,isDistinct=false)], output=[a#2,b#3,_c2#11L])
     TungstenExchange hashpartitioning(a#2,b#3,grouping__id#12,5)
      TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 
as bigint)),mode=Partial,isDistinct=false)], 
output=[a#2,b#3,grouping__id#12,currentSum#15L])
       Expand [List(a#2, b#3, 0),List(a#2, b#3, 1),List(a#2, b#3, 3)], 
[a#2,b#3,grouping__id#12]
        LocalTableScan [a#2,b#3], [[1,2],[2,4]]
    ```
    
    For you convenience, below is the correct one:
    
    ```scala
    TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 
as bigint)),mode=Final,isDistinct=false)], output=[a#2,b#3,_c2#11L])
     TungstenExchange hashpartitioning(a#2,b#3,grouping__id#12,5)
      TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 
as bigint)),mode=Partial,isDistinct=false)], 
output=[a#2,b#3,grouping__id#12,currentSum#15L])
       Expand [List(null, null, 0),List(a#2, null, 1),List(a#2, b#3, 3)], 
[a#2,b#3,grouping__id#12]
        LocalTableScan [a#2,b#3], [[1,2],[2,4]]
    ```
    
    My current fix does not fix this issue yet. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to