peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r459255589
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
##########
@@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output:
Seq[Attribute], child: Exchan
|""".stripMargin
}
}
-
-/**
- * Find out duplicated exchanges in the spark plan, then use the same exchange
for all the
- * references.
- */
-case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
-
- def apply(plan: SparkPlan): SparkPlan = {
- if (!conf.exchangeReuseEnabled) {
- return plan
- }
- // Build a hash map using schema of exchanges to avoid O(N*N) sameResult
calls.
- val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
-
- // Replace a Exchange duplicate with a ReusedExchange
- def reuse: PartialFunction[Exchange, SparkPlan] = {
- case exchange: Exchange =>
- val sameSchema = exchanges.getOrElseUpdate(exchange.schema,
ArrayBuffer[Exchange]())
- val samePlan = sameSchema.find { e =>
- exchange.sameResult(e)
- }
- if (samePlan.isDefined) {
- // Keep the output of this exchange, the following plans require
that to resolve
- // attributes.
- ReusedExchangeExec(exchange.output, samePlan.get)
- } else {
- sameSchema += exchange
- exchange
- }
- }
-
- plan transformUp {
- case exchange: Exchange => reuse(exchange)
- } transformAllExpressions {
- // Lookup inside subqueries for duplicate exchanges
- case in: InSubqueryExec =>
- val newIn = in.plan.transformUp {
- case exchange: Exchange => reuse(exchange)
- }
- in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
- }
- }
Review comment:
This is the issue described in item 1 of the PR description; it is tested by
the case `SPARK-32041: No reuse interference inside ReuseExchange` in the new
`ReuseExchangeAndSubquerySuite`:
https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR28
Combining the 2 rules is required to fix item 2, which is tested by the case
`SPARK-32041: No reuse interference between ReuseExchange and ReuseSubquery`:
https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR67
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]