ekoifman commented on a change in pull request #31653:
URL: https://github.com/apache/spark/pull/31653#discussion_r604313167
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -35,11 +37,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
     if (!conf.coalesceShufflePartitionsEnabled) {
       return plan
     }
-    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
-        || plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined) {
+    /* This is running before new QueryStageExec creation so either all leaves are
+       QueryStageExec nodes or all leaves are CustomShuffleReaderExec if OptimizeSkewJoin
+       mitigated something in the new stage. */
Review comment:
I don't understand this comment. `CoalesceShufflePartitions` runs in the same place as before. The difference is that, previously, it bailed out if the `SparkPlan` it was processing already contained `CustomShuffleReaderExec` nodes. It can no longer do that, since `OptimizeSkewedJoin` now runs before coalescing and may insert `CustomShuffleReaderExec` nodes.
So I had to add `CoalesceShufflePartitions.newPartitionSpec` to make coalescing run after skew mitigation.
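For readers following along, here is a minimal, self-contained sketch of the old bail-out guard. It uses plain Scala with toy stand-in node classes rather than the real Spark internals (`PlanNode`, `JoinExec`, and the stubbed `QueryStageExec`/`CustomShuffleReaderExec` below are illustrative only), and shows why a plan that `OptimizeSkewedJoin` has already touched would trip the old check:

```scala
// Toy stand-ins for the real Spark plan nodes; names mirror the discussion
// above, but none of this is the actual Spark API.
sealed trait PlanNode {
  def children: Seq[PlanNode]
  def collectLeaves(): Seq[PlanNode] =
    if (children.isEmpty) Seq(this) else children.flatMap(_.collectLeaves())
  def find(p: PlanNode => Boolean): Option[PlanNode] =
    if (p(this)) Some(this) else children.flatMap(_.find(p)).headOption
}
case class QueryStageExec(id: Int) extends PlanNode {
  def children: Seq[PlanNode] = Nil
}
case class CustomShuffleReaderExec(child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
case class JoinExec(left: PlanNode, right: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(left, right)
}

object OldGuardSketch extends App {
  // The old guard: skip coalescing unless every leaf is a QueryStageExec and
  // no CustomShuffleReaderExec exists anywhere in the plan.
  def oldGuardBailsOut(plan: PlanNode): Boolean =
    !plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec]) ||
      plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined

  // After skew mitigation, one join side is already wrapped in a reader node,
  // so the old guard fires and coalescing would be skipped for the whole plan.
  val skewMitigated =
    JoinExec(CustomShuffleReaderExec(QueryStageExec(1)), QueryStageExec(2))
  println(oldGuardBailsOut(skewMitigated)) // prints: true
}
```

With `OptimizeSkewedJoin` ordered before coalescing, that second condition trips for any skew-mitigated plan, which is why the old bail-out could not be kept as-is.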
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]