maropu commented on a change in pull request #31994:
URL: https://github.com/apache/spark/pull/31994#discussion_r603743803



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -72,14 +72,20 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
           validMetrics.toArray,
           advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
           minNumPartitions = minPartitionNum)
-        // This transformation adds new nodes, so we must use `transformUp` here.
-        val stageIds = shuffleStages.map(_.id).toSet
-        plan.transformUp {
-          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
-          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
-          // number of output partitions.
-          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
-            CustomShuffleReaderExec(stage, partitionSpecs)
+        // We can never extend the shuffle partition number, so if we get the same number here,
+        // that means we can not coalesce shuffle partition. Just return the origin plan.
+        if (partitionSpecs.length == validMetrics.head.bytesByPartitionId.length) {

Review comment:
       The fix itself seems fine.
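
For illustration only, the check added in the hunk above can be sketched outside of Spark as follows. This is a simplified sketch under stated assumptions, not the code in this PR: `PartitionSpec`, `CoalesceSketch`, `coalesce`, and `specsIfCoalesced` are hypothetical names, and the greedy merge is a stand-in for Spark's actual coalescing logic (it ignores `minNumPartitions`, for example). The point is only that coalescing can keep or reduce the number of partitions, never increase it, so an unchanged count means nothing was merged and the original plan can be returned as-is.

```scala
// Hypothetical stand-in for a coalesced partition range; not Spark's class.
// Covers reducer partitions [startReducerIndex, endReducerIndex).
final case class PartitionSpec(startReducerIndex: Int, endReducerIndex: Int)

object CoalesceSketch {
  /** Greedily merges adjacent partitions while a group stays within targetSize bytes. */
  def coalesce(bytesByPartitionId: Array[Long], targetSize: Long): Seq[PartitionSpec] = {
    val specs = Seq.newBuilder[PartitionSpec]
    var start = 0
    var acc = 0L
    for (i <- bytesByPartitionId.indices) {
      // Close the current group if adding partition i would exceed the target.
      if (i > start && acc + bytesByPartitionId(i) > targetSize) {
        specs += PartitionSpec(start, i)
        start = i
        acc = 0L
      }
      acc += bytesByPartitionId(i)
    }
    // Emit the trailing group, if any input partitions exist.
    if (bytesByPartitionId.nonEmpty) {
      specs += PartitionSpec(start, bytesByPartitionId.length)
    }
    specs.result()
  }

  /**
   * Mirrors the early-return idea from the diff: if the number of specs equals
   * the number of original partitions, nothing was coalesced, so return None
   * (the caller would then keep the original plan unchanged).
   */
  def specsIfCoalesced(bytes: Array[Long], targetSize: Long): Option[Seq[PartitionSpec]] = {
    val specs = coalesce(bytes, targetSize)
    if (specs.length == bytes.length) None else Some(specs)
  }
}
```

With four 10-byte partitions and a 25-byte target, the sketch yields two specs, so a reader node would be worth adding. With three 30-byte partitions and the same target, every partition stays on its own, the counts match, and the sketch returns `None`.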



