cloud-fan commented on a change in pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#discussion_r265899696
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
 ##########
 @@ -463,38 +465,111 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val finalPlan = join.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
+        val shuffleReaders = finalPlan.collect {
+          case reader: CoalescedShuffleReaderExec => reader
         }
-        assert(exchanges.length === 3)
+        assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === numPartitions)
             }
 
           case None =>
-            assert(exchanges.forall(_.coordinator.isDefined))
-            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === 3)
+            }
         }
       }
 
-      withSparkSession(test, 6144, minNumPostShufflePartitions)
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
+    }
+
+    test(s"determining the number of reducers: plan already partitioned$testNameNote") {
+      val test: SparkSession => Unit = { spark: SparkSession =>
+        try {
+          spark.range(1000).write.bucketBy(30, "id").saveAsTable("t")
+          // `df1` is hash partitioned by `id`.
+          val df1 = spark.read.table("t")
+          val df2 =
+            spark
+              .range(0, 1000, 1, numInputPartitions)
+              .selectExpr("id % 500 as key2", "id as value2")
+
+          val join = df1.join(df2, col("id") === col("key2")).select(col("id"), col("value2"))
+
+          // Check the answer first.
+          val expectedAnswer = spark.range(0, 500).selectExpr("id % 500", "id as value")
+            .union(spark.range(500, 1000).selectExpr("id % 500", "id as value"))
+          checkAnswer(
+            join,
+            expectedAnswer.collect())
+
+          // Then, let's make sure we do not reduce the number of post-shuffle partitions.
+          val finalPlan = join.queryExecution.executedPlan
+            .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
+          val shuffleReaders = finalPlan.collect {
+            case reader: CoalescedShuffleReaderExec => reader
+          }
+          assert(shuffleReaders.length === 0)
+        } finally {
+          spark.sql("drop table t")
+        }
+      }
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
     }
   }
 
   test("SPARK-24705 adaptive query execution works correctly when exchange 
reuse enabled") {
     val test = { spark: SparkSession =>
       spark.sql("SET spark.sql.exchange.reuse=true")
       val df = spark.range(1).selectExpr("id AS key", "id AS value")
+
+      // test case 1: a fragment has 3 child fragments but they are the same fragment.
 
 Review comment:
   nit: `fragment` -> `query stage`
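   For example (a suggested rewording only, not text from the PR), applying that rename to the new comment would read:

   ```scala
   // test case 1: a query stage has 3 child query stages but they are the same query stage.
   ```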

