cloud-fan commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size URL: https://github.com/apache/spark/pull/26434#discussion_r358057065
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala ########## @@ -91,14 +93,21 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { val distinctNumPreShufflePartitions = validMetrics.map(stats => stats.bytesByPartitionId.length).distinct if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) { - val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray) - // This transformation adds new nodes, so we must use `transformUp` here. - plan.transformUp { - // even for shuffle exchange whose input RDD has 0 partition, we should still update its - // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same - // number of output partitions. - case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) => - CoalescedShuffleReaderExec(stage, partitionStartIndices) + val visitedStage = mutable.HashSet[QueryStageExec]() + plan.transformDown { + case stage: QueryStageExec if (ShuffleQueryStageExec.isShuffleQueryStageExec(stage) + && !visitedStage.contains(stage)) => + val excludedPartitions = + ShuffleQueryStageExec.getShuffleStage(stage).excludedPartitions + val partitionIndices = estimatePartitionStartAndEndIndices( + validMetrics.toArray, excludedPartitions) + visitedStage += stage + CoalescedShuffleReaderExec(stage, partitionIndices) + case partialReader: PartialShuffleReader => Review comment: We don't need this `case` branch if we remove `PartialShuffleReader`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org