peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r459314352



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
##########
@@ -156,4 +158,46 @@ class ExchangeSuite extends SparkPlanTest with SharedSparkSession {
     val projection2 = cached.select("_1", "_3").queryExecution.executedPlan
     assert(!projection1.sameResult(projection2))
   }
+
+  test("Exchange reuse across the whole plan") {
+    val df = sql(
+      """
+        |SELECT
+        |  (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
+        |  a.key
+        |FROM testData AS a
+        |JOIN testData AS b ON b.key = a.key
+      """.stripMargin)
+
+    val plan = df.queryExecution.executedPlan
+
+    val exchangeIds = plan.collectWithSubqueries { case e: Exchange => e.id }
+    val reusedExchangeIds = plan.collectWithSubqueries {
+      case re: ReusedExchangeExec => re.child.id
+    }
+
+    assert(exchangeIds.size == 2, "Whole plan exchange reusing not working correctly")
+    assert(reusedExchangeIds.size == 3, "Whole plan exchange reusing not working correctly")
+    assert(reusedExchangeIds.forall(exchangeIds.contains(_)),
+      "ReusedExchangeExec should reuse an existing exchange")
+
+    val df2 = sql(

Review comment:
   Ok, I will add some comments here.
   
   About the one-rule issue, please see 2. in the description and https://github.com/apache/spark/pull/28885#discussion_r459255589. But let me add some more explanation. Currently the plan of the query in 2. is the following:
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                 :     :                 +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :        +- *(1) Project [k#49L]
   :                 :           +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :              +- *(1) ColumnarToRow
   :                 :                 +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761]   <== this reuse node points to a non-existing node
   ```
   Obviously the issue is the `ReusedExchange` pointing to the non-existing node `id=#761`; it should point to `id=#979`. But why did this happen? When the rule `ReuseExchange` inserted the `ReusedExchange` node into the plan, the id of the referenced exchange was `id=#761`. But then `ReuseSubquery` came along and replaced the immutable `id=#761` node with another instance (`id=#979`). It didn't replace that particular exchange node directly, it just made a change somewhere below it, and that change propagated all the way up to the root. (The nodes are immutable, so if you change a node in the tree, all of its ancestors are replaced with different instances.) How do we know that this happened due to the rule `ReuseSubquery`? Well, there is a `ReusedSubquery` node under `id=#979`, and only `ReuseSubquery` can have put it there.
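   
   To make the immutability point concrete, here is a minimal, self-contained sketch. It uses a toy tree with hypothetical `Node` and `transformUp` names, not Spark's actual `TreeNode` API, but it mimics the same rebuild-on-change behaviour:
   ```scala
   // Toy immutable tree that, like Catalyst, rebuilds a node whenever one of
   // its children changes.
   case class Node(id: Int, children: Seq[Node]) {
     def transformUp(f: PartialFunction[Node, Node]): Node = {
       val newChildren = children.map(_.transformUp(f))
       // Keep this instance only if no child changed; otherwise create a copy.
       val rebuilt =
         if (newChildren.zip(children).forall { case (n, o) => n eq o }) this
         else copy(children = newChildren)
       f.applyOrElse(rebuilt, identity[Node])
     }
   }

   object ImmutabilityDemo extends App {
     val root = Node(3, Seq(Node(2, Seq(Node(1, Nil)))))
     // Change only the deepest leaf...
     val newRoot = root.transformUp { case Node(1, cs) => Node(42, cs) }
     // ...and the root becomes a new instance too, so anything that kept a
     // reference to the old root (or any old ancestor) now points to a stale node.
     println(newRoot eq root) // false
   }
   ```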
   
   Just to mention all the details: the example in 2. contains the issue of 1. as well (I simply couldn't construct an example where only 2. happens, but that doesn't mean they are the same issue). You can find a `ReusedExchange` node pointing to `id=#656`. That reuse node is correct, as `id=#656` does exist in the plan, but it was inserted under the exchange node `id=#761` by the second traversal (`transformAllExpressions`) in `ReuseExchange`, which thereby also altered `id=#761`. So in fact, in 2. the following events happened:
   - rule `ReuseExchange` inserted the `ReusedExchange` pointing to `id=#761` into the plan
   - rule `ReuseExchange` altered the exchange node `id=#761` to `id=X`
   - rule `ReuseSubquery` altered the exchange node `id=X` to `id=#979`
   
   The source of these issues is that we currently might alter a node that a previous sweep has already referenced. The fix I suggest in this PR is to do one combined sweep, to do it on the whole plan, and to do it in a bottom-up manner.
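   
   Just to illustrate the direction of that fix, here is a minimal sketch (hypothetical code, not the exact rule in this PR) of such a single combined bottom-up sweep. Because children are already in their final shape when a node is visited, a reference recorded here cannot be invalidated by a later traversal:
   ```scala
   import scala.collection.mutable

   import org.apache.spark.sql.execution.SparkPlan
   import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

   // Hypothetical sketch of one combined, whole-plan, bottom-up sweep.
   object ReuseExchangeSketch {
     def apply(plan: SparkPlan): SparkPlan = {
       // First occurrence of each exchange, keyed by its canonicalized form.
       val exchanges = mutable.Map.empty[SparkPlan, Exchange]
       plan.transformUp {
         case exchange: Exchange =>
           exchanges.get(exchange.canonicalized) match {
             // A semantically equal exchange was already kept: reuse it. Its
             // subtree is final at this point, so the reference cannot go stale.
             case Some(cached) => ReusedExchangeExec(exchange.output, cached)
             case None =>
               exchanges(exchange.canonicalized) = exchange
               exchange
           }
         // A real combined rule would also descend into subquery expressions
         // here and handle subquery reuse in the same sweep.
       }
     }
   }
   ```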
   



