Github user gatorsmile commented on the pull request: https://github.com/apache/spark/pull/9385#issuecomment-155677775

Hi, @marmbrus After digging into the root cause of the Expand failures, I found we still need a deeper cleanup of subquery references after subquery elimination. Let me use the following example to explain what happens in Expand. This query works correctly if we do not compare the qualifiers when comparing two AttributeReferences, so I think this becomes a bug if https://github.com/apache/spark/pull/9216 is merged, right?

```scala
val sqlDF = sql("select a, b, sum(a) from mytable group by a, b with rollup").explain(true)
```

Before subquery elimination, the subquery name "mytable" is carried (as an attribute qualifier) by both upper operators (Aggregate and Expand):

```
Aggregate [a#2,b#3,grouping__id#5], [a#2,b#3,sum(cast(a#2 as bigint)) AS _c2#4L]
 Expand [0,1,3], [a#2,b#3], grouping__id#5
  Subquery mytable
   Project [_1#0 AS a#2,_2#1 AS b#3]
    LocalRelation [_1#0,_2#1], [[1,2],[2,4]]
```

After subquery elimination, the subquery name "mytable" is not removed from those two upper operators:

```
Aggregate [a#2,b#3,grouping__id#5], [a#2,b#3,sum(cast(a#2 as bigint)) AS _c2#4L]
 Expand [0,1,3], [a#2,b#3], grouping__id#5
  Project [_1#0 AS a#2,_2#1 AS b#3]
   LocalRelation [_1#0,_2#1], [[1,2],[2,4]]
```

In SparkStrategies, we create an array of projections for the child of Expand:

```scala
case e @ logical.Expand(_, _, _, child) =>
  execution.Expand(e.projections, e.output, planLater(child)) :: Nil
```

`e.projections` calls the function `expand()`. Inside `expand()`, I do not think we should use `semanticEquals` there.
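For context, here is a small standalone sketch (not Spark's actual implementation; `Attr`, `expandProjections`, and the exact bit convention are illustrative) of what `expand()` is supposed to produce for ROLLUP: one output projection per grouping-set bitmask, with grouping columns that are not members of the set replaced by null.

```scala
// Hypothetical stand-in for AttributeReference: name, exprId, and an optional
// qualifier (e.g. the subquery name "mytable").
case class Attr(name: String, exprId: Int, qualifier: Option[String])

// Build one projection per bitmask. Bit i set => the i-th grouping column is
// part of this grouping set and survives; otherwise it is replaced by "null".
// The mask itself is appended as the grouping__id column.
def expandProjections(groupBy: Seq[Attr], masks: Seq[Int]): Seq[Seq[String]] =
  masks.map { mask =>
    groupBy.zipWithIndex.map { case (attr, i) =>
      if ((mask & (1 << i)) != 0) attr.name else "null"
    } :+ mask.toString
  }

val a = Attr("a#2", 2, Some("mytable"))
val b = Attr("b#3", 3, Some("mytable"))

// Masks 0, 1, 3 correspond to the three ROLLUP grouping sets over (a, b).
println(expandProjections(Seq(a, b), Seq(0, 1, 3)))
// List(List(null, null, 0), List(a#2, null, 1), List(a#2, b#3, 3))
```

This matches the `Expand [List(null, null, 0),List(a#2, null, 1),List(a#2, b#3, 3)], ...` shape of the correct physical plan below; if the attribute lookup inside `expand()` fails to match, the null substitution never fires.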
Let me post the incorrect physical plan:

```
TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Final,isDistinct=false)], output=[a#2,b#3,_c2#11L])
 TungstenExchange hashpartitioning(a#2,b#3,grouping__id#12,5)
  TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Partial,isDistinct=false)], output=[a#2,b#3,grouping__id#12,currentSum#15L])
   Expand [List(a#2, b#3, 0),List(a#2, b#3, 1),List(a#2, b#3, 3)], [a#2,b#3,grouping__id#12]
    LocalTableScan [a#2,b#3], [[1,2],[2,4]]
```

For your convenience, below is the correct one:

```
TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Final,isDistinct=false)], output=[a#2,b#3,_c2#11L])
 TungstenExchange hashpartitioning(a#2,b#3,grouping__id#12,5)
  TungstenAggregate(key=[a#2,b#3,grouping__id#12], functions=[(sum(cast(a#2 as bigint)),mode=Partial,isDistinct=false)], output=[a#2,b#3,grouping__id#12,currentSum#15L])
   Expand [List(null, null, 0),List(a#2, null, 1),List(a#2, b#3, 3)], [a#2,b#3,grouping__id#12]
    LocalTableScan [a#2,b#3], [[1,2],[2,4]]
```

Note the difference in the Expand projections: the incorrect plan keeps a#2 and b#3 in every grouping set instead of replacing them with null. My current fix does not address this issue yet.
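A standalone sketch (hypothetical names, not Spark's actual code) of why the incorrect plan keeps a#2 and b#3 in every grouping set: after subquery elimination the attributes in the upper operators still carry the stale "mytable" qualifier, so an equality check that also compares qualifiers never matches the grouping columns, and the null substitution in `expand()` silently does nothing.

```scala
// Hypothetical stand-in for AttributeReference.
case class Attr(name: String, exprId: Int, qualifier: Option[String])

// Compares qualifiers too -- the behavior that breaks after subquery elimination.
def strictEquals(x: Attr, y: Attr): Boolean =
  x.exprId == y.exprId && x.qualifier == y.qualifier

// Matches on exprId only, ignoring the (possibly stale) qualifier.
def byIdOnly(x: Attr, y: Attr): Boolean = x.exprId == y.exprId

val upper = Attr("a#2", 2, Some("mytable")) // attribute as seen by Aggregate/Expand
val child = Attr("a#2", 2, None)            // same attribute after subquery elimination

println(strictEquals(upper, child)) // false: never matched, so never nulled out
println(byIdOnly(upper, child))     // true: matched as intended
```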