erenavsarogullari commented on code in PR #45234:
URL: https://github.com/apache/spark/pull/45234#discussion_r1573096494
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
}
}
+ test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the
cancellation") {
+ try {
+ spark.experimental.extraStrategies = TestProblematicCoalesceStrategy ::
Nil
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val joinedDF = createJoinedDF()
+
+ val error = intercept[SparkException] {
+ joinedDF.collect()
+ }
+ assert(error.getMessage() contains "ProblematicCoalesce execution is
failed")
+
+ val adaptivePlan =
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+ // All QueryStages should be based on ShuffleQueryStageExec
+ val shuffleQueryStageExecs = collect(adaptivePlan) {
+ case sqse: ShuffleQueryStageExec => sqse
+ }
+ assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should
include " +
+ s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+ shuffleQueryStageExecs.foreach(sqse =>
assert(sqse.name.contains("ShuffleQueryStageExec-")))
+ // First ShuffleQueryStage is materialized so it needs to be canceled.
+ assert(shuffleQueryStageExecs(0).isMaterializationStarted(),
+ "Materialization should be started.")
+ // Second ShuffleQueryStage materialization is failed so
+ // it is excluded from the cancellation due to earlyFailedStage.
+ assert(shuffleQueryStageExecs(1).isMaterializationStarted(),
+ "Materialization should be started but it is failed.")
+ // Last ShuffleQueryStage is not materialized yet so it does not
require
+ // to be canceled and it is just skipped from the cancellation.
+ assert(!shuffleQueryStageExecs(2).isMaterializationStarted(),
+ "Materialization should not be started.")
+ }
+ } finally {
+ spark.experimental.extraStrategies = Nil
+ }
+ }
+
+ test("SPARK-47148: Check if BroadcastQueryStage materialization is started")
{
+ try {
+ spark.experimental.extraStrategies = TestProblematicCoalesceStrategy ::
Nil
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
+ val joinedDF = createJoinedDF()
+
+ val error = intercept[SparkException] {
+ joinedDF.collect()
+ }
+ assert(error.getMessage() contains "ProblematicCoalesce execution is
failed")
+
+ val adaptivePlan =
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+ // All QueryStages should be based on BroadcastQueryStageExec
+ val broadcastQueryStageExecs = collect(adaptivePlan) {
+ case bqse: BroadcastQueryStageExec => bqse
+ }
+ assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
+ broadcastQueryStageExecs.foreach { bqse =>
+ assert(bqse.name.contains("BroadcastQueryStageExec-"))
+ // Both BroadcastQueryStages are materialized at the beginning.
Review Comment:
I have added the `BROADCAST` hint; however, the materialization of both BroadcastQueryStages
still seems to be kicked off. Please see the last commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]