cloud-fan commented on a change in pull request #32594:
URL: https://github.com/apache/spark/pull/32594#discussion_r635490613
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -35,72 +35,73 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
     if (!conf.coalesceShufflePartitionsEnabled) {
       return plan
     }
-    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
-        || plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined) {
+    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
       // If not all leaf nodes are query stages, it's not safe to reduce the number of
       // shuffle partitions, because we may break the assumption that all children of a spark plan
       // have same number of output partitions.
       return plan
     }
 
-    def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
-      case stage: ShuffleQueryStageExec => Seq(stage)
-      case _ => plan.children.flatMap(collectShuffleStages)
+    def collectShuffleStageInfos(plan: SparkPlan): Seq[ShuffleStageInfo] = plan match {
+      case ShuffleStageInfo(stage, specs) => Seq(new ShuffleStageInfo(stage, specs))
+      case _ => plan.children.flatMap(collectShuffleStageInfos)
     }
 
-    val shuffleStages = collectShuffleStages(plan)
+    val shuffleStageInfos = collectShuffleStageInfos(plan)
     // ShuffleExchanges introduced by repartition do not support changing the number of partitions.
     // We change the number of partitions in the stage only if all the ShuffleExchanges support it.
-    if (!shuffleStages.forall(s => supportCoalesce(s.shuffle))) {
+    if (!shuffleStageInfos.forall(s => supportCoalesce(s.shuffleStage.shuffle))) {
       plan
     } else {
-      def insertCustomShuffleReader(partitionSpecs: Seq[ShufflePartitionSpec]): SparkPlan = {
-        // This transformation adds new nodes, so we must use `transformUp` here.
-        val stageIds = shuffleStages.map(_.id).toSet
-        plan.transformUp {
-          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
-          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
-          // number of output partitions.
-          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
-            CustomShuffleReaderExec(stage, partitionSpecs)
-        }
-      }
-
-      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
-      // we should skip it when calculating the `partitionStartIndices`.
-      // If all input RDDs have 0 partition, we create empty partition for every shuffle reader.
-      val validMetrics = shuffleStages.flatMap(_.mapStats)
+      // We fall back to Spark default parallelism if the minimum number of coalesced partitions
+      // is not set, so to avoid perf regressions compared to no coalescing.
+      val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
+        .getOrElse(session.sparkContext.defaultParallelism)
+      val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
+        shuffleStageInfos.map(_.shuffleStage.mapStats),
+        shuffleStageInfos.map(_.partitionSpecs),
+        advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
+        minNumPartitions = minPartitionNum)
 
-      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
-      // in that case. For example when we union fully aggregated data (data is arranged to a single
-      // partition) and a result of a SortMergeJoin (multiple partitions).
-      val distinctNumPreShufflePartitions =
-        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
-      if (validMetrics.isEmpty) {
-        insertCustomShuffleReader(ShufflePartitionsUtil.createEmptyPartition() :: Nil)
-      } else if (distinctNumPreShufflePartitions.length == 1) {
-        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
-        // is not set, so to avoid perf regressions compared to no coalescing.
-        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-          .getOrElse(session.sparkContext.defaultParallelism)
Review comment:
   The logic above has been moved to `ShufflePartitionsUtil.coalescePartitions`.
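
   For context, here is a minimal sketch of the validation that used to live in this rule (the removed `validMetrics` / `distinctNumPreShufflePartitions` block) and, per this change, now happens inside `ShufflePartitionsUtil.coalescePartitions`. The object and method names below are hypothetical illustrations, not the actual util code; only `MapOutputStatistics` and its `bytesByPartitionId` field are real Spark APIs.

   ```scala
   // MapOutputStatistics is private[spark], so a compilable sketch has to live in a
   // Spark-internal package; the adaptive package used by this rule works here.
   package org.apache.spark.sql.execution.adaptive

   import org.apache.spark.MapOutputStatistics

   // Hypothetical helper mirroring the removed in-rule checks.
   object CoalesceValidationSketch {
     // A stage whose input RDD has 0 partitions reports no map stats (None), so those
     // entries are skipped. Returns the usable stats when coalescing can proceed, or
     // None when the remaining stages disagree on the number of pre-shuffle
     // partitions, e.g. a union of fully aggregated (single-partition) data with a
     // multi-partition sort-merge-join result. The all-empty case, which the old rule
     // handled by inserting an empty-partition reader, is left out of this sketch.
     def validateMapStats(
         mapStats: Seq[Option[MapOutputStatistics]]): Option[Seq[MapOutputStatistics]] = {
       val validMetrics = mapStats.flatten
       val distinctNumPreShufflePartitions =
         validMetrics.map(_.bytesByPartitionId.length).distinct
       if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
         Some(validMetrics)
       } else {
         None
       }
     }
   }
   ```

   Note that the new call site also passes `shuffleStageInfos.map(_.partitionSpecs)` alongside the map stats, so the util presumably also handles stages that already carry reader specs, a case the old in-rule check never saw.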
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]