Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152921579
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
    shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
  }
+  /**
+   * Grouped function of Range; this avoids traversing all of the Range's
+   * elements, which IterableLike's grouped function would do.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+    val start = range.start
+    val step = range.step
+    val end = range.end
+    for (i <- start.until(end, size * step)) yield {
+      i.until(i + size * step, step)
+    }
+  }
+
+  /**
+   * To equally divide n elements into m buckets, each bucket gets n/m elements;
+   * the remaining n%m elements are spread by giving one extra element to each
+   * of the first n%m buckets.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+    val elementsPerBucket = numElements / numBuckets
+    val remaining = numElements % numBuckets
+    val splitPoint = (elementsPerBucket + 1) * remaining
+    if (elementsPerBucket == 0) {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+    } else {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+    }
+  }
+
  /**
   * Return statistics about all of the outputs for a given shuffle.
   */
  def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
    shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
      val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      for (s <- statuses) {
-        for (i <- 0 until totalSizes.length) {
-          totalSizes(i) += s.getSizeForBlock(i)
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff ---
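For illustration (not part of the PR): a minimal, self-contained sketch showing what the two helpers above produce. The Divide wrapper object, the main driver, and the example inputs are invented here; the two method bodies mirror the diff. Note that equallyDivide always hands rangeGrouped a sub-range whose length is an exact multiple of the group size, so the last yielded group never runs past the end of the range.

object Divide {
  // Group a Range into sub-Ranges of `size` elements each. Only the group
  // start points are iterated, so the cost is O(numGroups) rather than the
  // O(numElements) of IterableLike's grouped.
  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    val start = range.start
    val step = range.step
    val end = range.end
    for (i <- start.until(end, size * step)) yield {
      i.until(i + size * step, step)
    }
  }

  // n/m elements per bucket; the first n%m buckets get one extra element.
  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    val elementsPerBucket = numElements / numBuckets
    val remaining = numElements % numBuckets
    val splitPoint = (elementsPerBucket + 1) * remaining
    if (elementsPerBucket == 0) {
      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    } else {
      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 elements over 3 buckets: 10 % 3 = 1, so the first bucket gets
    // 10/3 + 1 = 4 elements and the other two get 3 each.
    println(equallyDivide(10, 3).map(_.mkString("[", ",", "]")))
    // => Vector([0,1,2,3], [4,5,6], [7,8,9])

    // Fewer elements than buckets: numElements singleton buckets, so no
    // bucket is ever empty.
    println(equallyDivide(3, 5).map(_.mkString("[", ",", "]")))
    // => Vector([0], [1], [2])
  }
}

Presumably these buckets feed the parallel path in getStatistics: once mappers * partitions crosses the threshold read at the end of the diff, disjoint ranges of reduce partitions can be summed on separate threads without contention.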
Yeah, I left a comment about this before:
https://github.com/apache/spark/pull/19763#discussion_r152914613. I think it is
good enough to add more comments to the config entry.
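
As an aside, here is a sketch of what a more fully documented config entry could look like, using Spark's internal ConfigBuilder DSL; the key name, doc wording, check, and default below are illustrative rather than the PR's actual values:

private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
  ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    .internal()
    // Spell out in the doc string exactly when the parallel path kicks in.
    .doc("Multi-threaded aggregation is used in getStatistics when the number " +
      "of map outputs times the number of shuffle partitions is greater than " +
      "or equal to this threshold.")
    .intConf
    .checkValue(v => v > 0, "The threshold should be positive.")
    .createWithDefault(10000000)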
---