ulysses-you commented on a change in pull request #32883:
URL: https://github.com/apache/spark/pull/32883#discussion_r658653908



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
##########
@@ -296,4 +296,123 @@ object ShufflePartitionsUtil extends Logging {
     tryMergePartitions()
     partitionStartIndices.toArray
   }
+
+  /**
+   * Splits the skewed partition based on the map size and the target 
partition size
+   * after split, and creates a list of `PartialReducerPartitionSpec`. Returns 
None if it can't be split.
+   */
+  def createSkewPartitionSpecs(
+      shuffleId: Int,
+      reducerId: Int,
+      targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
+    val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes, 
targetSize)
+    if (mapStartIndices.length > 1) {
+      Some(mapStartIndices.indices.map { i =>
+        val startMapIndex = mapStartIndices(i)
+        val endMapIndex = if (i == mapStartIndices.length - 1) {
+          mapPartitionSizes.length
+        } else {
+          mapStartIndices(i + 1)
+        }
+        val dataSize = 
startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, 
dataSize)
+      })
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Gets the per-map output sizes of the given reduce partition for the given shuffle.
+   */
+  def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
+    val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    
mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  private def expandPartitionsWithoutCoalesced(
+      shuffleId: Int,
+      bytesByPartitionId: Array[Long],
+      targetSize: Long): Seq[ShufflePartitionSpec] = {
+    bytesByPartitionId.indices.flatMap { reduceIndex =>
+      if (bytesByPartitionId(reduceIndex) > targetSize) {
+        val newPartitionSpec = createSkewPartitionSpecs(shuffleId, 
reduceIndex, targetSize)
+        if (newPartitionSpec.isEmpty) {
+          CoalescedPartitionSpec(reduceIndex, reduceIndex + 1) :: Nil
+        } else {
+          logDebug(s"For shuffle $shuffleId, partition $reduceIndex is skew, " 
+
+            s"split it into ${newPartitionSpec.get.size} parts.")
+          newPartitionSpec.get
+        }
+      } else {
+        CoalescedPartitionSpec(reduceIndex, reduceIndex + 1) :: Nil
+      }
+    }
+  }
+
+  private def expandPartitionsWithCoalesced(
+      shuffleId: Int,
+      bytesByPartitionId: Array[Long],
+      inputPartitionSpecs: Seq[ShufflePartitionSpec],
+      targetSize: Long): Seq[ShufflePartitionSpec] = {
+    inputPartitionSpecs.flatMap {
+      // We only need to handle non-coalesced partitions (start + 1 == end),
+      // which may be bigger than targetSize.
+      case CoalescedPartitionSpec(start, end) if start + 1 == end &&
+        bytesByPartitionId(start) > targetSize =>
+        val newPartitionSpec = createSkewPartitionSpecs(shuffleId, start, 
targetSize)
+        if (newPartitionSpec.isEmpty) {
+          CoalescedPartitionSpec(start, end) :: Nil
+        } else {
+          logDebug(s"For shuffle: $shuffleId, partition $start is skew, " +
+            s"split it into ${newPartitionSpec.get.size} parts.")
+          newPartitionSpec.get
+        }
+
+      case other => other :: Nil
+    }
+  }
+
+  /**
+   * Expand the partitions with two options:
+   * - has no `ShufflePartitionSpec` before expand
+   *   We use some PartialReducerPartitionSpecs for every large partition, and 
use
+   *   CoalescedPartition for normal partition.
+   * - has `ShufflePartitionSpec` before expand that should be 
CoalescedPartition
+   *   We use some PartialReducerPartitionSpecs to replace CoalescedPartition 
for every
+   *   large partition, and use origin ShufflePartitionSpec for normal 
partition.

Review comment:
       I think so, will simplify it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to