miaojianlong created SPARK-26361:
------------------------------------

             Summary: ReuseExchange is not available in case of ColumnPruning
                 Key: SPARK-26361
                 URL: https://issues.apache.org/jira/browse/SPARK-26361
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: miaojianlong
ReuseExchange is not available if ColumnPruning has been applied. See this code:

{code:java}
val df = spark.createDataFrame(Seq((1, 1, 1))).toDF("a", "b", "c")
val groupDf = df.groupBy("a").agg("b" -> "max", "c" -> "min")
// val df1 = groupDf.select("a", "max(b)")
val df1 = groupDf.drop("max(b)")
val df2 = groupDf.withColumn("d", lit(1))
df1.join(df2, "a").explain(true)
{code}

{code:java}
== Analyzed Logical Plan ==
a: int, min(c): int, max(b): int, min(c): int, d: int
Project [a#6, min(c)#16, max(b)#29, min(c)#30, d#24]
+- Join Inner, (a#6 = a#34)
   :- Project [a#6, min(c)#16]
   :  +- Aggregate [a#6], [a#6, max(b#7) AS max(b)#15, min(c#8) AS min(c)#16]
   :     +- Project [_1#0 AS a#6, _2#1 AS b#7, _3#2 AS c#8]
   :        +- LocalRelation [_1#0, _2#1, _3#2]
   +- Project [a#34, max(b)#29, min(c)#30, 1 AS d#24]
      +- Aggregate [a#34], [a#34, max(b#35) AS max(b)#29, min(c#36) AS min(c)#30]
         +- Project [_1#0 AS a#34, _2#1 AS b#35, _3#2 AS c#36]
            +- LocalRelation [_1#0, _2#1, _3#2]

== Optimized Logical Plan ==
Project [a#6, min(c)#16, max(b)#29, min(c)#30, d#24]
+- Join Inner, (a#6 = a#34)
   :- Aggregate [a#6], [a#6, min(c#8) AS min(c)#16]
   :  +- LocalRelation [a#6, c#8]
   +- Aggregate [a#34], [a#34, max(b#35) AS max(b)#29, min(c#36) AS min(c)#30, 1 AS d#24]
      +- LocalRelation [a#34, b#35, c#36]

== Physical Plan ==
*(4) Project [a#6, min(c)#16, max(b)#29, min(c)#30, d#24]
+- *(4) BroadcastHashJoin [a#6], [a#34], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :  +- *(2) HashAggregate(keys=[a#6], functions=[min(c#8)], output=[a#6, min(c)#16])
   :     +- Exchange hashpartitioning(a#6, 200)
   :        +- *(1) HashAggregate(keys=[a#6], functions=[partial_min(c#8)], output=[a#6, min#44])
   :           +- LocalTableScan [a#6, c#8]
   +- *(4) HashAggregate(keys=[a#34], functions=[max(b#35), min(c#36)], output=[a#34, max(b)#29, min(c)#30, d#24])
      +- Exchange hashpartitioning(a#34, 200)
         +- *(3) HashAggregate(keys=[a#34], functions=[partial_max(b#35), partial_min(c#36)], output=[a#34, max#47, min#48])
            +- LocalTableScan [a#34, b#35, c#36]
{code}

I looked at the code: ReuseExchange first checks that the schemas of the two exchanges are identical. In practice it is common to derive several DataFrames from the same source with different projections and then join them, and after ColumnPruning their schemas very likely no longer match, so the Exchange is not reused even though both sides shuffle the same underlying data. I think we need to consider handling this situation.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
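To illustrate the matching behaviour described above, here is a minimal, self-contained sketch. It is my own simplification, not Spark's actual classes: the `Exchange` case class and `reuse` function are hypothetical stand-ins for the physical Exchange node and the ReuseExchange rule, which (in Spark 2.x) groups candidate exchanges by output schema first and only compares same-schema candidates for semantic equality. Once ColumnPruning shrinks one branch's schema, the two aggregates can never be matched, even though one branch's output is a subset of the other's.

```scala
import scala.collection.mutable

object ReuseExchangeSketch {
  // Hypothetical stand-in for a physical Exchange node: its output schema
  // and a canonical form of the child plan used for semantic comparison.
  case class Exchange(schema: Seq[String], canonicalized: String)

  // Simplified analogue of the ReuseExchange rule: walk the exchanges,
  // replace any exchange that matches an earlier one (same schema AND same
  // canonicalized plan) with that earlier exchange. Returns the rewritten
  // sequence and the number of reuses that happened.
  def reuse(plans: Seq[Exchange]): (Seq[Exchange], Int) = {
    // Exchanges are bucketed by schema first, mirroring the schema check
    // that runs before any sameResult comparison.
    val seen = mutable.HashMap[Seq[String], mutable.ArrayBuffer[Exchange]]()
    var reused = 0
    val out = plans.map { e =>
      val sameSchema = seen.getOrElseUpdate(e.schema, mutable.ArrayBuffer())
      sameSchema.find(_.canonicalized == e.canonicalized) match {
        case Some(existing) => reused += 1; existing
        case None           => sameSchema += e; e
      }
    }
    (out, reused)
  }

  def main(args: Array[String]): Unit = {
    val full   = Exchange(Seq("a", "max(b)", "min(c)"), "agg-over-local-relation")
    val pruned = Exchange(Seq("a", "min(c)"),           "agg-over-local-relation")

    // Identical schemas: the second exchange is replaced by the first.
    assert(reuse(Seq(full, full))._2 == 1)
    // One side pruned: schemas differ, so nothing is reused and both
    // shuffles execute, as in the physical plan above.
    assert(reuse(Seq(full, pruned))._2 == 0)
    println("ok")
  }
}
```

The sketch shows why fixing this in the real rule is non-trivial: matching a pruned exchange against a wider one would require inserting a Project on top of the reused exchange, not just swapping node references.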