leixm opened a new pull request, #53523:
URL: https://github.com/apache/spark/pull/53523
### What changes were proposed in this pull request?
For a BroadcastQueryStageExec, if its logical plan has been converted to an empty relation by AQEPropagateEmptyRelation, the following error is thrown:
```
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
    at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
    at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at scala.concurrent.Promise.complete(Promise.scala:53)
    at scala.concurrent.Promise.complete$(Promise.scala:52)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$doExecuteBroadcast$1(AdaptiveSparkPlanExec.scala:369)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:376)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecuteBroadcast(AdaptiveSparkPlanExec.scala:364)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:197)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:193)
    at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:81)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:146)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:170)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:144)
    at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$1(SubqueryBroadcastExec.scala:78)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    ... 5 more
```
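The assertion that fails lives in `AdaptiveSparkPlanExec.doExecuteBroadcast`, which expects the final plan of a broadcast stage to still be a `BroadcastQueryStageExec`. Roughly (a paraphrase based on the stack trace above; exact code and line numbers differ across Spark versions):

```scala
// Paraphrase of AdaptiveSparkPlanExec.doExecuteBroadcast, not a verbatim copy.
// Once AQEPropagateEmptyRelation has collapsed the stage into a plain empty
// LocalTableScanExec, the isInstanceOf check fails and surfaces as the
// "Boxed Error" / AssertionError shown above.
override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  withFinalPlanUpdate { finalPlan =>
    assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
    finalPlan.doExecuteBroadcast()
  }
}
```

The AQEPropagateEmptyRelation application that produces the empty relation looks like this: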
```
=== Applying Rule org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation ===
 Project [bm_id#1469, uid#1491L, ... 4 more fields]
 Project [bm_id#1469, uid#1491L, ... 4 more fields]
!+- Join Inner, (uid#1491L = uid#1495L)
 +- LocalRelation <empty>, [bm_id#1469, uid#1491L, ... 5 more fields]
!   :- LogicalQueryStage Project [xx#1469, uid#1491L, ... 4 more fields], BroadcastQueryStage 1
!   +- Project [uid#1495L]
!      +- Filter xxxx
!         +- Relation db.table[uid#1495L,request_id#1496,... 40 more fields] parquet
```
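For context, the effect of AQEPropagateEmptyRelation on an inner join is conceptually the following (a simplified sketch of the idea only, not the actual rule, which also handles other join types, aggregates, unions, and runtime statistics from materialized query stages):

```scala
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}

object EmptyJoinPropagationSketch {
  // Hypothetical emptiness check; the real rule also consults runtime
  // row-count statistics of materialized query stages.
  private def isEmpty(plan: LogicalPlan): Boolean = plan match {
    case l: LocalRelation => l.data.isEmpty
    case _ => false
  }

  // Simplified idea: an inner join with a provably empty side collapses into an
  // empty LocalRelation that carries the join's output attributes. This is how
  // the BroadcastQueryStage side disappears from the logical plan above.
  def propagateEmpty(plan: LogicalPlan): LogicalPlan = plan.transform {
    case j @ Join(left, right, Inner, _, _) if isEmpty(left) || isEmpty(right) =>
      LocalRelation(j.output, data = Seq.empty)
  }
}
```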
https://github.com/apache/spark/pull/32741 already fixed some cases like this, but in our production environment we still hit the following:
```
24/07/01 18:57:31 WARN [dynamicpruning-0] AdaptiveSparkPlanExec: Plan changed from
BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2258]
+- Project [xx#1469, xx#1491L, ... 4 more fields]
   +- BroadcastHashJoin [xx#1491L], [xx#1495L], Inner, BuildLeft, false, false
      :- BroadcastQueryStage 1
      :  +- ReusedExchange [xx#1469, xx#1491L, ... 4 more fields], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [id=#2059]
      +- Project [uid#1495L]
         +- Filter xxx
            +- FileScan parquet db.table
to
BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2261]
+- LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]
24/07/01 18:57:31 WARN [dynamicpruning-0] AdaptiveSparkPlanExec: Plan changed from
BroadcastQueryStage 2
+- BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2261]
   +- LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]
to LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]
```
In the next re-optimization loop, the currentPhysicalPlan is a BroadcastQueryStageExec whose plan has been replaced by a bare LocalTableScan, so the assertion in AdaptiveSparkPlanExec.doExecuteBroadcast fails.
After this PR, the empty result stays wrapped in the BroadcastExchange:
```
24/07/02 09:58:13 WARN [dynamicpruning-0] AdaptiveSparkPlanExec: Plan changed from BroadcastQueryStage 2
+- BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2281]
   +- *(2) Project [xx#1469, xx#1491L, ... 4 more fields]
      +- *(2) BroadcastHashJoin [uid#1491L], [uid#1495L], Inner, BuildLeft, false, false
         :- BroadcastQueryStage 1
         :  +- ReusedExchange [bm_id#1469, uid#1491L, ... 4 more fields], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [id=#2059]
         +- *(2) Project [uid#1495L]
            +- *(2) Filter ((isnotnull(xx#1523) AND (xx#1523 = 2)) AND isnotnull(xx#1495L))
               +- *(2) ColumnarToRow
                  +- FileScan parquet db.table[xx#1495L,xx#1523,... 3 more fields] Batched: true, DataFilters: [isnotnull(xx#1523), (xx#1523 = 2), isnotnull(xx#1495L)], ... 5 more fields
to BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true], input[3, date, true]),false), [id=#2298]
+- LocalTableScan <empty>, [xx#1469, xx#1491L, ... 4 more fields]
```
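Conceptually, the invariant being restored is that whatever plan a broadcast query stage ends up with must still be able to produce a broadcast relation; a plain LocalTableScanExec cannot. A sketch of that idea (illustrative only, with a hypothetical helper; the actual change in this PR may be implemented differently, see the diff):

```scala
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike}

object BroadcastShapeSketch {
  // Hypothetical helper, not the PR's code: if a plan that must be broadcast
  // was collapsed into a plain (e.g. empty) scan, wrap it back into a
  // BroadcastExchangeExec with the original broadcast mode so that
  // executeBroadcast keeps working for consumers such as SubqueryBroadcastExec.
  def ensureBroadcastable(plan: SparkPlan, mode: BroadcastMode): SparkPlan = plan match {
    case p @ (_: BroadcastQueryStageExec | _: BroadcastExchangeLike) => p
    case other => BroadcastExchangeExec(mode, other)
  }
}
```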
### Why are the changes needed?
Fix an AQE assertion failure (`java.lang.AssertionError: assertion failed` in `AdaptiveSparkPlanExec.doExecuteBroadcast`) that occurs when a broadcast query stage is rewritten to an empty relation by AQEPropagateEmptyRelation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
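For reference, a hypothetical sketch of the kind of regression test that exercises this path (suite, table, and column names are made up; the actual UT added by this PR may differ):

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class EmptyBroadcastQueryStageSuite extends QueryTest with SharedSparkSession {

  test("DPP broadcast over a query stage that collapses to an empty relation") {
    withSQLConf(
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
        SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
      withTable("fact", "dim") {
        spark.range(100).selectExpr("id", "id % 10 AS part")
          .write.format("parquet").partitionBy("part").saveAsTable("fact")
        spark.range(10).selectExpr("id AS part", "id AS k")
          .write.format("parquet").saveAsTable("dim")
        // The dim-side filter matches no rows, so the broadcast side is empty
        // and AQEPropagateEmptyRelation can rewrite the join while the DPP
        // subquery still expects to broadcast the stage's result.
        val df = spark.sql(
          "SELECT f.id FROM fact f JOIN dim d ON f.part = d.part WHERE d.k < 0")
        checkAnswer(df, Nil)
      }
    }
  }
}
```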
### Was this patch authored or co-authored using generative AI tooling?
No.