Github user cenyuhai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r151611386
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         }
       }
     }
    +
    +/**
    + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
    + * the references.
    + */
    +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
    +  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
    +    plan.children.nonEmpty && plan.children.forall {
    +      case ShuffleExchangeExec(_, _, Some(_)) => true
    +      case _ => false
    +    }
    +  }
    +
    +  // Returns true if two sequences of exchanges are producing the same results.
    +  private def hasExchangesWithSameResults(
    +      source: Seq[ShuffleExchangeExec],
    +      target: Seq[ShuffleExchangeExec]): Boolean = {
    +    source.length == target.length &&
    +      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
    +  }
    +
    +  type CoordinatorSignature = (Int, Long, Option[Int])
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    exchangeReuseEnabled is still affected by a bug (SPARK-20295); can we use a new configuration instead?
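    
    A minimal sketch of what a dedicated flag could look like, defined alongside the other entries in SQLConf (the config key, entry name, and accessor below are hypothetical, not from this PR):
    
        // Hypothetical: a separate flag so coordinated-exchange reuse is not gated on
        // spark.sql.exchange.reuse, which SPARK-20295 affects.
        val EXCHANGE_REUSE_WITH_COORDINATOR_ENABLED =
          buildConf("spark.sql.exchange.reuseWithCoordinator")
            .doc("When true, the planner reuses coordinated shuffle exchanges that " +
              "produce the same results.")
            .booleanConf
            .createWithDefault(true)
    
        def exchangeReuseWithCoordinatorEnabled: Boolean =
          getConf(EXCHANGE_REUSE_WITH_COORDINATOR_ENABLED)
    
    The rule's apply method could then guard on conf.exchangeReuseWithCoordinatorEnabled instead of conf.exchangeReuseEnabled, keeping this rule independent of the behavior affected by SPARK-20295.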

