Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/9385#issuecomment-155677775
Hi, @marmbrus
After digging into the root cause of the Expand case failures, I found that we still
need a deeper cleanup of subqueries after elimination.
Let me use the following example to explain what happens in Expand. The query
works well as long as we do not compare the qualifiers when comparing two
AttributeReferences, so I think this becomes a bug once
https://github.com/apache/spark/pull/9216 is merged, right?
```scala
val sqlDF = sql("select a, b, sum(a) from mytable group by a, b with rollup")
sqlDF.explain(true)
```
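For completeness, a minimal setup for `mytable` that matches the LocalRelation in the plans below could look like this (just a sketch, assuming a `SQLContext` named `sqlContext` as in the shell):
```scala
import sqlContext.implicits._

// Two rows (1, 2) and (2, 4), matching the LocalRelation [[1,2],[2,4]] below.
Seq((1, 2), (2, 4)).toDF("a", "b").registerTempTable("mytable")
```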
Before subquery elimination, the subquery name "mytable" is present in both
upper layers (Aggregate and Expand).
```scala
Aggregate [a#2,b#3,grouping__id#5], [a#2,b#3,sum(cast(a#2 as bigint)) AS _c2#4L]
 Expand [0,1,3], [a#2,b#3], grouping__id#5
  Subquery mytable
   Project [_1#0 AS a#2,_2#1 AS b#3]
    LocalRelation [_1#0,_2#1], [[1,2],[2,4]]
```
After subquery elimination, however, the subquery name "mytable" is still not
removed from these two upper layers.
```scala
Aggregate [a#2,b#3,grouping__id#5], [a#2,b#3,sum(cast(a#2 as bigint)) AS _c2#4L]
 Expand [0,1,3], [a#2,b#3], grouping__id#5
  Project [_1#0 AS a#2,_2#1 AS b#3]
   LocalRelation [_1#0,_2#1], [[1,2],[2,4]]
```
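To illustrate the kind of deeper cleanup I mean, a rough sketch could look like the rule below. This is not the fix in this PR; `CleanupQualifiers` is a made-up name, and it assumes the current `qualifiers`/`withQualifiers` API on `AttributeReference`:
```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: once the Subquery node is gone, drop the stale
// "mytable" qualifiers that the upper operators still carry.
object CleanupQualifiers extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case a: AttributeReference if a.qualifiers.nonEmpty => a.withQualifiers(Nil)
  }
}
```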
In SparkStrategies, we create an array of projections for the physical Expand
over its child:
```scala
case e @ logical.Expand(_, _, _, child) =>
  execution.Expand(e.projections, e.output, planLater(child)) :: Nil
```
`e.projections` calls the function `expand()`, and I do not think we should use
`semanticEquals` inside `expand()`.
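To make the qualifier mismatch concrete, here is a minimal sketch (the constructors assume the current Catalyst API, and the exact comparison inside `expand()` is simplified away):
```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.types.IntegerType

// Same underlying column (same ExprId); one copy still carries the qualifier
// of the eliminated subquery, the other does not.
val fromUpperLayer = AttributeReference("a", IntegerType)(ExprId(2), qualifiers = Seq("mytable"))
val fromChild      = AttributeReference("a", IntegerType)(ExprId(2), qualifiers = Nil)

// Once #9216 makes the comparison qualifier-sensitive, these two no longer
// match, so expand() cannot recognize the grouping column and never nulls it out.
println(fromUpperLayer == fromChild)
```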
Let me post the incorrect physical plan:
```scala
TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Final,isDistinct=false)], output=[a#2,b#3,_c2#11L])
 TungstenExchange hashpartitioning(a#2,b#3,grouping__id#12,5)
  TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Partial,isDistinct=false)], output=[a#2,b#3,grouping__id#12,currentSum#15L])
   Expand [List(a#2, b#3, 0),List(a#2, b#3, 1),List(a#2, b#3, 3)], [a#2,b#3,grouping__id#12]
    LocalTableScan [a#2,b#3], [[1,2],[2,4]]
```
For your convenience, below is the correct one:
```scala
TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Final,isDistinct=false)], output=[a#2,b#3,_c2#11L])
 TungstenExchange hashpartitioning(a#2,b#3,grouping__id#12,5)
  TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Partial,isDistinct=false)], output=[a#2,b#3,grouping__id#12,currentSum#15L])
   Expand [List(null, null, 0),List(a#2, null, 1),List(a#2, b#3, 3)], [a#2,b#3,grouping__id#12]
    LocalTableScan [a#2,b#3], [[1,2],[2,4]]
```
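As a reference point, here is a toy sketch in plain Scala (not Catalyst code) of how the bitmasks 0, 1 and 3 should map to the nulled-out projections in the correct plan, with bit i set meaning "keep column i":
```scala
// Build one Expand projection for a given bitmask over the grouping columns.
def projection(cols: Seq[String], mask: Int): Seq[String] =
  cols.zipWithIndex.map { case (c, i) =>
    if ((mask & (1 << i)) != 0) c else "null"
  } :+ mask.toString

Seq(0, 1, 3).foreach(m => println(projection(Seq("a#2", "b#3"), m)))
// List(null, null, 0)
// List(a#2, null, 1)
// List(a#2, b#3, 3)
```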
My current fix does not address this issue yet.