GitHub user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21754#discussion_r207090221
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
// Ignore this wrapper for canonicalizing.
override def doCanonicalize(): SparkPlan = child.canonicalized
+ override protected def doPrepare(): Unit = {
+ child match {
+ case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+ coordinator.registerExchange(shuffleExchange)
--- End diff --
`EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator`
before `ReuseExchange` runs:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L85
For example, in the test in this PR, it sets the number to `3` in `ExchangeCoordinator`:
https://github.com/apache/spark/pull/21754/files#diff-3cd46a3f60c5352282bd3f2c9efff7fcR505
`ReuseExchange` then reuses some exchanges, so the actual number of registered
exchanges changes; e.g., in the test in this PR, the number changes from `3` to
`2`.
Then, the assertion below in `ExchangeCoordinator` fails because the
logical number of exchanges and the actual number of registered exchanges differ:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201
The objective of this fix is to respect the number of reused exchanges in
`ExchangeCoordinator`.
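
To make the mismatch concrete, here is a minimal toy sketch of the counting problem described above. It does not use Spark's real classes: `ToyCoordinator`, `ToyShuffle`, `ToyReused`, and the `forwardFromReused` flag are hypothetical names for illustration only, and it assumes that registration simply appends to a list without de-duplicating.

```scala
import scala.collection.mutable.ArrayBuffer

object ReusedExchangeSketch {
  // Hypothetical stand-in for ExchangeCoordinator; not Spark's real class.
  final class ToyCoordinator(val expectedExchanges: Int) {
    private val registered = ArrayBuffer.empty[String]

    // Assumption: registering appends and does not de-duplicate.
    def registerExchange(id: String): Unit = registered += id

    def numRegistered: Int = registered.size

    // The kind of check that fails when planned and registered counts differ.
    def countsMatch: Boolean = registered.size == expectedExchanges
  }

  sealed trait ToyNode
  final case class ToyShuffle(id: String) extends ToyNode
  // Wrapper left behind when a duplicate exchange is replaced by a reused one.
  final case class ToyReused(child: ToyShuffle) extends ToyNode

  // "Prepare" each node. A reused wrapper registers its child only when
  // forwardFromReused is true, which models the change proposed in this diff.
  def prepareAll(
      plan: Seq[ToyNode],
      coordinator: ToyCoordinator,
      forwardFromReused: Boolean): Unit =
    plan.foreach {
      case ToyShuffle(id) => coordinator.registerExchange(id)
      case ToyReused(child) =>
        if (forwardFromReused) coordinator.registerExchange(child.id)
    }

  def run(forwardFromReused: Boolean): ToyCoordinator = {
    // The plan was built with 3 exchanges, so the coordinator expects 3.
    val coordinator = new ToyCoordinator(expectedExchanges = 3)
    val a = ToyShuffle("a")
    // After reuse, the third exchange is a wrapper around `a`.
    val plan = Seq(a, ToyShuffle("b"), ToyReused(a))
    prepareAll(plan, coordinator, forwardFromReused)
    coordinator
  }

  def main(args: Array[String]): Unit = {
    println(run(forwardFromReused = false).numRegistered) // prints 2
    println(run(forwardFromReused = true).numRegistered)  // prints 3
  }
}
```

With `forwardFromReused = false` the toy coordinator sees `2` registrations against an expected `3`; with the wrapper forwarding the registration, the counts agree again.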
---