Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152921091
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
}
/**
 * Group a [[Range]] into consecutive sub-ranges of at most `size` elements each.
 *
 * Unlike `IterableLike.grouped`, this never traverses the elements of the range:
 * it computes each group's bounds arithmetically, so it is O(numGroups) rather
 * than O(numElements).
 *
 * @param range the range to partition; its `step` (positive or negative) is
 *              preserved in every sub-range
 * @param size  maximum number of elements per group; must be positive
 * @return consecutive sub-ranges that together cover exactly `range`; all but
 *         possibly the last contain `size` elements
 */
def rangeGrouped(range: Range, size: Int): Seq[Range] = {
  val start = range.start
  val step = range.step
  val end = range.end
  for (i <- start.until(end, size * step)) yield {
    // Clamp the group's stop bound at `end`: the unclamped `i + size * step`
    // overshoots when the range length is not an exact multiple of `size`,
    // which would put elements beyond `range` into the final group.
    val stop = if (step > 0) math.min(i + size * step, end) else math.max(i + size * step, end)
    i.until(stop, step)
  }
}
+
/**
 * Equally divide the indices `0 until numElements` into `numBuckets` buckets:
 * each bucket gets `numElements / numBuckets` elements, and the first
 * `numElements % numBuckets` buckets each get one extra element.
 *
 * Note: when `numElements < numBuckets`, only `numElements` singleton buckets
 * are returned — the trailing empty buckets are omitted.
 *
 * @param numElements total number of elements to distribute; non-negative
 * @param numBuckets  number of buckets; must be positive
 * @return index ranges whose concatenation is exactly `0 until numElements`
 */
def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
  // Guard against division by zero below and a zero-step Range inside rangeGrouped.
  require(numBuckets > 0, s"numBuckets must be positive, got $numBuckets")
  val elementsPerBucket = numElements / numBuckets
  val remaining = numElements % numBuckets
  // The first `remaining` buckets hold `elementsPerBucket + 1` elements each,
  // so indices below `splitPoint` belong to the "one extra element" buckets.
  val splitPoint = (elementsPerBucket + 1) * remaining
  if (elementsPerBucket == 0) {
    // Skip the second call: rangeGrouped with size 0 would build a zero-step Range.
    rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
  } else {
    rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
      rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
  }
}
+
/**
* Return statistics about all of the outputs for a given shuffle.
*/
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics
= {
shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
- for (s <- statuses) {
- for (i <- 0 until totalSizes.length) {
- totalSizes(i) += s.getSizeForBlock(i)
+ val parallelAggThreshold = conf.get(
+ SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --
I think that code would confuse people, and we would need more comments to
explain it, which seems not worth it.
In most cases the default value is enough, so would adding some assertions
and an explanation in the docs be good enough?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]