Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203437795
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
     
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --
    
    In the cache case, the reuse simply doesn't happen, so no exception is thrown:
    ```
    // the cache case
    == Physical Plan ==
    *(3) HashAggregate(keys=[imei#31], functions=[])
    +- Exchange(coordinator id: 1599206176) hashpartitioning(imei#31, 200), coordinator[target post-shuffle partition size: 67108864]
       +- *(2) HashAggregate(keys=[imei#31], functions=[])
          +- *(2) Project [imei#31]
             +- *(2) BroadcastHashJoin [imei#31], [imei#101], Inner, BuildRight
                :- *(2) Filter isnotnull(imei#31)
                :  +- *(2) InMemoryTableScan [imei#31], [isnotnull(imei#31)]
                :        +- InMemoryRelation [imei#31, speed#32], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
    ,None)
                :              +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
                +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                   +- *(1) Filter isnotnull(imei#101)
                      +- *(1) InMemoryTableScan [imei#101], [isnotnull(imei#101)]
                            +- InMemoryRelation [imei#101, speed#102], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
    ,None)
                                  +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
    
    // the non-cache case
    scala> df.explain
    == Physical Plan ==
    *(5) HashAggregate(keys=[imei#0], functions=[])
    +- *(5) HashAggregate(keys=[imei#0], functions=[])
       +- *(5) Project [imei#0]
          +- *(5) SortMergeJoin [imei#0], [imei#27], Inner
             :- *(2) Sort [imei#0 ASC NULLS FIRST], false, 0
             :  +- Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
             :     +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#0] PushedFilters: [*IsNotNull(imei)], ReadSchema: struct<imei:int>
             +- *(4) Sort [imei#27 ASC NULLS FIRST], false, 0
                +- ReusedExchange [imei#27], Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
    ```
    `ExchangeCoordinator` determines how data is shuffled between stages, so if totally unrelated stages share an exchange, IIUC the sharing can easily break the coordinator's semantics. My hunch is that supporting reuse of an exchange that has a coordinator needs more logic in `ExchangeCoordinator` to take the sharing into account.
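
    To make the guard concrete, here is a minimal, self-contained sketch of the idea. It uses toy stand-in types (`Plan`, `Scan`, `ShuffleExchange`, `Reused`, `Join`, `ReuseSketch`), all of them hypothetical and not Spark's real classes, and it only guesses at the intent of `supportReuseExchange`, since the rest of the match is not shown in the diff:
    ```scala
    // Toy model only: these types are hypothetical stand-ins, NOT Spark's classes.
    sealed trait Plan
    case class Scan(table: String) extends Plan
    // `coordinatorId` models an exchange that has a coordinator attached.
    case class ShuffleExchange(child: Plan, coordinatorId: Option[Int]) extends Plan
    case class Reused(original: ShuffleExchange) extends Plan
    case class Join(left: Plan, right: Plan) extends Plan

    object ReuseSketch {
      // Assumed intent of the check in the diff: an exchange with a
      // coordinator is never treated as a reuse candidate.
      def supportsReuse(e: ShuffleExchange): Boolean = e.coordinatorId.isEmpty

      // Bottom-up rewrite: a repeated, reusable exchange is replaced by a
      // reference to the first occurrence; others are left untouched.
      def reuse(plan: Plan): Plan = {
        val seen = scala.collection.mutable.Set.empty[ShuffleExchange]
        def go(p: Plan): Plan = p match {
          case e: ShuffleExchange =>
            val rewritten = e.copy(child = go(e.child))
            if (!supportsReuse(rewritten)) rewritten
            else if (seen.contains(rewritten)) Reused(rewritten)
            else { seen += rewritten; rewritten }
          case Join(l, r) => Join(go(l), go(r))
          case other => other
        }
        go(plan)
      }
    }
    ```
    With this guard, `Join(e, e)` becomes `Join(e, Reused(e))` only when `e` has no coordinator; a coordinated exchange is planned twice instead, which is the conservative behaviour until the coordinator itself is made aware of sharing.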

