Repository: spark
Updated Branches:
  refs/heads/master 02f967795 -> efef55388


[SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused

## What changes were proposed in this pull request?
In the current master, `EnsureRequirements` sets the number of exchanges in
`ExchangeCoordinator` before `ReuseExchange` runs. `ReuseExchange` then removes
duplicate exchanges, so the actual number of registered exchanges changes.
As a result, the assertion in `ExchangeCoordinator` fails because the expected
number of exchanges no longer matches the number actually registered:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201
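
To see why the ordering matters, here is a minimal toy sketch of the failure mode (hypothetical names, not Spark's actual classes): a coordinator that captures its expected exchange count eagerly breaks once a later rule deduplicates registrations.
```
// Toy Scala sketch of the failure mode; `ToyCoordinator` and its members
// are hypothetical, not Spark's API.
import scala.collection.mutable.ArrayBuffer

class ToyCoordinator(expectedExchanges: Int) {
  private val registered = ArrayBuffer[String]()

  def register(exchange: String): Unit = registered += exchange

  def doEstimation(): Unit = {
    // Mirrors the failing assertion: the eagerly captured count no longer
    // matches the number of exchanges actually registered.
    assert(registered.length == expectedExchanges)
  }
}

object Repro extends App {
  val coord = new ToyCoordinator(expectedExchanges = 3) // count fixed before dedup
  coord.register("exchange-1")
  coord.register("exchange-2") // the third exchange was deduplicated and never registers
  coord.doEstimation()         // throws java.lang.AssertionError
}
```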

This PR fixes the issue; the failure can be reproduced as follows:
```
scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
...
```
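
The fix (shown in the diff below) removes the eagerly supplied count and instead derives `numExchanges` lazily from the registered exchanges, so the count is only frozen at first use, after `ReuseExchange` has already run. As a rough sketch of the idea (toy code, not the actual Spark class):
```
// Toy Scala sketch of the fix's approach; names are hypothetical.
class ToyCoordinatorFixed {
  private val registered = scala.collection.mutable.ArrayBuffer[String]()

  // Evaluated on first access, i.e. after all registrations are done,
  // so it always matches what was actually registered. A registration
  // arriving after first use would then trip the assertion, flagging misuse.
  private lazy val numExchanges = registered.size

  def register(exchange: String): Unit = registered += exchange

  def doEstimation(): Unit =
    assert(registered.length == numExchanges)
}
```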

## How was this patch tested?
Added tests in `ExchangeCoordinatorSuite`.

Author: Takeshi Yamamuro <yamam...@apache.org>

Closes #21754 from maropu/SPARK-24705-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efef5538
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efef5538
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efef5538

Branch: refs/heads/master
Commit: efef55388fedef3f7954a385776e666ad4597a58
Parents: 02f9677
Author: Takeshi Yamamuro <yamam...@apache.org>
Authored: Thu Aug 2 13:05:36 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Aug 2 13:05:36 2018 -0700

----------------------------------------------------------------------
 .../execution/exchange/EnsureRequirements.scala |  1 -
 .../exchange/ExchangeCoordinator.scala          | 17 ++++++++++------
 .../execution/ExchangeCoordinatorSuite.scala    | 21 ++++++++++++++++----
 3 files changed, 28 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index d96ecba..d2d5011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -82,7 +82,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       if (adaptiveExecutionEnabled && supportsCoordinator) {
         val coordinator =
           new ExchangeCoordinator(
-            children.length,
             targetPostShuffleInputSize,
             minNumPostShufflePartitions)
         children.zip(requiredChildDistributions).map {

http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index 051e610..f5d93ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -83,7 +83,6 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *  - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
-    numExchanges: Int,
     advisoryTargetPostShuffleInputSize: Long,
     minNumPostShufflePartitions: Option[Int] = None)
   extends Logging {
@@ -91,8 +90,14 @@ class ExchangeCoordinator(
   // The registered Exchange operators.
   private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
 
+  // A `lazy val` is used here so that we can detect misuse of this class, e.g., all the
+  // exchanges should be registered before `postShuffleRDD` is called for the first time. If a new
+  // exchange is registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)`
+  // fails in `doEstimationIfNecessary`.
+  private[this] lazy val numExchanges = exchanges.size
+
   // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
+  private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
     new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
   // A boolean that indicates if this coordinator has made decision on how to shuffle data.
@@ -117,10 +122,6 @@ class ExchangeCoordinator(
    */
   def estimatePartitionStartIndices(
       mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
-    // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
-    // a stage when the number of partitions of this dependency is 0.
-    assert(mapOutputStatistics.length <= numExchanges)
-
     // If minNumPostShufflePartitions is defined, it is possible that we need to use a
     // value less than advisoryTargetPostShuffleInputSize as the target input size of
     // a post shuffle task.
@@ -228,6 +229,10 @@ class ExchangeCoordinator(
         j += 1
       }
 
+      // If we have mapOutputStatistics.length < numExchanges, it is because we do not submit
+      // a stage when the number of partitions of this dependency is 0.
+      assert(mapOutputStatistics.length <= numExchanges)
+
      // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
       // number of post-shuffle partitions.
       val partitionStartIndices =

http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index 737eeb0..b736d43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
@@ -58,7 +58,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("test estimatePartitionStartIndices - 1 Exchange") {
-    val coordinator = new ExchangeCoordinator(1, 100L)
+    val coordinator = new ExchangeCoordinator(100L)
 
     {
       // All bytes per partition are 0.
@@ -105,7 +105,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("test estimatePartitionStartIndices - 2 Exchanges") {
-    val coordinator = new ExchangeCoordinator(2, 100L)
+    val coordinator = new ExchangeCoordinator(100L)
 
     {
       // If there are multiple values of the number of pre-shuffle partitions,
@@ -199,7 +199,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("test estimatePartitionStartIndices and enforce minimal number of 
reducers") {
-    val coordinator = new ExchangeCoordinator(2, 100L, Some(2))
+    val coordinator = new ExchangeCoordinator(100L, Some(2))
 
     {
       // The minimal number of post-shuffle partitions is not enforced because
@@ -480,4 +480,17 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       withSparkSession(test, 6144, minNumPostShufflePartitions)
     }
   }
+
+  test("SPARK-24705 adaptive query execution works correctly when exchange 
reuse enabled") {
+    val test = { spark: SparkSession =>
+      spark.sql("SET spark.sql.exchange.reuse=true")
+      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+      val resultDf = df.join(df, "key").join(df, "key")
+      val sparkPlan = resultDf.queryExecution.executedPlan
+      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
+      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
+      checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+    }
+    withSparkSession(test, 4, None)
+  }
 }

