peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r459314352
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
##########
@@ -156,4 +158,46 @@ class ExchangeSuite extends SparkPlanTest with SharedSparkSession {
val projection2 = cached.select("_1", "_3").queryExecution.executedPlan
assert(!projection1.sameResult(projection2))
}
+
+ test("Exchange reuse across the whole plan") {
+ val df = sql(
+ """
+ |SELECT
+ | (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
+ | a.key
+ |FROM testData AS a
+ |JOIN testData AS b ON b.key = a.key
+ """.stripMargin)
+
+ val plan = df.queryExecution.executedPlan
+
+ val exchangeIds = plan.collectWithSubqueries { case e: Exchange => e.id }
+ val reusedExchangeIds = plan.collectWithSubqueries {
+ case re: ReusedExchangeExec => re.child.id
+ }
+
+ assert(exchangeIds.size == 2, "Whole plan exchange reusing not working correctly")
+ assert(reusedExchangeIds.size == 3, "Whole plan exchange reusing not working correctly")
+ assert(reusedExchangeIds.forall(exchangeIds.contains(_)),
+ "ReusedExchangeExec should reuse an existing exchange")
+
+ val df2 = sql(
Review comment:
Ok, I will add some comments here.
About the one-rule issue, please see 2. in the description and
https://github.com/apache/spark/pull/28885#discussion_r459255589
But let me add some more explanation. Currently the plan of the query in 2.
is the following:
```
== Physical Plan ==
*(15) SortMergeJoin [id#46L], [id#58L], Inner
:- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
:     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
:        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
:           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
:              +- Union
:                 :- *(2) Project [id#46L, k#49L]
:                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
:                 :     :- *(2) Project [id#46L, k#47L]
:                 :     :  +- *(2) Filter isnotnull(id#46L)
:                 :     :     +- *(2) ColumnarToRow
:                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
:                 :     :           +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
:                 :     :              +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
:                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
:                 :        +- *(1) Project [k#49L]
:                 :           +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
:                 :              +- *(1) ColumnarToRow
:                 :                 +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
:                 +- *(4) Project [id#46L, k#49L]
:                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
:                       :- *(4) Project [id#46L, k#47L]
:                       :  +- *(4) Filter isnotnull(id#46L)
:                       :     +- *(4) ColumnarToRow
:                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
:                       :           +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
:                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
+- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
   +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761]   <== this reuse node points to a non-existing node
```
Obviously the `ReusedExchange` pointing to the non-existing `id=#761` is the
issue; it should point to `id=#979`. But why did this happen? When the rule
`ReuseExchange` inserted the `ReusedExchange` node into the plan, the id of
the referenced exchange was `id=#761`. But then `ReuseSubquery` came and
replaced the immutable `id=#761` node with another instance (`id=#979`). It
didn't replace that particular exchange node directly; it made a change
somewhere beneath it, and the change propagated up to the root. (The nodes
are immutable, so if you change a node in the tree, all of its ancestors are
replaced with new instances.) How do we know that this happened due to the
rule `ReuseSubquery`? There is a `ReusedSubquery` node under `id=#979` that
can only have been put there by `ReuseSubquery`.
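The "ancestors are replaced" behavior can be illustrated with a toy sketch (not Catalyst's actual `TreeNode` API, just the same idea): rewriting a leaf in an immutable tree forces every node above it to be rebuilt as a new instance, which is exactly why a reference captured earlier (like an exchange id) can go stale.

```scala
// Toy immutable tree node, loosely modeled on Catalyst's TreeNode.
case class Node(name: String, children: Seq[Node] = Nil) {
  // Bottom-up transform: rewrite children first, then apply the rule here.
  // If no child changed, keep this instance; otherwise rebuild it.
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val newChildren = children.map(_.transformUp(rule))
    val rebuilt =
      if (newChildren.zip(children).forall { case (a, b) => a eq b }) this
      else copy(children = newChildren)
    rule.applyOrElse(rebuilt, identity[Node])
  }
}

val scan = Node("scan")
val exchange = Node("exchange", Seq(scan))
val root = Node("root", Seq(exchange))

// Rewrite only the leaf; the exchange and the root are rebuilt as new
// instances even though their own contents did not change.
val newRoot = root.transformUp { case n if n.name == "scan" => n.copy(name = "prunedScan") }
```

After this transform, `newRoot` and `newRoot.children.head` are different instances from `root` and `exchange`, so anything that held a reference to the old `exchange` instance now points at a node that is no longer in the tree.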
Just to mention all the details: the example in 2. contains the issue of 1.
as well (I simply couldn't construct an example where only 2. happens, but
that doesn't mean they are the same issue). You can find a `ReusedExchange`
node pointing to `id=#656`. That reuse node is correct, as `id=#656` does
exist in the plan, but it was inserted under the exchange node `id=#761` by
the second traversal (`transformAllExpressions`) in `ReuseExchange`, which
also altered `id=#761`. So in fact in 2. the following events happened:
- the rule `ReuseExchange` inserted the `ReusedExchange` pointing to `id=#761` into the plan
- the rule `ReuseExchange` altered the exchange node `id=#761` to `id=X`
- the rule `ReuseSubquery` altered the exchange node `id=X` to `id=#979`
The source of these issues is that currently we might alter a node that a
previous sweep referenced. The fix I suggest in this PR is to do one
combined sweep, on the whole plan, in a bottom-up manner.
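Why a single bottom-up sweep avoids stale references can also be shown with a toy sketch (again, not the PR's actual implementation): because children are finalized before their parents, every deduplicated subtree we hand out a reference to is already in its final form, so no later step of the same sweep can replace it.

```scala
import scala.collection.mutable

// Toy plan node; case-class equality stands in for Spark's canonicalized
// plan comparison.
case class Plan(name: String, children: Seq[Plan] = Nil)

// Deduplicate structurally equal subtrees in one bottom-up pass.
def reuseBottomUp(plan: Plan): Plan = {
  // Keyed by the already-rewritten subtree: since children are processed
  // before parents, every value stored here is final and safe to reference.
  val seen = mutable.Map.empty[Plan, Plan]
  def visit(p: Plan): Plan = {
    val rewritten = p.copy(children = p.children.map(visit))
    seen.getOrElseUpdate(rewritten, rewritten)
  }
  visit(plan)
}

// Two structurally equal (but distinct) "exchange" subtrees under one join.
val shared = Plan("exchange", Seq(Plan("scan")))
val root = Plan("join", Seq(shared, Plan("exchange", Seq(Plan("scan")))))
val deduped = reuseBottomUp(root)
```

In `deduped`, both children of the join are the very same instance, whereas in `root` they were merely equal. A second, separate sweep over `deduped` could still invalidate these shared references, which is why combining everything into the one bottom-up sweep matters.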
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]