peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r455648388
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
##########
@@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
|""".stripMargin
}
}
-
-/**
- * Find out duplicated exchanges in the spark plan, then use the same exchange for all the
- * references.
- */
-case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
-      return plan
-    }
-    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
-    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
-
-    // Replace a Exchange duplicate with a ReusedExchange
-    def reuse: PartialFunction[Exchange, SparkPlan] = {
-      case exchange: Exchange =>
-        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
-        val samePlan = sameSchema.find { e =>
-          exchange.sameResult(e)
-        }
-        if (samePlan.isDefined) {
-          // Keep the output of this exchange, the following plans require that to resolve
-          // attributes.
-          ReusedExchangeExec(exchange.output, samePlan.get)
-        } else {
-          sameSchema += exchange
-          exchange
-        }
-    }
-
-    plan transformUp {
-      case exchange: Exchange => reuse(exchange)
-    } transformAllExpressions {
-      // Lookup inside subqueries for duplicate exchanges
-      case in: InSubqueryExec =>
-        val newIn = in.plan.transformUp {
-          case exchange: Exchange => reuse(exchange)
-        }
-        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
-    }
-  }
Review comment:
This `transformUp` followed by `transformAllExpressions` was actually a bit
odd for 2 reasons:
- A minor issue is that `reuse` is a partial function and yet we call it like a
normal function. I mean this code probably should have been written as
```
plan transformUp reuse transformAllExpressions {
  // Lookup inside subqueries for duplicate exchanges
  case in: InSubqueryExec =>
    val newIn = in.plan.transformUp(reuse)
    in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
}
```
and the definition of `reuse` should have been
`def reuse: PartialFunction[SparkPlan, SparkPlan] = {`
- A bigger issue is that this part of the code traverses the plan 2 times,
first with `transformUp` and then with `transformAllExpressions`, and both
traversals can insert reuse references into the plan. Imagine that the first
traversal (`transformUp`) inserts a reuse reference to an exchange. Let's say
it inserts a `ReusedExchangeExec` node pointing to `Exchange id=1`, but then
the second traversal (`transformAllExpressions`) finds an `InSubqueryExec`
expression in a `FileSourceScanExec` under the node `Exchange id=1`. If there
is a reuse opportunity in the subplan of that `InSubqueryExec`, then the change
caused by inserting another reuse node into the subplan propagates up, and as a
result `Exchange id=1` gets replaced with `Exchange id=x` in the parent plan. In
this case the reuse node created in the first traversal, which still points to
`Exchange id=1`, becomes invalid.
(Please note that this issue is very similar to what I showed in the
description of the PR, but in that case the 2 traversals came from the
separate `ReuseSubquery` and `ReuseExchange` rules.)
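To make the stale reference concrete, here is a minimal sketch on a toy plan model. It does not use Spark classes: `Plan`, `Scan`, `Exchange`, `Reused`, `Join`, `canon`, `reuseExchanges`, `reuseInSubqueries` and `staleTargets` are all hypothetical names made up for this illustration, and `canon` only loosely stands in for `sameResult`.

```scala
import scala.collection.mutable

// Toy model only; none of these types are Spark classes.
object StaleReuseDemo {
  sealed trait Plan
  case class Scan(table: String, subquery: Option[Plan] = None) extends Plan
  case class Exchange(id: Int, child: Plan) extends Plan
  case class Reused(target: Exchange) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan

  // Stand-in for sameResult: drop ids, and treat a reuse node like its target.
  def canon(p: Plan): Plan = p match {
    case Scan(t, sq)    => Scan(t, sq.map(canon))
    case Exchange(_, c) => Exchange(0, canon(c))
    case Reused(t)      => canon(t)
    case Join(l, r)     => Join(canon(l), canon(r))
  }

  // Pass 1: bottom-up over the main plan, without looking into subqueries.
  def reuseExchanges(p: Plan, seen: mutable.ArrayBuffer[Exchange]): Plan = p match {
    case Join(l, r) =>
      val newLeft = reuseExchanges(l, seen)
      Join(newLeft, reuseExchanges(r, seen))
    case Exchange(id, c) =>
      val e = Exchange(id, reuseExchanges(c, seen))
      seen.find(s => canon(s) == canon(e)) match {
        case Some(s) => Reused(s)
        case None    => seen += e; e
      }
    case other => other
  }

  // Pass 2: rewrite subqueries; any change rebuilds the enclosing exchanges.
  def reuseInSubqueries(p: Plan, seen: mutable.ArrayBuffer[Exchange]): Plan = p match {
    case Scan(t, Some(sq)) => Scan(t, Some(reuseExchanges(sq, seen)))
    case Exchange(id, c)   => Exchange(id, reuseInSubqueries(c, seen))
    case Join(l, r)        => Join(reuseInSubqueries(l, seen), reuseInSubqueries(r, seen))
    case other             => other
  }

  def twoPass(p: Plan): Plan = {
    val seen = mutable.ArrayBuffer.empty[Exchange]
    reuseInSubqueries(reuseExchanges(p, seen), seen)
  }

  // Exchanges that are really present in the plan (reuse targets excluded).
  def exchanges(p: Plan): List[Exchange] = p match {
    case e @ Exchange(_, c) => e :: exchanges(c)
    case Join(l, r)         => exchanges(l) ++ exchanges(r)
    case Scan(_, sq)        => sq.toList.flatMap(exchanges)
    case Reused(_)          => Nil
  }

  def reuseTargets(p: Plan): List[Exchange] = p match {
    case Reused(t)      => List(t)
    case Exchange(_, c) => reuseTargets(c)
    case Join(l, r)     => reuseTargets(l) ++ reuseTargets(r)
    case Scan(_, sq)    => sq.toList.flatMap(reuseTargets)
  }

  // Reuse nodes whose target no longer appears anywhere in the plan.
  def staleTargets(p: Plan): List[Exchange] = {
    val present = exchanges(p)
    reuseTargets(p).filterNot(present.contains)
  }

  val sub: Plan = Join(Exchange(2, Scan("s")), Exchange(3, Scan("s")))
  val plan: Plan =
    Join(Exchange(1, Scan("t", Some(sub))), Exchange(4, Scan("t", Some(sub))))
}
```

In this toy, `StaleReuseDemo.staleTargets(StaleReuseDemo.twoPass(StaleReuseDemo.plan))` should contain only the exchange with id 1: the first pass points the reuse node at it, and the second pass then rebuilds that exchange because its subquery changed.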
I think the fix for this issue is to use a one-pass, whole-plan, bottom-up
traversal like I did in `ReuseExchangeAndSubquery` in this PR.
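As a rough sketch of the one-pass idea (again on a hypothetical toy model, not the actual `ReuseExchangeAndSubquery` code from this PR), a single bottom-up traversal can descend into subqueries before it handles the enclosing node, so every exchange is registered only in its final form. All names below are invented for the illustration.

```scala
import scala.collection.mutable

// Toy model only; this is not the rule implemented in the PR.
object OnePassReuseDemo {
  sealed trait Plan
  case class Scan(table: String, subquery: Option[Plan] = None) extends Plan
  case class Exchange(id: Int, child: Plan) extends Plan
  case class Reused(target: Exchange) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan

  // Stand-in for sameResult: drop ids, and treat a reuse node like its target.
  def canon(p: Plan): Plan = p match {
    case Scan(t, sq)    => Scan(t, sq.map(canon))
    case Exchange(_, c) => Exchange(0, canon(c))
    case Reused(t)      => canon(t)
    case Join(l, r)     => Join(canon(l), canon(r))
  }

  // One traversal: children and subqueries are rewritten before their parent.
  def reuseAll(p: Plan, seen: mutable.ArrayBuffer[Exchange]): Plan = p match {
    case Scan(t, sq) => Scan(t, sq.map(reuseAll(_, seen)))
    case Join(l, r) =>
      val newLeft = reuseAll(l, seen)
      Join(newLeft, reuseAll(r, seen))
    case Exchange(id, c) =>
      val e = Exchange(id, reuseAll(c, seen))
      seen.find(s => canon(s) == canon(e)) match {
        case Some(s) => Reused(s)
        case None    => seen += e; e
      }
    case r: Reused => r
  }

  def onePass(p: Plan): Plan = reuseAll(p, mutable.ArrayBuffer.empty[Exchange])

  // Exchanges that are really present in the plan (reuse targets excluded).
  def exchanges(p: Plan): List[Exchange] = p match {
    case e @ Exchange(_, c) => e :: exchanges(c)
    case Join(l, r)         => exchanges(l) ++ exchanges(r)
    case Scan(_, sq)        => sq.toList.flatMap(exchanges)
    case Reused(_)          => Nil
  }

  def reuseTargets(p: Plan): List[Exchange] = p match {
    case Reused(t)      => List(t)
    case Exchange(_, c) => reuseTargets(c)
    case Join(l, r)     => reuseTargets(l) ++ reuseTargets(r)
    case Scan(_, sq)    => sq.toList.flatMap(reuseTargets)
  }

  // Reuse nodes whose target no longer appears anywhere in the plan.
  def staleTargets(p: Plan): List[Exchange] = {
    val present = exchanges(p)
    reuseTargets(p).filterNot(present.contains)
  }

  val sub: Plan = Join(Exchange(2, Scan("s")), Exchange(3, Scan("s")))
  val plan: Plan =
    Join(Exchange(1, Scan("t", Some(sub))), Exchange(4, Scan("t", Some(sub))))
}
```

On the same toy input, `OnePassReuseDemo.staleTargets(OnePassReuseDemo.onePass(OnePassReuseDemo.plan))` should be empty, because an exchange is never rebuilt after a reuse node has been pointed at it.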