Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152693851
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
    shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
  }

+  /**
+   * Grouped function for Range. This avoids traversing every element of the Range,
+   * which is what IterableLike's grouped function would do.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+    val start = range.start
+    val step = range.step
+    val end = range.end
+    for (i <- start.until(end, size * step)) yield {
+      i.until(i + size * step, step)
+    }
+  }
+
+  /**
+   * Divide n elements into m buckets as evenly as possible: each bucket gets n / m elements,
+   * and each of the first n % m buckets gets one extra element.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+    val elementsPerBucket = numElements / numBuckets
+    val remaining = numElements % numBuckets
+    val splitPoint = (elementsPerBucket + 1) * remaining
+    if (elementsPerBucket == 0) {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+    } else {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+    }
+  }
+
+
  /**
   * Return statistics about all of the outputs for a given shuffle.
   */
  def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
    shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
      val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      for (s <- statuses) {
-        for (i <- 0 until totalSizes.length) {
-          totalSizes(i) += s.getSizeForBlock(i)
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length * totalSizes.length / parallelAggThreshold + 1)
+      if (parallelism <= 1) {
+        for (s <- statuses) {
+          for (i <- 0 until totalSizes.length) {
+            totalSizes(i) += s.getSizeForBlock(i)
+          }
+        }
+      } else {
+        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+        try {
+          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
+          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
+            reduceIds => Future {
+              for (s <- statuses; i <- reduceIds) {
+                totalSizes(i) += s.getSizeForBlock(i)
+              }
+            }
+          }
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadPool.shutdown()
--- End diff --
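
To see what the new helpers produce, here is a standalone sketch in plain Scala that mirrors `rangeGrouped` and `equallyDivide` from the diff above. The object wrapper and `main` are purely illustrative; it is a sketch, not the patch itself.

object EquallyDivideSketch {

  // Split a Range into consecutive sub-Ranges of `size` elements each, without
  // walking every element the way IterableLike's grouped would.
  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    val start = range.start
    val step = range.step
    val end = range.end
    for (i <- start.until(end, size * step)) yield {
      i.until(i + size * step, step)
    }
  }

  // Divide the indices 0 until numElements into numBuckets buckets; the first
  // numElements % numBuckets buckets each get one extra element.
  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    val elementsPerBucket = numElements / numBuckets
    val remaining = numElements % numBuckets
    val splitPoint = (elementsPerBucket + 1) * remaining
    if (elementsPerBucket == 0) {
      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    } else {
      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 reduce partitions over 3 buckets: 10 % 3 = 1, so the first bucket gets one extra id.
    println(equallyDivide(10, 3).map(_.mkString("[", " ", "]")).mkString(", "))
    // [0 1 2 3], [4 5 6], [7 8 9]
  }
}

Per the diff, this fan-out only happens when `statuses.length * totalSizes.length / parallelAggThreshold + 1` exceeds 1, i.e. when the number of map statuses times the number of reduce partitions crosses the configured threshold.
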
I'm fine with creating a thread pool every time, since this code path doesn't seem to run very frequently, because:
- Using a shared cached thread pool is effectively the same as creating a new pool: the idle time between calls is long enough that the threads are likely to be killed before the next call.
- Keeping a shared fixed thread pool around is simply a waste for most use cases.
- The cost of creating threads is trivial compared to the total time of a job (see the sketch below).
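
For concreteness, the per-call pattern being discussed looks roughly like this stripped-down sketch (plain Scala, no Spark; `Executors.newFixedThreadPool` and `Await.result` stand in for `ThreadUtils.newDaemonFixedThreadPool` and `ThreadUtils.awaitResult`, and the data is made up):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PerCallPoolSketch {
  def main(args: Array[String]): Unit = {
    val numReduces = 8
    val blockSizes = Array.fill(4, numReduces)(1L)   // stand-in for the map statuses' block sizes
    val totalSizes = new Array[Long](numReduces)
    val parallelism = 2

    // Build the pool for this call only, as in the diff.
    val threadPool = Executors.newFixedThreadPool(parallelism)
    try {
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
      // Each future owns a disjoint set of reduce ids, so the writes below never race.
      val buckets = (0 until numReduces).grouped(numReduces / parallelism).toSeq
      val tasks = buckets.map { reduceIds =>
        Future {
          for (s <- blockSizes; i <- reduceIds) {
            totalSizes(i) += s(i)
          }
        }
      }
      Await.result(Future.sequence(tasks), Duration.Inf)
    } finally {
      // The pool dies with the call; nothing is kept around between invocations.
      threadPool.shutdown()
    }
    println(totalSizes.mkString(", "))   // 4, 4, 4, 4, 4, 4, 4, 4
  }
}

Since `shutdown()` always runs in the `finally`, the only per-call overhead is thread creation, which is the trivial cost mentioned in the last bullet.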
---