Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152084974
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster(
shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
}
+ /**
+ * To equally divide n elements into m buckets, basically each bucket
should have n/m elements,
+ * for the remaining n%m elements, add one more element to the first n%m
buckets each.
+ */
+ def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]]
= {
+ val elementsPerBucket = numElements / numBuckets
+ val remaining = numElements % numBuckets
+ if (remaining == 0) {
+ 0.until(numElements).grouped(elementsPerBucket)
+ } else {
+ val splitPoint = (elementsPerBucket + 1) * remaining
+ 0.to(splitPoint).grouped(elementsPerBucket + 1) ++
+ (splitPoint + 1).until(numElements).grouped(elementsPerBucket)
+ }
+ }
+
/**
* Return statistics about all of the outputs for a given shuffle.
*/
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics
= {
shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
- for (s <- statuses) {
- for (i <- 0 until totalSizes.length) {
- totalSizes(i) += s.getSizeForBlock(i)
+ val parallelAggThreshold = conf.get(
+ SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)
+ if (statuses.length * totalSizes.length < parallelAggThreshold) {
+ for (s <- statuses) {
+ for (i <- 0 until totalSizes.length) {
+ totalSizes(i) += s.getSizeForBlock(i)
+ }
+ }
+ } else {
+ val parallelism =
conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM)
--- End diff --
How about setting `parallelism =
math.min(Runtime.getRuntime.availableProcessors(), statuses.length.toLong *
totalSizes.length / parallelAggThreshold)` rather than introducing a new
config, such as:
```
val parallelism = math.min(
Runtime.getRuntime.availableProcessors(),
statuses.length.toLong * totalSizes.length / parallelAggThreshold
+ 1)
if (parallelism <= 1) {
...
} else {
....
}
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]