sunchao commented on code in PR #38196:
URL: https://github.com/apache/spark/pull/38196#discussion_r992455054


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:
##########
@@ -277,7 +277,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
           val numExchanges = collect(plan) {
             case exchange: ShuffleExchangeExec => exchange
           }.length
-          assert(numExchanges === 3)
+          assert(numExchanges === 5)

Review Comment:
   Restored the test to its previous state as of Spark 3.2. With this PR the plan contains 5 `ShuffleExchangeExec` nodes (vs. 3 before). Query plans:
   
   After this PR:
   ```
   AdaptiveSparkPlan isFinalPlan=false
   +- SortMergeJoin [key#220], [key#222], Inner
      :- SortMergeJoin [key#13], [key#220], Inner
      :  :- Sort [key#13 ASC NULLS FIRST], false, 0
      :  :  +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=133]
      :  :     +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
      :  :        +- Scan[obj#12]
      :  +- Sort [key#220 ASC NULLS FIRST], false, 0
      :     +- Exchange hashpartitioning(key#220, 5), ENSURE_REQUIREMENTS, [plan_id=134]
      :        +- GlobalLimit 10, 0
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=129]
      :              +- LocalLimit 10
      :                 +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#220, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#221]
      :                    +- Scan[obj#218]
      +- Sort [key#222 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(key#222, 5), ENSURE_REQUIREMENTS, [plan_id=142]
            +- GlobalLimit 3, 0
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=138]
                  +- LocalLimit 3
                     +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#222, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#223]
                        +- Scan[obj#219]
   ```
   
   Before this PR:
   ```
   AdaptiveSparkPlan isFinalPlan=false
   +- SortMergeJoin [key#220], [key#222], Inner
      :- SortMergeJoin [key#13], [key#220], Inner
      :  :- Sort [key#13 ASC NULLS FIRST], false, 0
      :  :  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=134]
      :  :     +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
      :  :        +- Scan[obj#12]
      :  +- Sort [key#220 ASC NULLS FIRST], false, 0
      :     +- GlobalLimit 10, 0
      :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=129]
      :           +- LocalLimit 10
      :              +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#220, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#221]
      :                 +- Scan[obj#218]
      +- Sort [key#222 ASC NULLS FIRST], false, 0
         +- GlobalLimit 3, 0
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=138]
               +- LocalLimit 3
                  +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#222, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#223]
                     +- Scan[obj#219]
   ```
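   The assertion in the test counts `ShuffleExchangeExec` nodes by collecting over the plan tree. A minimal, self-contained sketch of that collect-and-count pattern, using hypothetical `Node`/`Exchange`/`Other` stand-ins rather than Spark's actual plan classes:

   ```scala
   // Hypothetical miniature plan tree; these are NOT Spark's classes,
   // just stand-ins to illustrate collect(plan){ case e: ShuffleExchangeExec => e }.length.
   sealed trait Node { def children: Seq[Node] }
   final case class Exchange(children: Node*) extends Node
   final case class Other(children: Node*) extends Node

   // Recursively collect every Exchange node in the tree.
   def collectExchanges(n: Node): Seq[Exchange] = {
     val here = n match {
       case e: Exchange => Seq(e)
       case _           => Seq.empty
     }
     here ++ n.children.flatMap(collectExchanges)
   }

   // Tree mirroring the shape of the "after this PR" plan above:
   // one exchange on the plain scan side, and two (join + limit) on each limited side.
   val plan: Node =
     Other(                                          // outer SortMergeJoin
       Other(                                        // inner SortMergeJoin
         Other(Exchange(Other())),                   // Sort +- Exchange +- scan
         Other(Exchange(Other(Exchange(Other()))))   // Sort +- Exchange +- Limit +- Exchange
       ),
       Other(Exchange(Other(Exchange(Other()))))     // Sort +- Exchange +- Limit +- Exchange
     )

   val numExchanges = collectExchanges(plan).length  // 5, matching the assertion
   ```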



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

