ekoifman commented on a change in pull request #31653:
URL: https://github.com/apache/spark/pull/31653#discussion_r604313167



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -35,11 +37,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
     if (!conf.coalesceShufflePartitionsEnabled) {
       return plan
     }
-    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
-        || plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined) {
+    /* This is running before new QueryStageExec creation so either all leaves are
+     QueryStageExec nodes or all leaves are CustomShuffleReaderExec if OptimizeSkewJoin
+     mitigated something in the new stage. */

Review comment:
       I don't understand this comment. `CoalesceShufflePartitions` runs in the same place as before. The difference is that it used to bail out if the `SparkPlan` it was processing already contained `CustomShuffleReaderExec` nodes. It cannot do that any more, because `OptimizeSkewedJoin` now runs before coalescing and may insert `CustomShuffleReaderExec` nodes.
   So I had to add `CoalesceShufflePartitions.newPartitionSpec` so that coalescing can run after skew mitigation.
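To illustrate why coalescing now has to tolerate readers that skew mitigation already produced, here is a toy Scala model. It is a sketch under stated assumptions, not the code in this PR: `Coalesced`, `SkewSlice`, and `newPartitionSpecs` are hypothetical names (this is not the signature of `CoalesceShufflePartitions.newPartitionSpec`), partition sizes are plain byte counts, and skew handling is assumed to have already decided how many slices each skewed reducer partition gets. Coalescing then merges adjacent small partitions up to a target size and treats each skewed partition as a fixed boundary, instead of bailing out as soon as it sees a split.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: hypothetical types and names, not Spark's actual classes.
object CoalesceAfterSkewSketch {
  sealed trait Spec
  // Reads reducer partitions [start, end) as a single task.
  final case class Coalesced(start: Int, end: Int) extends Spec
  // Reads one slice of a reducer partition that skew mitigation already split.
  final case class SkewSlice(reducer: Int, slice: Int) extends Spec

  /** Merges adjacent non-skewed reducer partitions while their total stays within
    * `target` bytes, and keeps every skewed partition exactly as skew handling split it.
    * `skewSlices` maps a reducer id to the number of slices skew handling created
    * (an assumption of this sketch). */
  def newPartitionSpecs(
      sizes: IndexedSeq[Long],
      skewSlices: Map[Int, Int],
      target: Long): Seq[Spec] = {
    val out = ArrayBuffer.empty[Spec]
    var runStart = -1 // first reducer of the current coalesced run, -1 if none
    var runBytes = 0L // bytes accumulated in the current run

    // Closes the current run (if any) at reducer `end` (exclusive).
    def flush(end: Int): Unit = {
      if (runStart >= 0) out += Coalesced(runStart, end)
      runStart = -1
      runBytes = 0L
    }

    for (i <- sizes.indices) {
      skewSlices.get(i) match {
        case Some(n) =>
          flush(i) // a skewed partition ends the current run
          (0 until n).foreach(s => out += SkewSlice(i, s))
        case None =>
          // Start a new run if adding this partition would exceed the target.
          if (runStart >= 0 && runBytes + sizes(i) > target) flush(i)
          if (runStart < 0) runStart = i
          runBytes += sizes(i)
      }
    }
    flush(sizes.length)
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sizes = IndexedSeq(10L, 20L, 500L, 15L, 15L, 40L)
    newPartitionSpecs(sizes, Map(2 -> 3), target = 64L).foreach(println)
  }
}
```

With sizes `10, 20, 500, 15, 15, 40`, reducer 2 split into 3 slices, and a 64-byte target, `main` prints `Coalesced(0,2)`, `SkewSlice(2,0)`, `SkewSlice(2,1)`, `SkewSlice(2,2)`, `Coalesced(3,5)` and `Coalesced(5,6)`, one per line. Treating skewed partitions as run boundaries keeps the skew split intact; the sketch makes no claim about whether the real rule ever coalesces across them.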




