Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21754#discussion_r203437795
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
*/
case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
+ private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+ // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --
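The idea behind the diff can be sketched with a toy plan model. The rule walks the plan bottom-up, replaces an exchange that produces the same result as an earlier one with a reference to it, and skips any exchange that carries a coordinator. The `Plan`, `Scan`, `Exchange`, `Reused`, and `Join` classes and the `ReuseSketch` object are hypothetical stand-ins, not Spark's `SparkPlan` or `ReusedExchangeExec`. Also, `sameResult` here just compares fields instead of canonicalized plans.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for physical plan nodes (not Spark's classes).
sealed trait Plan
case class Scan(table: String) extends Plan
case class Exchange(partitioning: String, child: Plan, coordinatorId: Option[Int]) extends Plan
case class Reused(original: Exchange) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

object ReuseSketch {
  // Same idea as the diff: an exchange driven by a coordinator is never reused.
  def supportReuse(exchange: Exchange): Boolean = exchange.coordinatorId match {
    case Some(_) => false
    case None    => true
  }

  // Simplified stand-in for plan canonicalization plus sameResult.
  private def sameResult(a: Exchange, b: Exchange): Boolean =
    a.partitioning == b.partitioning && a.child == b.child

  // Bottom-up rewrite: children first, then the exchange itself.
  def reuse(plan: Plan): Plan = {
    val seen = mutable.ArrayBuffer.empty[Exchange]
    def go(p: Plan): Plan = p match {
      case Join(l, r) => Join(go(l), go(r))
      case e: Exchange =>
        val rewritten = e.copy(child = go(e.child))
        if (!supportReuse(rewritten)) {
          rewritten
        } else {
          seen.find(sameResult(_, rewritten)) match {
            case Some(existing) => Reused(existing)
            case None =>
              seen += rewritten
              rewritten
          }
        }
      case other => other
    }
    go(plan)
  }

  def main(args: Array[String]): Unit = {
    val plain = Join(
      Exchange("hash(imei)", Scan("device_loc"), None),
      Exchange("hash(imei)", Scan("device_loc"), None))
    // The second exchange is replaced by a Reused(...) reference
    println(reuse(plain))

    val coordinated = Join(
      Exchange("hash(imei)", Scan("device_loc"), Some(1)),
      Exchange("hash(imei)", Scan("device_loc"), Some(1)))
    // Both exchanges are kept because they carry a coordinator
    println(reuse(coordinated))
  }
}
```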
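In the cache case, the reuse just doesn't happen, so no exception is thrown. In the non-cache case, the second shuffle becomes a `ReusedExchange` of an exchange that has a coordinator: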
```
// the cache case
== Physical Plan ==
*(3) HashAggregate(keys=[imei#31], functions=[])
+- Exchange(coordinator id: 1599206176) hashpartitioning(imei#31, 200), coordinator[target post-shuffle partition size: 67108864]
   +- *(2) HashAggregate(keys=[imei#31], functions=[])
      +- *(2) Project [imei#31]
         +- *(2) BroadcastHashJoin [imei#31], [imei#101], Inner, BuildRight
            :- *(2) Filter isnotnull(imei#31)
            :  +- *(2) InMemoryTableScan [imei#31], [isnotnull(imei#31)]
            :        +- InMemoryRelation [imei#31, speed#32], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
,None)
            :              +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
            +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               +- *(1) Filter isnotnull(imei#101)
                  +- *(1) InMemoryTableScan [imei#101], [isnotnull(imei#101)]
                        +- InMemoryRelation [imei#101, speed#102], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
,None)
                              +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct<imei:int,speed:int>
// the non-cache case
scala> df.explain
== Physical Plan ==
*(5) HashAggregate(keys=[imei#0], functions=[])
+- *(5) HashAggregate(keys=[imei#0], functions=[])
   +- *(5) Project [imei#0]
      +- *(5) SortMergeJoin [imei#0], [imei#27], Inner
         :- *(2) Sort [imei#0 ASC NULLS FIRST], false, 0
         :  +- Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
         :     +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#0] PushedFilters: [*IsNotNull(imei)], ReadSchema: struct<imei:int>
         +- *(4) Sort [imei#27 ASC NULLS FIRST], false, 0
            +- ReusedExchange [imei#27], Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
```
`ExchangeCoordinator` determines how data is shuffled between stages, so if totally unrelated stages share an exchange, IIUC the sharing can easily break the coordinator's semantics. My hunch is that supporting the reuse of an exchange with a coordinator would need more logic in `ExchangeCoordinator` to take the sharing into account.
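To make the concern concrete, here is a simplified, standalone model of what a coordinator does, assuming it works roughly like this. It looks at the map-output sizes of every exchange registered with it and merges adjacent pre-shuffle partitions until a target size is reached. The result is one set of partition start indices shared by all of those exchanges. `CoalesceSketch` and `startIndices` are hypothetical names, and this is a sketch of the idea, not Spark's `ExchangeCoordinator` code. In the example, the same `left` shuffle output gets different post-shuffle boundaries depending on which other exchanges the coordinator sees. That is why one exchange cannot simply be shared by stages whose partitioning is decided independently.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of coordinator-driven partition coalescing;
// not Spark's ExchangeCoordinator implementation.
object CoalesceSketch {
  // bytesByPartition(i)(p): bytes that registered exchange i wrote to pre-shuffle partition p.
  // Returns the start index of every post-shuffle partition, merging adjacent pre-shuffle
  // partitions while their combined size (summed over all exchanges) fits in targetSize.
  def startIndices(bytesByPartition: Seq[Array[Long]], targetSize: Long): List[Int] = {
    require(bytesByPartition.nonEmpty, "at least one exchange is required")
    val numPartitions = bytesByPartition.head.length
    require(bytesByPartition.forall(_.length == numPartitions),
      "all exchanges must have the same number of pre-shuffle partitions")

    val starts = mutable.ArrayBuffer(0)
    var currentSize = 0L
    for (p <- 0 until numPartitions) {
      // Size of pre-shuffle partition p across every registered exchange
      val size = bytesByPartition.map(_(p)).sum
      if (p > 0 && currentSize + size > targetSize) {
        starts += p // close the current post-shuffle partition and start a new one at p
        currentSize = size
      } else {
        currentSize += size
      }
    }
    starts.toList
  }

  def main(args: Array[String]): Unit = {
    val left  = Array(10L, 10L, 10L, 10L)
    val right = Array(40L, 0L, 0L, 40L)
    val other = Array(0L, 0L, 0L, 0L)

    // Coordinator A sees `left` together with `right`
    println(startIndices(Seq(left, right), targetSize = 50L)) // List(0, 1, 3)
    // Coordinator B sees the same `left` output with an unrelated input
    println(startIndices(Seq(left, other), targetSize = 50L)) // List(0)
  }
}
```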