GitHub user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21754#discussion_r207090637
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
---
@@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
if (!conf.exchangeReuseEnabled) {
return plan
}
+
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
+
+ def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = {
+ // the exchanges that have same results usually also have same schemas (same column names).
+ val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
+ val samePlan = sameSchema.filter(filterCondition).find { e =>
+ exchange.sameResult(e)
+ }
+ if (samePlan.isDefined) {
+ // Keep the output of this exchange, the following plans require that to resolve
+ // attributes.
+ ReusedExchangeExec(exchange.output, samePlan.get)
+ } else {
+ sameSchema += exchange
+ exchange
+ }
+ }
+
plan.transformUp {
+ // For coordinated exchange
+ case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+ tryReuseExchange(exchange, {
+ // We can reuse an exchange with the same coordinator only
+ case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
--- End diff --
ok, I will.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]