peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r455648388



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
##########
@@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
        |""".stripMargin
   }
 }
-
-/**
- * Find out duplicated exchanges in the spark plan, then use the same exchange for all the
- * references.
- */
-case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
-      return plan
-    }
-    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
-    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
-
-    // Replace a Exchange duplicate with a ReusedExchange
-    def reuse: PartialFunction[Exchange, SparkPlan] = {
-      case exchange: Exchange =>
-        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
-        val samePlan = sameSchema.find { e =>
-          exchange.sameResult(e)
-        }
-        if (samePlan.isDefined) {
-          // Keep the output of this exchange, the following plans require that to resolve
-          // attributes.
-          ReusedExchangeExec(exchange.output, samePlan.get)
-        } else {
-          sameSchema += exchange
-          exchange
-        }
-    }
-
-    plan transformUp {
-      case exchange: Exchange => reuse(exchange)
-    } transformAllExpressions {
-      // Lookup inside subqueries for duplicate exchanges
-      case in: InSubqueryExec =>
-        val newIn = in.plan.transformUp {
-          case exchange: Exchange => reuse(exchange)
-        }
-        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
-    }
-  }

Review comment:
       This `transformUp` followed by `transformAllExpressions` was actually a bit odd, for 2 reasons:
   - A minor issue is that `reuse` is a partial function and yet we use it as a normal function. I mean this code should probably have been written as
     ```
       plan transformUp reuse transformAllExpressions {
         // Lookup inside subqueries for duplicate exchanges
         case in: InSubqueryExec =>
           val newIn = in.plan.transformUp reuse
           in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
       }
     ```
     and the definition of `reuse` should have been `def reuse: PartialFunction[SparkPlan, SparkPlan] = {`
   - A bigger issue is that this part of the code traverses the plan 2 times, first with `transformUp` and then with `transformAllExpressions`, and both traversals can insert reuse references into the plan. Imagine that the first traversal (`transformUp`) inserts a reuse reference to an exchange, say a `ReusedExchangeExec` node pointing to `Exchange id=1`, but then the second traversal (`transformAllExpressions`) finds an `InSubqueryExec` expression in a `FileSourceScanExec` under the node `Exchange id=1`. If there is a reuse opportunity in the subplan of that `InSubqueryExec`, then the change caused by inserting another reuse node into the subplan propagates up, and `Exchange id=1` gets replaced with `Exchange id=x` in the parent plan. In this case the reuse node created in the first traversal, which still points to `Exchange id=1`, will be invalid.
     (Please note that this issue is very similar to what I showed in the description of the PR, but in that case the 2 traversals were due to the separate `ReuseSubquery` and `ReuseExchange` rules.)
     I think the fix to this issue is to use a 1 pass, whole-plan, bottom-up traversal like I did in `ReuseExchangeAndSubquery` in this PR.
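
To make the second point concrete, here is a minimal, self-contained sketch of the stale-reference problem on a toy plan model. Everything in it is an assumption for illustration only: `Plan`, `Scan`, `Exchange`, `Reused`, `Join`, `ReuseSketch` and its helpers are hypothetical stand-ins, not Spark classes, and `onePass` only mimics the idea of a single bottom-up traversal; it is not the `ReuseExchangeAndSubquery` code from this PR. It also passes the partial function directly to the traversal, as suggested in the first point.

```scala
import scala.collection.mutable

// Hypothetical toy plan model; these are stand-ins, not Spark classes.
sealed trait Plan
case class Scan(table: String, subquery: Option[Plan] = None) extends Plan
case class Exchange(child: Plan) extends Plan
case class Reused(target: Exchange) extends Plan // leaf: traversals never descend into target
case class Join(left: Plan, right: Plan) extends Plan

object ReuseSketch {
  // "Same result" key: the plan with every reuse node inlined.
  def canonical(p: Plan): Plan = p match {
    case Reused(t)   => canonical(t)
    case Exchange(c) => Exchange(canonical(c))
    case Join(l, r)  => Join(canonical(l), canonical(r))
    case Scan(t, sq) => Scan(t, sq.map(canonical))
  }

  // Bottom-up rewrite; subqueries are only visited when intoSubqueries is true.
  def transformUp(p: Plan, intoSubqueries: Boolean)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rebuilt: Plan = p match {
      case Exchange(c) => Exchange(transformUp(c, intoSubqueries)(rule))
      case Join(l, r) =>
        Join(transformUp(l, intoSubqueries)(rule), transformUp(r, intoSubqueries)(rule))
      case Scan(t, Some(sq)) if intoSubqueries =>
        Scan(t, Some(transformUp(sq, intoSubqueries)(rule)))
      case other => other
    }
    rule.applyOrElse(rebuilt, (x: Plan) => x)
  }

  // Replace an exchange with a reference to the first exchange with the same key.
  def reuseRule(cache: mutable.Map[Plan, Exchange]): PartialFunction[Plan, Plan] = {
    case e: Exchange =>
      val key = canonical(e)
      cache.get(key) match {
        case Some(first) => Reused(first)
        case None =>
          cache(key) = e
          e
      }
  }

  // Second pass of the two-pass variant: rewrites subqueries only.
  def rewriteSubqueries(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan = p match {
    case Exchange(c)       => Exchange(rewriteSubqueries(c)(rule))
    case Join(l, r)        => Join(rewriteSubqueries(l)(rule), rewriteSubqueries(r)(rule))
    case Scan(t, Some(sq)) => Scan(t, Some(transformUp(sq, intoSubqueries = false)(rule)))
    case other             => other
  }

  def twoPass(plan: Plan): Plan = {
    val rule = reuseRule(mutable.Map.empty)
    rewriteSubqueries(transformUp(plan, intoSubqueries = false)(rule))(rule)
  }

  def onePass(plan: Plan): Plan =
    transformUp(plan, intoSubqueries = true)(reuseRule(mutable.Map.empty))

  // Exchanges physically present in the plan, including those inside subqueries.
  def exchanges(p: Plan): Set[Exchange] = p match {
    case e @ Exchange(c)   => exchanges(c) + e
    case Join(l, r)        => exchanges(l) ++ exchanges(r)
    case Scan(_, Some(sq)) => exchanges(sq)
    case _                 => Set.empty
  }

  // Targets of all reuse nodes present in the plan.
  def reusedTargets(p: Plan): Set[Exchange] = p match {
    case Reused(t)         => Set(t)
    case Exchange(c)       => reusedTargets(c)
    case Join(l, r)        => reusedTargets(l) ++ reusedTargets(r)
    case Scan(_, Some(sq)) => reusedTargets(sq)
    case _                 => Set.empty
  }

  // A reference is valid only if its target still exists in the final plan.
  def allReferencesValid(p: Plan): Boolean = reusedTargets(p).subsetOf(exchanges(p))

  // The same exchange twice, each with a subquery that has its own duplicate exchange.
  val sub: Plan = Join(Exchange(Scan("t")), Exchange(Scan("t")))
  val plan: Plan = Join(Exchange(Scan("f", Some(sub))), Exchange(Scan("f", Some(sub))))
}
```

In this sketch, `twoPass` first inserts a `Reused` node pointing at the left exchange and then rewrites that exchange's subquery, so the node the reference points to is no longer in the plan and `allReferencesValid` fails. `onePass` rewrites the subquery before it registers the enclosing exchange, so every reference points at a node that is in the final plan.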



