[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/19763
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152921579

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
     shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+    val start = range.start
+    val step = range.step
+    val end = range.end
+    for (i <- start.until(end, size * step)) yield {
+      i.until(i + size * step, step)
+    }
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+    val elementsPerBucket = numElements / numBuckets
+    val remaining = numElements % numBuckets
+    val splitPoint = (elementsPerBucket + 1) * remaining
+    if (elementsPerBucket == 0) {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+    } else {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+    }
+  }
+
   /**
    * Return statistics about all of the outputs for a given shuffle.
    */
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      for (s <- statuses) {
-        for (i <- 0 until totalSizes.length) {
-          totalSizes(i) += s.getSizeForBlock(i)
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

Yeah, I left that comment before https://github.com/apache/spark/pull/19763#discussion_r152914613. I think it is good enough to add more comments to the config entry.
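For readers who want to try the dividing logic by itself, here is a self-contained copy of the two helpers quoted above plus a small driver; the object name, the driver, and the printed output are illustrative additions, not part of the PR:

```scala
// Minimal standalone copy of the two helpers from the PR, for experimentation.
object EquallyDivideDemo {
  // Split a Range into sub-Ranges of `size` elements without traversing each element.
  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    val start = range.start
    val step = range.step
    val end = range.end
    for (i <- start.until(end, size * step)) yield {
      i.until(i + size * step, step)
    }
  }

  // Divide n elements into m buckets; the first n % m buckets get one extra element.
  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    val elementsPerBucket = numElements / numBuckets
    val remaining = numElements % numBuckets
    val splitPoint = (elementsPerBucket + 1) * remaining
    if (elementsPerBucket == 0) {
      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    } else {
      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 reduce partitions over 3 threads: the first bucket gets the extra element.
    println(equallyDivide(10, 3).map(_.mkString("[", ",", "]")))
    // prints something like: Vector([0,1,2,3], [4,5,6], [7,8,9])
  }
}
```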
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152921091

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

I think that code will confuse people, and we would need more comments to explain it, which seems not worth it. In most cases the default value is enough, so would it be good to just add an assertion and explain it in the docs?
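A sketch of what that assertion could look like on the config entry, assuming `ConfigBuilder.checkValue` is used for it; the message text and the 10^7 default (mentioned elsewhere in the thread) are illustrative:

```scala
// Hypothetical shape of the config entry with a validity assertion; not taken from the PR.
private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
  ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    .internal()
    .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
      "or equal to this threshold. Note that the actual parallelism is calculated from this " +
      "threshold, so it must be positive.")
    .intConf
    .checkValue(v => v > 0, "The threshold should be positive.")
    .createWithDefault(10000000)
```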
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152920483

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Yeah, I will add some.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152914684

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

For a zero or negative threshold, see my comment above: https://github.com/apache/spark/pull/19763#discussion_r152914613.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152914613

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

After rethinking this, I think it is better to indicate that this threshold also determines the number of threads used for the parallel aggregation, so it should not be set to zero or a negative number.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152913671

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

Maybe a little picky, but should we do:

```scala
val parallelAggThreshold = conf.get(
  SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD) + 1
...
val parallelism = math.min(
  Runtime.getRuntime.availableProcessors(),
  (statuses.length.toLong * totalSizes.length + 1) / parallelAggThreshold + 1).toInt
```

in case the threshold is set to zero?
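As a quick sanity check of that suggestion, a standalone sketch of the guarded formula; the wrapper function and its parameter names are hypothetical:

```scala
object ThresholdGuardDemo {
  // Mirrors the snippet above: adding 1 to the configured value keeps the divisor nonzero.
  def parallelism(numMaps: Long, numPartitions: Long, configured: Int): Int = {
    val parallelAggThreshold = configured + 1  // guard: never zero
    math.min(
      Runtime.getRuntime.availableProcessors(),
      (numMaps * numPartitions + 1) / parallelAggThreshold + 1).toInt
  }

  def main(args: Array[String]): Unit = {
    // The unguarded formula would divide by zero for configured = 0.
    println(parallelism(1000, 1000, 0))        // capped by availableProcessors
    println(parallelism(1000, 1000, 10000000)) // 1: stays single-threaded
  }
}
```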
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152912720

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

I don't think we need to describe the calculation in the config description. The current one is enough.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152912363

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

It's ok. I misread the equation. Nvm.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152912084

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Do you think it's necessary to describe here how the actual parallelism is calculated?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152911936

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Oh, I see... nvm. I misread the `+ 1`...
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152911829

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Say `statuses.length.toLong * totalSizes.length` is `1001`, for example:

```scala
scala> 1001 / 1001
res0: Int = 1
```

Now it is more than the threshold, but the parallel aggregation is not enabled...
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152911325

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

`statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1 >= 2` is equivalent to `statuses.length.toLong * totalSizes.length >= parallelAggThreshold`, so the product doesn't need to be two times the threshold; anything not smaller than 1x is enough.
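Concretely, plugging a few products into the integer formula with a threshold of 1000 (REPL output written out by hand, so treat it as illustrative):

```scala
scala> val threshold = 1000L
threshold: Long = 1000

scala> (999L / threshold + 1, 1000L / threshold + 1, 2000L / threshold + 1)
res0: (Long, Long, Long) = (1,2,3)
```

A product of 999 stays single-threaded, while 1000 (exactly the threshold) already yields a parallelism of 2; nothing requires twice the threshold.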
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152908363

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

From `statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1` above, it looks like we need at least two times this threshold to enable the parallel aggregation?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152907606

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Looks like the parallel aggregation is enabled only when `parallelism >= 2`. Is that equivalent to `the number of mappers * shuffle partitions >= this threshold`?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152907079

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Sorry, but I didn't get you.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152906960

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
+      if (parallelism <= 1) {
+        for (s <- statuses) {
+          for (i <- 0 until totalSizes.length) {
+            totalSizes(i) += s.getSizeForBlock(i)
+          }
+        }
+      } else {
+        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
--- End diff --

I think we don't need to fully utilize all available processors. `parallelAggThreshold` defaults to 10^7, which corresponds to a relatively small task, so in most cases the tasks don't need to be cut smaller. For the cases where a split is still a big task, `parallelAggThreshold` should be tuned. This is not very direct, because there is no `xx.parallelism` config to set, but the benefit is that we introduce fewer configs.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152896879

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
...
+        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
--- End diff --

The value of `parallelism` seems to keep us from fully utilizing all processors at all times? E.g. if `availableProcessors` returns 8 but `parallelism` is 2, we pick 2 as the number of threads.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152896399

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
...
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Is this condition for enabling the parallel aggregation still true?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152888380

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      for (s <- statuses) {
-        for (i <- 0 until totalSizes.length) {
-          totalSizes(i) += s.getSizeForBlock(i)
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length * totalSizes.length / parallelAggThreshold + 1)
+      if (parallelism <= 1) {
+        for (s <- statuses) {
+          for (i <- 0 until totalSizes.length) {
+            totalSizes(i) += s.getSizeForBlock(i)
+          }
+        }
+      } else {
+        try {
+          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
+          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
+            reduceIds => Future {
+              for (s <- statuses; i <- reduceIds) {
+                totalSizes(i) += s.getSizeForBlock(i)
+              }
+            }
+          }
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

@cloud-fan Re: `We can shut down the pool after some certain idle time, but not sure if it's worth the complexity`: I know we don't need to do this now, but if we did it, how would we do it?
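For what it's worth, one way to get idle-time shutdown without explicit `shutdown()` calls is to let the pool's own threads time out; a sketch using the JDK executor directly, with illustrative sizes and timeout (this is not what the PR does):

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object IdleTimeoutPool {
  // A pool whose threads exit on their own after 60s of inactivity, so no explicit
  // shutdown call is needed; the sizes and the timeout are illustrative.
  val pool: ThreadPoolExecutor = new ThreadPoolExecutor(
    8, 8,                           // core and max pool size
    60L, TimeUnit.SECONDS,          // idle threads are reclaimed after 60 seconds
    new LinkedBlockingQueue[Runnable]())
  pool.allowCoreThreadTimeOut(true) // allow even core threads to time out
}
```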
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152888257

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

My fault!
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152870781

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

ah good catch! I misread it...
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152863418

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

@gczsjdy Oh, sorry. I didn't realize there is already a `threadpool` field in `MapOutputTrackerMaster`; that's why there was no error. Here you are shutting down the wrong thread pool.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152827467

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

@zsxwing Actually I built using sbt/mvn, and there were no errors...
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152702214

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length * totalSizes.length / parallelAggThreshold + 1)
--- End diff --

`statuses.length.toLong`. It's easy to overflow here.
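A quick REPL illustration of the silent `Int` overflow being guarded against (output written out by hand, so treat it as illustrative):

```scala
scala> 100000 * 100000             // Int arithmetic wraps around silently
res0: Int = 1410065408

scala> 100000L * 100000            // widening one operand to Long avoids it
res1: Long = 10000000000
```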
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152693924

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

@gczsjdy could you fix the compile error?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152693851

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

I'm fine with creating a thread pool every time, since this code path does not seem to run very frequently, because:
- Using a shared cached thread pool is just like creating a new thread pool, since the idle time between calls is pretty long and a thread is likely killed before the next call.
- Using a shared fixed thread pool is a total waste for most use cases.
- The cost of creating threads is trivial compared to the total time of a job.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152496569

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

We could shut down the pool after a certain idle time, but I'm not sure it's worth the complexity.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152493779

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

I agree with you. With the thread pool kept in the class, the only loss is that the pool still exists even when the single-threaded path is used. The gain is that we avoid creating the pool again after every shuffle.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19763#discussion_r152193763

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
...
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

cc @zsxwing Do we really need to shut down the thread pool every time? This method may be called many times; is it better to cache this thread pool, like the dispatcher thread pool?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152185531 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- Actually there are 3 confs like that... do we need all of them? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152084018 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * To equally divide n elements into m buckets, basically each bucket should have n/m elements, + * for the remaining n%m elements, add one more element to the first n%m buckets each. + */ + def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = { +val elementsPerBucket = numElements / numBuckets +val remaining = numElements % numBuckets +if (remaining == 0) { + 0.until(numElements).grouped(elementsPerBucket) +} else { + val splitPoint = (elementsPerBucket + 1) * remaining + 0.to(splitPoint).grouped(elementsPerBucket + 1) ++ +(splitPoint + 1).until(numElements).grouped(elementsPerBucket) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + val parallelAggThreshold = conf.get( +SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD) + if (statuses.length * totalSizes.length < parallelAggThreshold) { +for (s <- statuses) { + for (i <- 0 until totalSizes.length) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } else { +val parallelism = conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM) +val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-statistics") --- End diff -- please put `threadPool.shutdown` in `finally` to shut down the thread pool --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
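The requested shape, sketched end to end under the surrounding diff's names (`parallelism`, `statuses`, `totalSizes`, `equallyDivide`); this is a sketch, not the committed patch. The pool has to be declared before the `try` so the reference is still in scope in the `finally`; in the later revision quoted further up the thread it is declared inside the `try`, so the `threadpool.shutdown()` in its `finally` would not even compile:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.util.ThreadUtils

val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
try {
  implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
  val tasks = equallyDivide(totalSizes.length, parallelism).toSeq.map { reduceIds =>
    Future {
      for (s <- statuses; i <- reduceIds) {
        totalSizes(i) += s.getSizeForBlock(i)
      }
    }
  }
  // Wait for every slice to finish before reading totalSizes.
  ThreadUtils.awaitResult(Future.sequence(tasks), Duration.Inf)
} finally {
  // Runs whether awaitResult succeeds or throws, so the pool never leaks.
  threadPool.shutdown()
}
```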
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152084974 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * To equally divide n elements into m buckets, basically each bucket should have n/m elements, + * for the remaining n%m elements, add one more element to the first n%m buckets each. + */ + def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = { +val elementsPerBucket = numElements / numBuckets +val remaining = numElements % numBuckets +if (remaining == 0) { + 0.until(numElements).grouped(elementsPerBucket) +} else { + val splitPoint = (elementsPerBucket + 1) * remaining + 0.to(splitPoint).grouped(elementsPerBucket + 1) ++ +(splitPoint + 1).until(numElements).grouped(elementsPerBucket) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + val parallelAggThreshold = conf.get( +SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD) + if (statuses.length * totalSizes.length < parallelAggThreshold) { +for (s <- statuses) { + for (i <- 0 until totalSizes.length) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } else { +val parallelism = conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM) --- End diff -- How about setting `parallelism = math.min(Runtime.getRuntime.availableProcessors(), statuses.length.toLong * totalSizes.length / parallelAggThreshold)` rather than introducing a new config, such as: ``` val parallelism = math.min( Runtime.getRuntime.availableProcessors(), statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1) if (parallelism <= 1) { ... } else { } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
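A quick illustration of how the proposed formula behaves (the threshold value below is an assumption; the thread later converges on 10^7):

```scala
// parallelism = min(#cores, #mappers * #partitions / threshold + 1); the + 1
// sends small shuffles down the serial path (parallelism == 1).
val parallelAggThreshold = 10000000L

def parallelismFor(numMappers: Int, numPartitions: Int, cores: Int): Int =
  math.min(
    cores.toLong,
    numMappers.toLong * numPartitions / parallelAggThreshold + 1).toInt

parallelismFor(100, 100, 8)     // 10^4 size lookups -> 1, serial path
parallelismFor(2000, 10000, 8)  // 2 * 10^7          -> 3 threads
parallelismFor(10000, 50000, 8) // 5 * 10^8          -> 8, capped by cores
```

The `.toLong` is load-bearing: `#mappers * #partitions` can overflow `Int` for large shuffles.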
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152087573 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * To equally divide n elements into m buckets, basically each bucket should have n/m elements, + * for the remaining n%m elements, add one more element to the first n%m buckets each. + */ + def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = { +val elementsPerBucket = numElements / numBuckets +val remaining = numElements % numBuckets +if (remaining == 0) { + 0.until(numElements).grouped(elementsPerBucket) +} else { + val splitPoint = (elementsPerBucket + 1) * remaining + 0.to(splitPoint).grouped(elementsPerBucket + 1) ++ --- End diff -- `grouped` is expensive here. I saw it generates Vector rather than `Range`: ``` scala> (1 to 100).grouped(10).foreach(g => println(g.getClass)) class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector class scala.collection.immutable.Vector ``` It means we need to generate all of the numbers between 0 and `numElements`. Could you implement a special `grouped` for Range instead? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
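One way to write such a Range-aware `grouped`: each slice is itself a `Range`, so only start/end/step triples are created and the individual indices are never materialized. A minimal sketch assuming an ascending range; the clamp on the last slice can be dropped by callers that always divide evenly, as `equallyDivide` does:

```scala
// Slices `range` into Ranges of at most `size` elements each.
def rangeGrouped(range: Range, size: Int): Seq[Range] = {
  require(size > 0, "size must be positive")
  val step = range.step
  for (i <- range.start.until(range.end, size * step)) yield {
    i.until(math.min(i + size * step, range.end), step)
  }
}

rangeGrouped(0 until 10, 3) // slices: 0 until 3, 3 until 6, 6 until 9, 9 until 10
```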
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152035574 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- yea let's add it. BTW shall we also use `mapOutput` instead of `mapOutputStatistics`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152022126 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) --- End diff -- my proposal ``` def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = { val elementsPerBucket = numElements / numBuckets val remaining = numElements % numBuckets if (remaining == 0) { 0.until(numElements).grouped(elementsPerBucket) } else { val splitPoint = (elementsPerBucket + 1) * remaining 0.until(splitPoint).grouped(elementsPerBucket + 1) ++ splitPoint.until(numElements).grouped(elementsPerBucket) } } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
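For a concrete feel of the bucketing: dividing 10 elements into 3 buckets gives 10 % 3 = 1, so only the first bucket gets an extra element. An illustrative REPL session (not from the thread):

```scala
scala> equallyDivide(10, 3).toList
res0: List[Seq[Int]] = List(Vector(0, 1, 2, 3), Vector(4, 5, 6), Vector(7, 8, 9))
```

Note the `Vector`s: this is exactly the materialization cost zsxwing flags above, which a Range-based `grouped` avoids.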
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152021708 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) --- End diff -- Sure : ) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152020601 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) --- End diff -- can you add some comment to describe the algorithm? I'd expect something like: ``` to equally divide n elements to m buckets each bucket should have n/m elements for the remaining n%m elements pick the first n%m buckets and add one more element ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152017736 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- There is also a `spark.shuffle.mapOutput.dispatcher.numThreads` in this file without a config entry; do I need to add one? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152016632 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + if (statuses.length * totalSizes.length <= + conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)) { --- End diff -- nit: ``` val parallelAggThreshold = ... if (statuses.length * totalSizes.length < parallelAggThreshold) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152016336 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,20 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = + ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold") + .internal() + .doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " + +"threshold.") + .intConf + .createWithDefault(1000) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_CORES = +ConfigBuilder("spark.shuffle.mapOutputStatistics.cores") --- End diff -- nit: `cores` -> `parallelism` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152016240 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + if (statuses.length * totalSizes.length <= +conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD)) { +for (s <- statuses) { + for (i <- 0 until totalSizes.length) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } else { +val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8) --- End diff -- I thought only adaptive execution code would call this. But actually it seems this will be called after all `ShuffleMapTask`s of a stage complete, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152008181 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- `spark.adaptive.map.statistics.cores` needs a config entry, but I thought adaptive.xxx items had already been put under `spark.sql.`, so it might be inconsistent. Now I think it's no big deal. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152007204 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + if (statuses.length * totalSizes.length <= +conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD)) { +for (s <- statuses) { + for (i <- 0 until totalSizes.length) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } else { +val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8) --- End diff -- how is this related to `adaptive`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152007061 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- I don't get it. You showed me that the `spark.sql.adaptive.xxx` configs have config entries, so why doesn't `spark.adaptive.map.statistics.cores` need one? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152006310 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- I think that's not a big problem; adaptive execution needs both core and sql code, so both confs are needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152005905 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- Like https://github.com/gczsjdy/spark/blob/11b60af737a04d931356aa74ebf3c6cf4a6b08d6/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L204-L204 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152004301 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- Really? I grepped the code base but can't find it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152002860 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = + ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold") + .internal() + .doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " +"threshold") + .intConf + .createWithDefault(100000000) --- End diff -- Now I also think it's a little bit large... In the case I mentioned, the 5s gap appeared with this value at about 10^8. Maybe 10^7 is a good default? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
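Back-of-the-envelope, using the numbers in this comment: 5 s over roughly 10^8 `getSizeForBlock` calls is about 50 ns per call, so a threshold of 10^7 would cap the worst-case serial aggregation at roughly 0.5 s before the parallel path takes over (rough figures, assuming the cost scales linearly).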
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152002262 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- `spark.sql.adaptive.xxx` already exists; will this be a problem? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151962620 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = + ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold") + .internal() + .doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " +"threshold") + .intConf + .createWithDefault(100000000) --- End diff -- wow 100 million is really a large threshold, how did you pick this number? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151962448 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD = --- End diff -- `spark.adaptive.map.statistics.cores` should also be a config entry like this --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151921740 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD = +ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold") --- End diff -- Yes, it's better! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151901370 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -485,4 +485,13 @@ package object config { "array in the sorter.") .intConf .createWithDefault(Integer.MAX_VALUE) + + private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD = +ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold") --- End diff -- `spark.shuffle.mapOutputStatistics.parallelAggregationThreshold`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151801786 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,17 +474,36 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8) + + val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map { --- End diff -- Doing this is not cheap. I would add a config and only run this in multiple threads when `#mapper * #shuffle_partitions` is large. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151801570 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster( } /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) -} + val mapStatusSubmitTasks = ArrayBuffer[Future[_]]() + var taskSlices = parallelism + + equallyDivide(totalSizes.length, taskSlices).foreach { +reduceIds => + mapStatusSubmitTasks += threadPoolMapStats.submit( +new Runnable { + override def run(): Unit = { +for (s <- statuses; i <- reduceIds) { + totalSizes(i) += s.getSizeForBlock(i) +} + } +} + ) } + mapStatusSubmitTasks.foreach(_.get()) --- End diff -- Don't use `scala.concurrent.ExecutionContext.Implicits.global`. You need to create a thread pool. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
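Context for this advice: `ExecutionContext.Implicits.global` is a single process-wide pool, sized to the number of cores and shared with any other library code that uses it, and application code cannot shut it down or resize it per use. Saturating it with CPU-bound aggregation work can therefore starve unrelated tasks, which is why a dedicated, explicitly managed pool is the right tool here.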
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151339166 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster( } /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) -} + val mapStatusSubmitTasks = ArrayBuffer[Future[_]]() + var taskSlices = parallelism + + equallyDivide(totalSizes.length, taskSlices).foreach { +reduceIds => + mapStatusSubmitTasks += threadPoolMapStats.submit( +new Runnable { + override def run(): Unit = { +for (s <- statuses; i <- reduceIds) { + totalSizes(i) += s.getSizeForBlock(i) +} + } +} + ) } + mapStatusSubmitTasks.foreach(_.get()) --- End diff -- Should I use the `scala.concurrent.ExecutionContext.Implicits.global` ExecutionContext? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151332369 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster( } /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) -} + val mapStatusSubmitTasks = ArrayBuffer[Future[_]]() + var taskSlices = parallelism + + equallyDivide(totalSizes.length, taskSlices).foreach { +reduceIds => + mapStatusSubmitTasks += threadPoolMapStats.submit( +new Runnable { + override def run(): Unit = { +for (s <- statuses; i <- reduceIds) { + totalSizes(i) += s.getSizeForBlock(i) +} + } +} + ) } + mapStatusSubmitTasks.foreach(_.get()) --- End diff -- Good idea, thx! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151331447 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster( } /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) -} + val mapStatusSubmitTasks = ArrayBuffer[Future[_]]() + var taskSlices = parallelism --- End diff -- Why `var`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user CodingCat commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r151322438 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster( } /** + * Try to equally divide Range(0, num) to divisor slices + */ + def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = { +assert(divisor > 0, "Divisor should be positive") +val (each, remain) = (num / divisor, num % divisor) +val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each) +if (each != 0) { + smaller.grouped(each) ++ bigger.grouped(each + 1) +} else { + bigger.grouped(each + 1) +} + } + + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) -} + val mapStatusSubmitTasks = ArrayBuffer[Future[_]]() + var taskSlices = parallelism + + equallyDivide(totalSizes.length, taskSlices).foreach { +reduceIds => + mapStatusSubmitTasks += threadPoolMapStats.submit( +new Runnable { + override def run(): Unit = { +for (s <- statuses; i <- reduceIds) { + totalSizes(i) += s.getSizeForBlock(i) +} + } +} + ) } + mapStatusSubmitTasks.foreach(_.get()) --- End diff -- this part can be simplified by using scala's Future, ```scala val futureArray = equallyDivide(totalSizes.length, taskSlices).map { reduceIds => Future { // whatever you want to do here } } Await.result(Future.sequence(futureArray), Duration.Inf) // or some timeout value you prefer ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
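Filled in with imports and an explicit execution context, that sketch could look as follows (names such as `taskSlices`, `statuses`, and `totalSizes` come from the surrounding diff; per the review comments above, the context is a dedicated pool rather than `Implicits.global`):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.util.ThreadUtils

implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(
  ThreadUtils.newDaemonFixedThreadPool(taskSlices, "map-output-statistics"))

// One Future per slice of reduce ids; the slices are disjoint, so every
// totalSizes(i) is written by exactly one future and needs no locking.
val futureArray = equallyDivide(totalSizes.length, taskSlices).map { reduceIds =>
  Future {
    for (s <- statuses; i <- reduceIds) {
      totalSizes(i) += s.getSizeForBlock(i)
    }
  }
}.toSeq

Await.result(Future.sequence(futureArray), Duration.Inf)
```

The unsynchronized `+=` is safe precisely because `equallyDivide` hands each future its own set of indices.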