cloud-fan commented on a change in pull request #32362:
URL: https://github.com/apache/spark/pull/32362#discussion_r620964889



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -54,16 +54,31 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends CustomShuffl
     if (!shuffleStages.forall(s => supportCoalesce(s.shuffle))) {
       plan
     } else {
+      def insertCustomShuffleReader(partitionSpecs: 
Seq[ShufflePartitionSpec]): SparkPlan = {
+        // This transformation adds new nodes, so we must use `transformUp` 
here.
+        val stageIds = shuffleStages.map(_.id).toSet
+        plan.transformUp {
+          // even for shuffle exchange whose input RDD has 0 partition, we 
should still update its
+          // `partitionStartIndices`, so that all the leaf shuffles in a stage 
have the same
+          // number of output partitions.
+          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
+            CustomShuffleReaderExec(stage, partitionSpecs)
+        }
+      }
+
       // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 
0 partitions,
       // we should skip it when calculating the `partitionStartIndices`.
+      // If all input RDDs have 0 partition, we create empty partition for 
every shuffle reader.

Review comment:
       If all input RDDs have 0 partition, the query is very fast and we don't 
need to optimize?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to