[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19763


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152921579
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

Yeah, I left a comment about this earlier:
https://github.com/apache/spark/pull/19763#discussion_r152914613. I think it is
good enough to add more explanation to the config entry.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152921091
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

I think that code would confuse people, and we would need more comments to
explain it, which doesn't seem worth it.
In most cases the default value is enough, so would it be good to just add an
assertion and explain it in the docs?
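
A minimal sketch of what that could look like, assuming the `checkValue` helper on the
typed config builder; the doc wording and the 10^7 default (mentioned later in this
thread) are illustrative rather than the PR's final code:

```scala
// Hedged sketch, not the PR's code: guard the entry so a zero or negative
// threshold is rejected at configuration time.
private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
  ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    .internal()
    .doc("Multi-thread is used when the number of mappers * shuffle partitions is " +
      "greater than or equal to this threshold. Note that the actual parallelism is " +
      "also derived from this value, so it must be positive.")
    .intConf
    .checkValue(v => v > 0, "The threshold should be positive.")
    .createWithDefault(10000000)
```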


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152920483
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Yeah, I will add some.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152914684
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

For a zero or negative threshold, see my comment above:
https://github.com/apache/spark/pull/19763#discussion_r152914613.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152914613
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

After rethinking this, I think it is better to indicate that this threshold also
determines the number of threads used for the parallel aggregation, so it should not
be set to zero or a negative number.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152913671
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

Maybe a little picky, but should we do:
```scala
val parallelAggThreshold = conf.get(
  SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD) + 1
...
val parallelism = math.min(
  Runtime.getRuntime.availableProcessors(),
  (statuses.length.toLong * totalSizes.length + 1) / parallelAggThreshold + 1).toInt
```

in case the threshold is set to zero?
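
A quick illustration of that edge case, with hypothetical values (not part of the PR):

```scala
// Hypothetical values, only to show why the "+ 1" guard matters.
val mappers = 1000L      // statuses.length.toLong
val partitions = 200     // totalSizes.length
val threshold = 0        // a (mis)configured threshold

// Original formula: mappers * partitions / threshold + 1 throws ArithmeticException.
// Guarded variant: shifting the threshold by one keeps the division well defined.
val parallelism = math.min(
  Runtime.getRuntime.availableProcessors(),
  (mappers * partitions + 1) / (threshold + 1) + 1).toInt
```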


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152912720
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

I don't think we need to describe the calculation in the config description. The
current one is enough.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152912363
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

It's ok. I misread the equation. Nvm.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152912084
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Do you think it's necessary to describe here how the actual parallelism is
calculated?


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152911936
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Oh, I see...nvm. I misread the `+ 1`...


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152911829
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Say `statuses.length.toLong * totalSizes.length` is `1001` and the threshold is also
`1001`, for example:

```scala
scala> 1001 / 1001
res0: Int = 1
```

Now the product is not less than the threshold, but the parallel aggregation is not
enabled...



---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152911325
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

`statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1 >= 2` is
equivalent to `statuses.length.toLong * totalSizes.length >= parallelAggThreshold`, so
the product doesn't need to be twice the threshold; not being smaller than the
threshold is enough.
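
A small sanity check of that equivalence, using the same integer division (the
threshold value here is just the default mentioned in this thread):

```scala
// parallelism >= 2  <=>  product / threshold + 1 >= 2  <=>  product >= threshold
val threshold = 10000000L
Seq(threshold - 1, threshold, threshold + 1).foreach { product =>
  val parallelism = product / threshold + 1
  println(s"product=$product parallelism=$parallelism enabled=${parallelism >= 2}")
}
// product=9999999  -> parallelism=1, enabled=false
// product=10000000 -> parallelism=2, enabled=true
// product=10000001 -> parallelism=2, enabled=true
```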


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152908363
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

From the expression above, `statuses.length.toLong * totalSizes.length /
parallelAggThreshold + 1`, it looks like we need at least two times this threshold to
enable the parallel aggregation?


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152907606
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Looks like the parallel aggregation is enabled only when `parallelism >= 2`. Is that
equivalent to `the number of mappers * shuffle partitions >= this threshold`?


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152907079
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Sorry, but I didn't get you.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152906960
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length.toLong * totalSizes.length / parallelAggThreshold 
+ 1).toInt
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, 
"map-output-aggregate")
--- End diff --

I don't think we need to fully utilize all available processors. `parallelAggThreshold`
defaults to 10^7, which corresponds to a relatively small task per thread, so the tasks
don't need to be cut any smaller in most cases.
For the cases where each split is still a big task, `parallelAggThreshold` should be
tuned. This is not very direct, because there is no `xx.parallelism` config to set, but
the benefit is that we introduce fewer configs.
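
For a feel of the numbers, a hedged sketch with hypothetical shuffle sizes and an
assumed 8-core driver (the 10^7 threshold is the default discussed here):

```scala
// Hypothetical shuffle sizes; not measurements from the PR.
val threshold = 10000000L
val cores = 8  // assumed availableProcessors() result
Seq((1000, 200), (50000, 1000), (100000, 5000)).foreach { case (mappers, partitions) =>
  val parallelism = math.min(cores.toLong, mappers.toLong * partitions / threshold + 1).toInt
  println(s"mappers=$mappers partitions=$partitions -> parallelism=$parallelism")
}
// 1000 * 200    = 2e5 -> parallelism = 1 (single-threaded path)
// 50000 * 1000  = 5e7 -> parallelism = 6
// 100000 * 5000 = 5e8 -> parallelism = 8 (capped by the assumed core count)
```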


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152896879
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length.toLong * totalSizes.length / parallelAggThreshold 
+ 1).toInt
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, 
"map-output-aggregate")
--- End diff --

The value of `parallelism` seems to mean we don't fully utilize all processors all the
time? E.g., if `availableProcessors` returns 8 but `parallelism` is 2, we pick 2 as the
number of threads.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152896399
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Is this condition to enable parallel aggregation still true?


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152888380
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

@cloud-fan Regarding `We can shut down the pool after some certain idle time, but not
sure if it's worth the complexity` - I know we don't need to do this now, but if we
did, how would we do it?
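
Not needed for this PR, but one hedged way to do it would be to keep the pool as a
long-lived field and let idle threads expire, for example (the pool size and 60-second
keep-alive are arbitrary, and a daemon `ThreadFactory` would still be wanted in
practice):

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

// Sketch: a reusable pool whose threads are reclaimed after 60s of idleness,
// so it never has to be shut down between getStatistics calls.
val aggregatePool: ThreadPoolExecutor = {
  val pool = new ThreadPoolExecutor(
    Runtime.getRuntime.availableProcessors(),  // core pool size
    Runtime.getRuntime.availableProcessors(),  // max pool size
    60L, TimeUnit.SECONDS,                     // idle keep-alive
    new LinkedBlockingQueue[Runnable]())
  pool.allowCoreThreadTimeOut(true)            // let even core threads expire when idle
  pool
}
```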


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152888257
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

My fault!


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152870781
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

ah good catch! I misread it...


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152863418
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

@gczsjdy Oh, sorry. I didn't realize there is already a `threadpool` field in
`MapOutputTrackerMaster`. That's why there is no compile error. Here you are shutting
down the wrong thread pool.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152827467
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

@zsxwing Actually I built it with sbt/mvn and got no errors...


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152702214
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
--- End diff --

This should be `statuses.length.toLong`; it's easy to overflow here.
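
A quick illustration, with values chosen only to show the overflow:

```scala
// Both factors fit comfortably in an Int, but their product does not.
val mappers = 100000          // statuses.length
val partitions = 30000        // totalSizes.length
println(mappers * partitions)          // -1294967296, Int overflow
println(mappers.toLong * partitions)   // 3000000000, correct with the .toLong cast
```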


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152693924
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

@gczsjdy could you fix the compile error?


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152693851
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

I'm fine with creating a thread pool every time, since this code path doesn't seem to
run very frequently, because:
- Using a shared cached thread pool is effectively the same as creating a new pool each
time, since the idle gap between calls is long and the threads are likely killed before
the next call.
- Using a shared fixed thread pool is a total waste for most use cases.
- The cost of creating threads is trivial compared to the total time of a job.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152496569
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

We can shut down the pool after a certain idle time, but I'm not sure it's worth the
complexity.


---




[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-22 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152493779
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

I agree with you. With the thread pool kept in the class, the only loss is that 
the pool still exists even when single-threaded aggregation is used. The gain 
is that we avoid creating a new pool after every shuffle.
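
For reference, a rough sketch of that option (hypothetical names, assuming the pool lives in the tracker class): making it a `lazy val` means single-threaded aggregations never allocate it, while repeated parallel aggregations reuse one pool.

```scala
import java.util.concurrent.{ExecutorService, Executors}

// Hypothetical sketch: one pool per tracker instance, created on first use.
class TrackerLikeExample {
  lazy val aggPool: ExecutorService =
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())

  def stop(): Unit = aggPool.shutdown()  // note: forces creation if never used
}
```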


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152193763
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

cc @zsxwing do we really need to shut down the thread pool every time? This 
method may be called many times; would it be better to cache this thread pool, 
like the dispatcher thread pool?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152185531
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

Actually there are 3 confs like that... do we need all of them?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152084018
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] 
= {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+if (remaining == 0) {
+  0.until(numElements).grouped(elementsPerBucket)
+} else {
+  val splitPoint = (elementsPerBucket + 1) * remaining
+  0.to(splitPoint).grouped(elementsPerBucket + 1) ++
+(splitPoint + 1).until(numElements).grouped(elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)
+  if (statuses.length * totalSizes.length < parallelAggThreshold) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val parallelism = 
conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM)
+val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, 
"map-output-statistics")
--- End diff --

please put `threadPool.shutdown` in `finally` to shut down the thread pool 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152084974
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] 
= {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+if (remaining == 0) {
+  0.until(numElements).grouped(elementsPerBucket)
+} else {
+  val splitPoint = (elementsPerBucket + 1) * remaining
+  0.to(splitPoint).grouped(elementsPerBucket + 1) ++
+(splitPoint + 1).until(numElements).grouped(elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)
+  if (statuses.length * totalSizes.length < parallelAggThreshold) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val parallelism = 
conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM)
--- End diff --

How about setting  `parallelism =  
math.min(Runtime.getRuntime.availableProcessors(), statuses.length.toLong * 
totalSizes.length / parallelAggThreshold)` rather than introducing a new 
config, such as:
```
val parallelism = math.min(
  Runtime.getRuntime.availableProcessors(),
  statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1)
if (parallelism <= 1) {
  ...
} else {

}
```
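
As a concrete illustration of the formula (assuming a threshold of 10^7): 5000 map statuses and 4000 shuffle partitions give 2 * 10^7 size entries, so `parallelism = min(availableProcessors, 2 * 10^7 / 10^7 + 1) = min(cores, 3)`, while anything below the threshold yields `parallelism = 1` and keeps the single-threaded loop.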




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152087573
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] 
= {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+if (remaining == 0) {
+  0.until(numElements).grouped(elementsPerBucket)
+} else {
+  val splitPoint = (elementsPerBucket + 1) * remaining
+  0.to(splitPoint).grouped(elementsPerBucket + 1) ++
--- End diff --

`grouped` is expensive here. I saw it generates Vector rather than `Range`:
```
scala> (1 to 100).grouped(10).foreach(g => println(g.getClass))
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
```
It means we need to generate all of the numbers between 0 and `numElements`. 
Could you implement a special `grouped` for `Range` instead?
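
A quick illustrative check (not the PR's exact code) that building sub-`Range`s directly avoids materializing a `Vector` per group:

```scala
// Groups of 10 over 0 until 100, each kept as a lazy Range.
val groups = for (start <- 0 until 100 by 10) yield start until math.min(start + 10, 100)
groups.foreach(g => println(g.getClass))  // prints a Range subclass, not Vector
```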


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152035574
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

yea let's add it. BTW shall we also use `mapOutput` instead of 
`mapOutputStatistics`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152022126
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
--- End diff --

my proposal
```
def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = {
  val elementsPerBucket = numElements / numBuckets
  val remaining = numElements % numBuckets
  if (remaining == 0) {
    0.until(numElements).grouped(elementsPerBucket)
  } else {
    val splitPoint = (elementsPerBucket + 1) * remaining
    0.to(splitPoint).grouped(elementsPerBucket + 1) ++
      (splitPoint + 1).until(numElements).grouped(elementsPerBucket)
  }
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152021708
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
--- End diff --

Sure : )


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152020601
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
--- End diff --

can you add some comment to describe the algorithm? I'd expect something 
like:
```
to equally divide n elements into m buckets
each bucket should have n/m elements
for the remaining n%m elements
pick the first n%m buckets and add one more element to each
```
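
For instance, dividing 10 elements into 3 buckets gives n/m = 3 and n%m = 1, so the first bucket gets one extra element: (0, 1, 2, 3), (4, 5, 6), (7, 8, 9).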


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152017736
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

There is also a `spark.shuffle.mapOutput.dispatcher.numThreads` in this 
file without a config entry; do I need to add one?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152016632
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  if (statuses.length * totalSizes.length <=
+
conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)) {
--- End diff --

nit:
```
val parallelAggThreshold = ...
if (statuses.length * totalSizes.length < parallelAggThreshold)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152016336
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,20 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
+
ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions exceeds this " +
+"threshold.")
+  .intConf
+  .createWithDefault(1000)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_CORES =
+ConfigBuilder("spark.shuffle.mapOutputStatistics.cores")
--- End diff --

nit: `cores` -> `parallelism`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152016240
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  if (statuses.length * totalSizes.length <=
+conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD)) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val parallelism = 
conf.getInt("spark.adaptive.map.statistics.cores", 8)
--- End diff --

I thought only adaptive execution code would call this. But actually it 
seems this is called after all `ShuffleMapTask`s of a stage have completed, 
right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152008181
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

`spark.adaptive.map.statistics.cores` needs a config entry, but I thought 
the adaptive.xxx items were already put under `spark.sql.`, so it might be 
inconsistent. Now I think it's no big deal.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152007204
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  if (statuses.length * totalSizes.length <=
+conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD)) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val parallelism = 
conf.getInt("spark.adaptive.map.statistics.cores", 8)
--- End diff --

how is this related to `adaptive`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152007061
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

I don't get it. You showed me that the `spark.sql.adaptive.xxx` configs have 
config entries, so why doesn't `spark.adaptive.map.statistics.cores` need a config entry?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152006310
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

I think that's not a big problem; adaptive execution needs both core and SQL 
code, so both confs are needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152005905
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

Like 
https://github.com/gczsjdy/spark/blob/11b60af737a04d931356aa74ebf3c6cf4a6b08d6/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L204-L204
 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152004301
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

Really? I grepped the code base but can't find it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152002860
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
+
ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions exceeds this " +
+"threshold")
+  .intConf
+  .createWithDefault(1)
--- End diff --

Now I also think it's a little bit large... In the case I mentioned, the 5s 
gap was produced when this value was 10^8. Maybe 10^7 is good?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152002262
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

`spark.sql.adaptive.xxx` already exists; will this be a problem? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151962620
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
+
ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions exceeds this " +
+"threshold")
+  .intConf
+  .createWithDefault(1)
--- End diff --

wow, 100 million is a really large threshold, how did you pick this number?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151962448
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

`spark.adaptive.map.statistics.cores` should also be a config entry like 
this


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151921740
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold")
--- End diff --

Yes, it's better!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151901370
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold")
--- End diff --

`spark.shuffle.mapOutputStatistics.parallelAggregationThreshold`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151801786
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,17 +474,36 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 
8)
+
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
--- End diff --

Doing this is not cheap. I would add a config and only run this in multiple 
threads when `#mapper * #shuffle_partitions` is large.
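
For a sense of scale: 10,000 map outputs times 10,000 shuffle partitions means 10^8 `getSizeForBlock` calls in this loop, which is why a single-threaded pass can take several seconds for large shuffles.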


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151801570
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
+  /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
-}
+  val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
+  var taskSlices = parallelism
+
+  equallyDivide(totalSizes.length, taskSlices).foreach {
+reduceIds =>
+  mapStatusSubmitTasks += threadPoolMapStats.submit(
+new Runnable {
+  override def run(): Unit = {
+for (s <- statuses; i <- reduceIds) {
+  totalSizes(i) += s.getSizeForBlock(i)
+}
+  }
+}
+  )
   }
+  mapStatusSubmitTasks.foreach(_.get())
--- End diff --

Don't use `scala.concurrent.ExecutionContext.Implicits.global`. You need to 
create a thread pool.
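
A self-contained sketch of that pattern, using plain `Executors` rather than Spark's `ThreadUtils` (names and sizes here are illustrative):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
try {
  // Split the work into futures that run on the dedicated pool.
  val tasks = (0 until 4).map(i => Future { i * i })
  val results = Await.result(Future.sequence(tasks), Duration.Inf)
  println(results)  // Vector(0, 1, 4, 9)
} finally {
  pool.shutdown()  // always release the threads when done
}
```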


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-15 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151339166
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
+  /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
-}
+  val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
+  var taskSlices = parallelism
+
+  equallyDivide(totalSizes.length, taskSlices).foreach {
+reduceIds =>
+  mapStatusSubmitTasks += threadPoolMapStats.submit(
+new Runnable {
+  override def run(): Unit = {
+for (s <- statuses; i <- reduceIds) {
+  totalSizes(i) += s.getSizeForBlock(i)
+}
+  }
+}
+  )
   }
+  mapStatusSubmitTasks.foreach(_.get())
--- End diff --

Should I use the `scala.concurrent.ExecutionContext.Implicits.global` 
ExecutionContext?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-15 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151332369
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
+  /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
-}
+  val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
+  var taskSlices = parallelism
+
+  equallyDivide(totalSizes.length, taskSlices).foreach {
+reduceIds =>
+  mapStatusSubmitTasks += threadPoolMapStats.submit(
+new Runnable {
+  override def run(): Unit = {
+for (s <- statuses; i <- reduceIds) {
+  totalSizes(i) += s.getSizeForBlock(i)
+}
+  }
+}
+  )
   }
+  mapStatusSubmitTasks.foreach(_.get())
--- End diff --

Good idea, thx!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151331447
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
+  /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
-}
+  val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
+  var taskSlices = parallelism
--- End diff --

Why `var`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-15 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151322438
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
+  /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
-}
+  val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
+  var taskSlices = parallelism
+
+  equallyDivide(totalSizes.length, taskSlices).foreach {
+reduceIds =>
+  mapStatusSubmitTasks += threadPoolMapStats.submit(
+new Runnable {
+  override def run(): Unit = {
+for (s <- statuses; i <- reduceIds) {
+  totalSizes(i) += s.getSizeForBlock(i)
+}
+  }
+}
+  )
   }
+  mapStatusSubmitTasks.foreach(_.get())
--- End diff --

this part can be simplified by using Scala's `Future`:

```scala
val futureArray = equallyDivide(totalSizes.length, taskSlices).map {
  reduceIds => Future {
    // whatever you want to do here
  }
}
Await.result(Future.sequence(futureArray), Duration.Inf) // or some timeout value you prefer
```
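
One note on this sketch: both `Future { ... }` and `Future.sequence` need an implicit `ExecutionContext` in scope, which is what the follow-up comments about creating a dedicated thread pool (rather than `Implicits.global`) address.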


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org