Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207090221

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

    `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange` runs:
    https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L85

    For example, in the test in this PR, it sets `3` in `ExchangeCoordinator`:
    https://github.com/apache/spark/pull/21754/files#diff-3cd46a3f60c5352282bd3f2c9efff7fcR505

    `ReuseExchange` then reuses some of the exchanges, so the actual number of registered exchanges changes; in the test in this PR, it drops from `3` to `2`. As a result, the assertion below in `ExchangeCoordinator` fails, because the logical number of exchanges no longer matches the actual number of registered exchanges:
    https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201

    The objective of this fix is to make `ExchangeCoordinator` account for reused exchanges.
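
    The mismatch described above can be illustrated with a small, self-contained toy model. This is only a sketch, not Spark code: `ToyCoordinator`, `ReuseMismatchDemo`, `estimateSucceeds`, and the exchange names are hypothetical stand-ins, and the check merely imitates the shape of the assertion in `ExchangeCoordinator` (expected count versus registered count).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ExchangeCoordinator: it only remembers how many
// exchanges were planned and which ones have actually registered.
final class ToyCoordinator(numExchanges: Int) {
  private val exchanges = ArrayBuffer.empty[String]

  def registerExchange(name: String): Unit = exchanges += name

  // Imitates the consistency check: the planned count must equal the
  // number of registrations seen so far.
  def checkRegistered(): Unit =
    assert(exchanges.length == numExchanges,
      s"expected $numExchanges exchanges, but ${exchanges.length} registered")
}

object ReuseMismatchDemo {
  // Three exchanges are planned; the third one is later replaced by a
  // "reused" wrapper. `registerReused` models whether that wrapper
  // registers its child with the coordinator when it is prepared.
  def estimateSucceeds(registerReused: Boolean): Boolean = {
    val coordinator = new ToyCoordinator(numExchanges = 3)
    coordinator.registerExchange("exchange-a")
    coordinator.registerExchange("exchange-b")
    if (registerReused) {
      // With the fix: the reused wrapper registers the exchange it wraps.
      coordinator.registerExchange("exchange-a (reused)")
    }
    try {
      coordinator.checkRegistered()
      true
    } catch {
      case _: AssertionError => false
    }
  }
}
```

    In this toy model, `ReuseMismatchDemo.estimateSucceeds(registerReused = false)` hits the assertion because only two of the three planned exchanges register, while `registerReused = true` brings the registered count back to three.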