cloud-fan commented on a change in pull request #32883:
URL: https://github.com/apache/spark/pull/32883#discussion_r661263063
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
##########
@@ -308,4 +309,64 @@ object ShufflePartitionsUtil extends Logging {
tryMergePartitions()
partitionStartIndices.toArray
}
+
+ /**
+ * Get the map size of the specific reduce shuffle Id.
+ */
+ private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int):
Array[Long] = {
+ val mapOutputTracker =
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+
mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
+ }
+
+ /**
+ * Splits the skewed partition based on the map size and the target
partition size
+ * after split, and create a list of `PartialMapperPartitionSpec`. Returns
None if can't split.
+ */
+ def createSkewPartitionSpecs(
+ shuffleId: Int,
+ reducerId: Int,
+ targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
+ val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
+ val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes,
targetSize)
+ if (mapStartIndices.length > 1) {
+ Some(mapStartIndices.indices.map { i =>
+ val startMapIndex = mapStartIndices(i)
+ val endMapIndex = if (i == mapStartIndices.length - 1) {
+ mapPartitionSizes.length
+ } else {
+ mapStartIndices(i + 1)
+ }
+ val dataSize =
startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+ PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex,
dataSize)
+ })
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Splits the skewed partition based on the map size and the target
partition size
+ * after split. Create a list of `PartialMapperPartitionSpec` for skewed
partition and
+ * create `CoalescedPartition` for normal partition.
+ */
+ def optimizeSkewedPartitions(
Review comment:
If this is only called by the new rule, shall we move it into the new rule?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]