Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207090221
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
     
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --
    
    `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange` runs:
    
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L85
    For example, in the test in this PR, it sets `3` in `ExchangeCoordinator`:
    
https://github.com/apache/spark/pull/21754/files#diff-3cd46a3f60c5352282bd3f2c9efff7fcR505
    
    `ReuseExchange` then reuses some of those exchanges, which changes the actual number of registered exchanges. In the test in this PR, for example, the number drops from `3` to `2`.
    
    As a result, the assertion below in `ExchangeCoordinator` fails, because the logical number of exchanges set by `EnsureRequirements` and the actual number of registered exchanges differ:
    
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201
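A minimal, hypothetical model of the failure, assuming a simplified coordinator. It is told how many exchanges to expect at planning time and asserts that exactly that many registered. `ToyExchange`, `ToyCoordinator`, `ExchangeNode`, `ReusedNode` and `BeforeFix` are illustrative names, not Spark classes. In this model only real exchanges register themselves, and the reuse wrapper registers nothing.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model; these are NOT Spark's real classes.
final case class ToyExchange(id: Int)

sealed trait ToyNode
final case class ExchangeNode(exchange: ToyExchange) extends ToyNode
final case class ReusedNode(child: ToyExchange) extends ToyNode

final class ToyCoordinator(numExchanges: Int) {
  private val exchanges = ArrayBuffer.empty[ToyExchange]

  def registerExchange(e: ToyExchange): Unit = exchanges += e

  def registeredCount: Int = exchanges.length

  // Same idea as the linked check: registrations must match the planned count.
  def checkRegistrations(): Unit =
    assert(exchanges.length == numExchanges,
      s"expected $numExchanges registered exchanges, found ${exchanges.length}")
}

object BeforeFix {
  // Only real exchanges register; the reuse wrapper contributes nothing.
  def prepare(plan: Seq[ToyNode], coordinator: ToyCoordinator): Unit =
    plan.foreach {
      case ExchangeNode(e) => coordinator.registerExchange(e)
      case ReusedNode(_)   => ()
    }

  // Returns (number of registrations, whether the assertion failed).
  def demo(): (Int, Boolean) = {
    // The count of 3 is fixed before reuse happens.
    val coordinator = new ToyCoordinator(numExchanges = 3)
    val shared = ToyExchange(2)
    // Reuse has replaced the third exchange with a wrapper around `shared`.
    val plan = Seq(ExchangeNode(ToyExchange(1)), ExchangeNode(shared), ReusedNode(shared))
    prepare(plan, coordinator)
    val failed =
      try { coordinator.checkRegistrations(); false }
      catch { case _: AssertionError => true }
    (coordinator.registeredCount, failed)
  }
}
```

Here `BeforeFix.demo()` yields `(2, true)`. Three exchanges were planned but only two registered, so the check fails, as in the test in this PR.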
    
    The goal of this fix is to make `ExchangeCoordinator` take reused exchanges into account when it counts registered exchanges.
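The same hypothetical toy model can sketch the fix, assuming every node shares one coordinator. In the real diff, registration only happens when the child is a `ShuffleExchangeExec` that has a coordinator. The reuse wrapper registers its child during preparation, mirroring the `doPrepare` override in the diff, so the planned and registered counts agree again. All names here are illustrative, not Spark's API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model; these are NOT Spark's real classes.
final case class ToyExchange(id: Int)

sealed trait ToyNode
final case class ExchangeNode(exchange: ToyExchange) extends ToyNode
final case class ReusedNode(child: ToyExchange) extends ToyNode

final class ToyCoordinator(numExchanges: Int) {
  private val exchanges = ArrayBuffer.empty[ToyExchange]
  def registerExchange(e: ToyExchange): Unit = exchanges += e
  def registeredCount: Int = exchanges.length
  def checkRegistrations(): Unit =
    assert(exchanges.length == numExchanges,
      s"expected $numExchanges registered exchanges, found ${exchanges.length}")
}

object AfterFix {
  // Like the doPrepare override in the diff: the reuse wrapper registers its
  // child, so every reference to a shared exchange is counted.
  def prepare(plan: Seq[ToyNode], coordinator: ToyCoordinator): Unit =
    plan.foreach {
      case ExchangeNode(e) => coordinator.registerExchange(e)
      case ReusedNode(e)   => coordinator.registerExchange(e)
    }

  def demo(): Int = {
    val coordinator = new ToyCoordinator(numExchanges = 3)
    val shared = ToyExchange(2)
    val plan = Seq(ExchangeNode(ToyExchange(1)), ExchangeNode(shared), ReusedNode(shared))
    prepare(plan, coordinator)
    coordinator.checkRegistrations() // passes: 3 registrations for 3 planned exchanges
    coordinator.registeredCount
  }
}
```

In this model the shared exchange appears twice in the coordinator's list. The sketch does not check whether that duplication is harmless for the real coordinator's estimation logic.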


