ulysses-you commented on a change in pull request #32362:
URL: https://github.com/apache/spark/pull/32362#discussion_r620976903



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -54,16 +54,31 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
     if (!shuffleStages.forall(s => supportCoalesce(s.shuffle))) {
       plan
     } else {
+      def insertCustomShuffleReader(partitionSpecs: Seq[ShufflePartitionSpec]): SparkPlan = {
+        // This transformation adds new nodes, so we must use `transformUp` here.
+        val stageIds = shuffleStages.map(_.id).toSet
+        plan.transformUp {
+          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
+          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
+          // number of output partitions.
+          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
+            CustomShuffleReaderExec(stage, partitionSpecs)
+        }
+      }
+
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
       // we should skip it when calculating the `partitionStartIndices`.
+      // If all input RDDs have 0 partition, we create empty partition for every shuffle reader.

Review comment:
       Logically it is. But for a Spark server such as `SparkThriftServer`, we
always use a large number of shuffle partitions (e.g. 8192) and depend on AQE
to coalesce them. If a user queries an empty table, this wastes far too many
tasks. Another issue is that the driver, being a single point, can get busy
processing the unnecessary task events.
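
       To make the scenario concrete, here is a minimal sketch (not part of the
PR; the session setup, table name, and config values are illustrative) of a
server-style configuration that relies on AQE to coalesce a large
`spark.sql.shuffle.partitions` setting:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: a long-running, server-style session with a very large
// shuffle-partition count, relying on AQE to coalesce it at runtime.
val spark = SparkSession.builder()
  .appName("aqe-empty-table-sketch")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "8192")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()

// An empty file-based table, like one a SparkThriftServer user might query.
spark.sql("CREATE TABLE IF NOT EXISTS empty_t (key INT, value BIGINT) USING parquet")

// The aggregation introduces a shuffle. Without coalescing, its reduce side
// follows spark.sql.shuffle.partitions (8192 here) even though the table is
// empty; with coalescing plus the empty-partition handling above, the reader
// gets a single empty partition instead of one no-op task per partition.
spark.sql("SELECT key, count(*) AS cnt FROM empty_t GROUP BY key").show()
```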



