leixm opened a new pull request, #53523:
URL: https://github.com/apache/spark/pull/53523

   ### What changes were proposed in this pull request?
   For `BroadcastQueryStageExec`, if its logical plan is converted to an empty relation by `AQEPropagateEmptyRelation`, executing the broadcast throws an error:
   ```
       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: Boxed Error
       at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
       at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
       at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
       at scala.concurrent.Promise.complete(Promise.scala:53)
       at scala.concurrent.Promise.complete$(Promise.scala:52)
       at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
       at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.AssertionError: assertion failed
       at scala.Predef$.assert(Predef.scala:208)
       at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$doExecuteBroadcast$1(AdaptiveSparkPlanExec.scala:369)
       at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:376)
       at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecuteBroadcast(AdaptiveSparkPlanExec.scala:364)
       at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:197)
       at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
       at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:193)
       at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:81)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:146)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:170)
       at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:144)
       at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$1(SubqueryBroadcastExec.scala:78)
       at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
       at scala.util.Success.$anonfun$map$1(Try.scala:255)
       at scala.util.Success.map(Try.scala:213)
       at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
       at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
       ... 5 more
   ```
   
   ```
   === Applying Rule org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation ===
    Project [bm_id#1469, uid#1491L, ... 4 more fields]                             Project [bm_id#1469, uid#1491L, ... 4 more fields]
   !+- Join Inner, (uid#1491L = uid#1495L)                                         +- LocalRelation <empty>, [bm_id#1469, uid#1491L, ... 5 more fields]
   !   :- LogicalQueryStage Project [xx#1469, uid#1491L, ... 4 more fields], BroadcastQueryStage 1
   !   +- Project [uid#1495L]
   !      +- Filter xxxx
   !         +- Relation db.table[uid#1495L,request_id#1496,... 40 more fields] parquet
   ```
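
   The rule application above follows the usual empty-relation propagation logic: an inner join with an empty side produces no rows, so the whole join collapses to an empty relation. A toy sketch of that decision in plain Python (illustrative only, not Spark's actual implementation; all names here are made up):

   ```python
   # Toy model of empty-relation propagation for joins.
   # Not Spark code; it only illustrates why "Join Inner" with an empty
   # side is rewritten to "LocalRelation <empty>" in the log above.

   def propagate_empty_relation(join_type: str, left_empty: bool, right_empty: bool) -> bool:
       """Return True if the join can be collapsed to an empty relation."""
       if join_type == "inner":
           # An inner join is empty if either side is empty.
           return left_empty or right_empty
       if join_type == "left_outer":
           # A left outer join is empty only if its left side is empty.
           return left_empty
       if join_type == "right_outer":
           return right_empty
       # Other join types are not collapsed in this sketch.
       return False

   # The case from the plan change above: inner join, broadcast side empty.
   print(propagate_empty_relation("inner", left_empty=False, right_empty=True))  # True
   ```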
   
   https://github.com/apache/spark/pull/32741 already fixed some of these cases, but in our production jobs we still hit:
   
   ```
   24/07/01 18:57:31 WARN [dynamicpruning-0] AdaptiveSparkPlanExec: Plan changed from
   BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2258]
   +- Project [xx#1469, xx#1491L, ... 4 more fields]
      +- BroadcastHashJoin [xx#1491L], [xx#1495L], Inner, BuildLeft, false, false
         :- BroadcastQueryStage 1
         :  +- ReusedExchange [xx#1469, xx#1491L, ... 4 more fields], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [id=#2059]
         +- Project [uid#1495L]
            +- Filter xxx
               +- FileScan parquet db.table
    to
   BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2261]
   +- LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]


   24/07/01 18:57:31 WARN [dynamicpruning-0] AdaptiveSparkPlanExec: Plan changed from
   BroadcastQueryStage 2
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2261]
      +- LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]
    to LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]
   ```
   
   In the next optimization loop, the `currentPhysicalPlan` is the `BroadcastQueryStageExec`, whose plan has been rewritten to a plain `LocalTableScan`, so the assertion in `doExecuteBroadcast` fails.
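
   The failure mode can be modeled with a small sketch (plain Python, not Spark code; class and method names only mirror Spark's for readability): `doExecuteBroadcast` asserts that the final plan is still a broadcast query stage, but the empty-relation rewrite has left a plain local scan in its place.

   ```python
   # Toy model of the failing assertion in AdaptiveSparkPlanExec.doExecuteBroadcast.
   # Illustrative only; these are not Spark's real classes.

   class SparkPlan:
       pass

   class BroadcastQueryStageExec(SparkPlan):
       """What doExecuteBroadcast expects the final plan to be."""

   class LocalTableScanExec(SparkPlan):
       """What the empty-relation rewrite leaves behind for an empty stage."""

   def do_execute_broadcast(final_plan: SparkPlan) -> str:
       # Mirrors the Scala-style assert that surfaces as
       # "java.lang.AssertionError: assertion failed" in the stack trace above.
       assert isinstance(final_plan, BroadcastQueryStageExec), "assertion failed"
       return "broadcast"

   # Normal case: the final plan is still a broadcast stage.
   print(do_execute_broadcast(BroadcastQueryStageExec()))  # broadcast

   # Bug case: the stage was rewritten to a plain local scan.
   try:
       do_execute_broadcast(LocalTableScanExec())
   except AssertionError as e:
       print("AssertionError:", e)  # AssertionError: assertion failed
   ```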
   
   
   After this PR:
   ```
   24/07/02 09:58:13 WARN [dynamicpruning-0] AdaptiveSparkPlanExec: Plan changed from BroadcastQueryStage 2
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2281]
      +- *(2) Project [xx#1469, xx#1491L, ... 4 more fields]
         +- *(2) BroadcastHashJoin [uid#1491L], [uid#1495L], Inner, BuildLeft, false, false
            :- BroadcastQueryStage 1
            :  +- ReusedExchange [bm_id#1469, uid#1491L, ... 4 more fields], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [id=#2059]
            +- *(2) Project [uid#1495L]
               +- *(2) Filter ((isnotnull(xx#1523) AND (xx#1523 = 2)) AND isnotnull(xx#1495L))
                  +- *(2) ColumnarToRow
                     +- FileScan parquet db.table[xx#1495L,xx#1523,... 3 more fields] Batched: true, DataFilters: [isnotnull(xx#1523), (xx#1523 = 2), isnotnull(xx#1495L)], ... 5 more fields
    to BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2298]
   +- LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]
   ```
   
   
   ### Why are the changes needed?
   Fixes an AQE assertion failure that occurs when a broadcast query stage is rewritten to an empty relation.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Added a unit test.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

