viirya commented on a change in pull request #32594:
URL: https://github.com/apache/spark/pull/32594#discussion_r637693175
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
##########
@@ -21,16 +21,160 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.MapOutputStatistics
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.{CoalescedPartitionSpec,
ShufflePartitionSpec}
+import org.apache.spark.sql.execution.{CoalescedPartitionSpec,
PartialReducerPartitionSpec, ShufflePartitionSpec}
object ShufflePartitionsUtil extends Logging {
final val SMALL_PARTITION_FACTOR = 0.2
final val MERGED_PARTITION_FACTOR = 1.2
/**
- * Coalesce the partitions from multiple shuffles. This method assumes that
all the shuffles
- * have the same number of partitions, and the partitions of same index will
be read together
- * by one task.
+ * Coalesce the partitions from multiple shuffles, either in their original
states, or applied
+ * with skew handling partition specs. If called on partitions containing
skew partition specs,
+ * this method will keep the skew partition specs intact and only coalesce
the partitions outside
+ * the skew sections.
+ *
+ * This method will return an empty result if the shuffles have been
coalesced already, or if
+ * they do not have the same number of partitions, or if the coalesced
result is the same as the
+ * input partition layout.
+ *
+ * @return A sequence of sequence of [[ShufflePartitionSpec]]s, which each
inner sequence as the
+ * new partition specs for its corresponding shuffle after
coalescing. If Nil is returned,
+ * then no coalescing is applied.
+ */
+ def coalescePartitions(
+ mapOutputStatistics: Seq[Option[MapOutputStatistics]],
+ inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
+ advisoryTargetSize: Long,
+ minNumPartitions: Int): Seq[Seq[ShufflePartitionSpec]] = {
+ assert(mapOutputStatistics.length == inputPartitionSpecs.length)
+
+ if (mapOutputStatistics.isEmpty) {
+ return Seq.empty
+ }
+
+ // If `minNumPartitions` is very large, it is possible that we need to use
a value less than
+ // `advisoryTargetSize` as the target size of a coalesced task.
+ val totalPostShuffleInputSize =
mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
+ // The max at here is to make sure that when we have an empty table, we
only have a single
+ // coalesced partition.
+ // There is no particular reason that we pick 16. We just need a number to
prevent
+ // `maxTargetSize` from being set to 0.
+ val maxTargetSize = math.max(
+ math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong,
16)
+ val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+
+ val shuffleIds =
mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
+ logInfo(s"For shuffle($shuffleIds), advisory target size:
$advisoryTargetSize, " +
+ s"actual target size $targetSize.")
+
+ val numShuffles = mapOutputStatistics.length
+ // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0
partitions,
+ // we should skip it when calculating the `partitionStartIndices`.
+ val validMetrics = mapOutputStatistics.flatten
+
+ if (inputPartitionSpecs.forall(_.isEmpty)) {
+ // If all input RDDs have 0 partition, we create an empty partition for
every shuffle reader.
+ if (validMetrics.isEmpty) {
+ return Seq.fill(numShuffles)(Seq(CoalescedPartitionSpec(0, 0)))
+ }
+
+ // We may have different pre-shuffle partition numbers, don't reduce
shuffle partition number
+ // in that case. For example when we union fully aggregated data (data
is arranged to a single
+ // partition) and a result of a SortMergeJoin (multiple partitions).
+ if (validMetrics.map(_.bytesByPartitionId.length).distinct.length > 1) {
+ return Seq.empty
+ }
+
+ val numPartitions = validMetrics.head.bytesByPartitionId.length
+ val newPartitionSpecs = coalescePartitions(
+ 0, numPartitions, validMetrics, targetSize)
+ if (newPartitionSpecs.length < numPartitions) {
+ return Seq.fill(numShuffles)(newPartitionSpecs)
+ } else {
+ return Seq.empty
+ }
+ }
+
+ // Do not coalesce if any of the map output stats are missing or if not
all shuffles have
+ // partition specs, which should not happen in practice.
+ if (!mapOutputStatistics.forall(_.isDefined) ||
!inputPartitionSpecs.forall(_.isDefined)) {
+ logWarning("Could not apply partition coalescing because of missing
MapOutputStatistics " +
+ "or shuffle partition specs.")
+ return Seq.empty
+ }
+
+ // Extract the start indices of each partition spec. Give invalid index -1
to unexpected
+ // partition specs. When we reach here, it means skew join optimization
has been applied.
+ val partitionIndicesSeq = inputPartitionSpecs.map(_.get.map {
+ case CoalescedPartitionSpec(start, end) if start + 1 == end => start
+ case PartialReducerPartitionSpec(reducerId, _, _, _) => reducerId
+ case _ => -1 // invalid
+ })
+
+ // There should be no unexpected partition specs and the start indices
should be identical
+ // across all different shuffles.
+ assert(partitionIndicesSeq.distinct.length == 1 &&
partitionIndicesSeq.head.forall(_ >= 0),
+ s"Invalid shuffle partition specs: $inputPartitionSpecs")
+
+ // The indices may look like [0, 1, 2, 2, 2, 3, 4, 4, 5], and the repeated
`2` and `4` mean
+ // skewed partitions.
+ val partitionIndices = partitionIndicesSeq.head
+ val newPartitionSpecsSeq =
Seq.fill(numShuffles)(ArrayBuffer.empty[ShufflePartitionSpec])
+ val numPartitions = partitionIndices.length
+ var i = 0
+ var start = 0
+ while (i < numPartitions) {
+ if (i > 0 && partitionIndices(i - 1) == partitionIndices(i)) {
+ // a skew section detected, starting from partition(i - 1).
+ val repeatValue = partitionIndices(i)
+ // coalesce any partitions before partition(i - 1) and after the end
of latest skew section.
Review comment:
I think the following only does "coalesce any partitions before
partition(i - 1)" but not "after the end of latest skew section"?
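
To make the question concrete, here is a minimal sketch built on the index example from the code comment in the diff, `[0, 1, 2, 2, 2, 3, 4, 4, 5]`. `SkewSections.split` is a hypothetical helper and not code from this PR. It assumes a skew section is any run of repeated start indices, and it only labels which spec positions fall inside skew sections and which fall in the gaps before, between, and after them (the gaps being the only candidates for coalescing).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper for illustration only; not part of ShufflePartitionsUtil.
object SkewSections {
  /**
   * Splits spec positions into maximal runs. Each run is
   * (isSkew, startPos, endPosExclusive). A skew section is assumed to be
   * any run of two or more equal start indices.
   */
  def split(indices: Seq[Int]): Seq[(Boolean, Int, Int)] = {
    val runs = ArrayBuffer.empty[(Boolean, Int, Int)]
    val n = indices.length
    var i = 0
    var nonSkewStart = 0 // first position after the latest skew section
    while (i < n) {
      // Find the end of the run of equal values starting at position i.
      var j = i + 1
      while (j < n && indices(j) == indices(i)) j += 1
      if (j - i > 1) {
        // Positions [nonSkewStart, i) lie after the latest skew section and
        // before this one, so they form a coalescing candidate range.
        if (nonSkewStart < i) runs += ((false, nonSkewStart, i))
        runs += ((true, i, j))
        nonSkewStart = j
      }
      i = j
    }
    // Trailing non-skew positions after the last skew section.
    if (nonSkewStart < n) runs += ((false, nonSkewStart, n))
    runs.toSeq
  }
}
```

With the example indices, the non-skew ranges are positions `[0, 2)`, `[5, 6)`, and `[8, 9)`. The middle and trailing ranges are the ones that sit "after the end of latest skew section", so they are the cases worth checking against the loop.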