[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169967506
  
--- Diff: project/MimaExcludes.scala ---
@@ -1129,6 +1129,12 @@ object MimaExcludes {
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.org$apache$spark$ml$param$shared$HasLoss$_setter_$loss_="),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.getLoss"),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.loss")
+) ++ Seq(
+  // [SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs.
+  ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.FetchFailed$"),
--- End diff --

yup


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169902931
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -295,8 +307,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * and the second item is a sequence of (shuffle block id, shuffle block size) tuples
    * describing the shuffle blocks that are stored at that block manager.
    */
-  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int,
+      serializer: Serializer = null): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
--- End diff --

I'd like to pass in a `serializerRelocatable: Boolean` instead of a serializer.
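
A minimal sketch of what that might look like, assuming the boolean simply replaces the `Serializer` parameter (the enclosing trait name here is hypothetical):

```scala
import org.apache.spark.storage.{BlockId, BlockManagerId}

// Hypothetical sketch: the caller decides up front whether its serializer supports
// relocation, so MapOutputTracker never has to inspect a Serializer instance.
private[spark] trait MapSizesFetcher {
  def getMapSizesByExecutorId(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      serializerRelocatable: Boolean): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
}
```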


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-22 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169901683
  
--- Diff: project/MimaExcludes.scala ---
@@ -1129,6 +1129,12 @@ object MimaExcludes {
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.org$apache$spark$ml$param$shared$HasLoss$_setter_$loss_="),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.getLoss"),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.loss")
+) ++ Seq(
+  // [SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs.
+  ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.FetchFailed$"),
--- End diff --

I see, so I will keep `FetchFailed` as it was (no `numBlocks` introduced). Is that OK?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169899114
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -295,8 +307,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * and the second item is a sequence of (shuffle block id, shuffle block size) tuples
    * describing the shuffle blocks that are stored at that block manager.
    */
-  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int,
+      serializer: Serializer = null): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
--- End diff --

if possible let's not introduce a default value here.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169898393
  
--- Diff: project/MimaExcludes.scala ---
@@ -1129,6 +1129,12 @@ object MimaExcludes {
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.org$apache$spark$ml$param$shared$HasLoss$_setter_$loss_="),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.getLoss"),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.loss")
+) ++ Seq(
+  // [SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs.
+  ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.FetchFailed$"),
--- End diff --

We can get rid of this if we don't touch `FetchFailed`.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169873362
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -280,6 +282,16 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
+  protected def supportsContinuousBlockBatchFetch(serializer: Serializer): Boolean = {
+    if (serializer == null || !serializer.supportsRelocationOfSerializedObjects) {
+      return false
+    }
+    val compressionEnabled: Boolean = conf.getBoolean("spark.shuffle.compress", true)
+    val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
--- End diff --

We only need to create the codec if `compressionEnabled` is true.
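
A minimal sketch of that restructuring; the final condition on the codec is an assumption here, since the quoted diff is truncated before it (`CompressionCodec.supportsConcatenationOfSerializedStreams` is an existing helper in `org.apache.spark.io`):

```scala
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer

// Sketch, inside MapOutputTracker (so `conf` is the tracker's SparkConf).
// The codec is only instantiated when compression is actually enabled.
protected def supportsContinuousBlockBatchFetch(serializer: Serializer): Boolean = {
  if (serializer == null || !serializer.supportsRelocationOfSerializedObjects) {
    false
  } else if (!conf.getBoolean("spark.shuffle.compress", true)) {
    true // no compression: relocatable serialized output can be fetched in one batch
  } else {
    // assumed check: concatenated compressed streams must stay decodable
    CompressionCodec.supportsConcatenationOfSerializedStreams(CompressionCodec.createCodec(conf))
  }
}
```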


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-21 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169843676
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -280,6 +281,16 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
+  protected def supportsContinuousBlockBulkFetch: Boolean = {
+    // continuousBlockBulkFetch only happens in SparkSQL, it uses UnsafeRowSerializer,
+    // which supports relocation of serialized objects, so we only consider compression
+    val adaptiveEnabled: Boolean = conf.getBoolean("spark.sql.adaptive.enabled", false)
--- End diff --

Got it, `adaptiveEnabled` will be removed per your suggestion.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-21 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169843133
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -157,21 +157,34 @@ public void registerExecutor(
   }
 
   /**
-   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
+   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
    * about how the hash and sort based shuffles store their data.
    */
   public ManagedBuffer getBlockData(
       String appId,
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int numBlocks) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
+  }
+
+  /**
+   * This interface is for backward compatibility.
+   */
+  public ManagedBuffer getBlockData(
--- End diff --

It is called in ExternalShuffleBlockResolverSuite only, so how about removing it?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169229650
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -81,16 +81,17 @@ case object Resubmitted extends TaskFailedReason {
  */
 @DeveloperApi
 case class FetchFailed(
-    bmAddress: BlockManagerId,  // Note that bmAddress can be null
+    bmAddress: BlockManagerId, // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
     reduceId: Int,
-    message: String)
+    message: String,
+    numBlocks: Int = 1)
   extends TaskFailedReason {
   override def toErrorString: String = {
     val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
     s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
-      s"message=\n$message\n)"
+      s"numBlocks=$numBlocks, message=\n$message\n)"
--- End diff --

Do we really care about `numBlocks` when a shuffle fetch fails?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169229539
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -51,11 +51,25 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
+trait ShuffleBlockIdBase {
--- End diff --

we can make this extend `BlockId`.
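
A sketch of that suggestion (field names assumed from the existing `ShuffleBlockId`; since `BlockId` is sealed, the trait would live in BlockId.scala alongside it):

```scala
// Sketch: the base trait extends BlockId directly, so code that works with
// BlockIds can treat ShuffleBlockId and ContinuousShuffleBlockId uniformly.
trait ShuffleBlockIdBase extends BlockId {
  def shuffleId: Int
  def mapId: Int
  def reduceId: Int
}
```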


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169229060
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -280,6 +281,16 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
+  protected def supportsContinuousBlockBulkFetch: Boolean = {
+    // continuousBlockBulkFetch only happens in SparkSQL, it uses UnsafeRowSerializer,
+    // which supports relocation of serialized objects, so we only consider compression
+    val adaptiveEnabled: Boolean = conf.getBoolean("spark.sql.adaptive.enabled", false)
--- End diff --

I don't think we can get this conf here. SQL conf is per-session and is kind of a layer above Spark conf.
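
A small sketch to illustrate that layering (illustrative only; assumes a local session): setting a SQL conf on the session does not write through to the underlying `SparkConf`, so core code cannot reliably read it there.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("conf-layers").getOrCreate()

// Session-scoped SQLConf: visible through spark.conf ...
spark.conf.set("spark.sql.adaptive.enabled", "true")
assert(spark.conf.get("spark.sql.adaptive.enabled") == "true")

// ... but not reflected in the application-wide SparkConf that core code sees.
assert(!spark.sparkContext.getConf.getBoolean("spark.sql.adaptive.enabled", false))
```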


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169228298
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -157,21 +157,34 @@ public void registerExecutor(
   }
 
   /**
-   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
+   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
    * about how the hash and sort based shuffles store their data.
    */
   public ManagedBuffer getBlockData(
       String appId,
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int numBlocks) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
+  }
+
+  /**
+   * This interface is for backward compatibility.
+   */
+  public ManagedBuffer getBlockData(
--- End diff --

where will we call this method?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r169227734
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -196,42 +196,51 @@ private ShuffleMetrics() {
     private final String appId;
     private final String execId;
     private final int shuffleId;
-    // An array containing mapId and reduceId pairs.
-    private final int[] mapIdAndReduceIds;
+    // An array containing mapId, reduceId and numBlocks tuples.
+    private final int[] shuffleBlockIds;
 
     ManagedBufferIterator(String appId, String execId, String[] blockIds) {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      // length == 4: ShuffleBlockId
+      // length == 5: ContinuousShuffleBlockId
+      if (!(blockId0Parts.length == 4 || blockId0Parts.length == 5) ||
+          !blockId0Parts[0].equals("shuffle")) {
         throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
       }
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
-      mapIdAndReduceIds = new int[2 * blockIds.length];
+      shuffleBlockIds = new int[3 * blockIds.length];
       for (int i = 0; i < blockIds.length; i++) {
         String[] blockIdParts = blockIds[i].split("_");
-        if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
+        if (!(blockIdParts.length == 4 || blockIdParts.length == 5) ||
+            !blockIdParts[0].equals("shuffle")) {
--- End diff --

shall we create a `boolean isShuffleBlock(String[] blockIdParts)`?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-01 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r165409015
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
--- End diff --

Ok, will use numBlocks.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2018-02-01 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r165408800
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -203,22 +203,23 @@ private ShuffleMetrics() {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      if (blockId0Parts.length != 5 || !blockId0Parts[0].equals("shuffle")) {
         throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
       }
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
-      mapIdAndReduceIds = new int[2 * blockIds.length];
+      mapIdAndReduceIds = new int[3 * blockIds.length];
--- End diff --

Thanks, fixed.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-12-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r158124202
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -203,22 +203,23 @@ private ShuffleMetrics() {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      if (blockId0Parts.length != 5 || !blockId0Parts[0].equals("shuffle")) {
--- End diff --

This format change can cause incompatibility between the shuffle service and Spark applications, forcing a restart of the cluster and an update of all Spark applications. I wish we had a better way to encode this information that was not so brittle.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-12-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r158121392
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
--- End diff --

Please rename the variable: `length` is incorrect (here and in other places). Please rename to make it clear; `numBlocks` perhaps?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-12-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r158120856
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -203,22 +203,23 @@ private ShuffleMetrics() {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      if (blockId0Parts.length != 5 || !blockId0Parts[0].equals("shuffle")) {
         throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
       }
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
-      mapIdAndReduceIds = new int[2 * blockIds.length];
+      mapIdAndReduceIds = new int[3 * blockIds.length];
--- End diff --

Please update the description of the variable as well.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-12-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r158123441
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,10 @@ private[spark] object MapOutputTracker extends Logging {
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
-        for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
-        }
+        val totalSize: Long = (startPartition until endPartition).map(status.getSizeForBlock).sum
+        splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
+          ((ShuffleBlockId(shuffleId, mapId, startPartition, endPartition - startPartition), totalSize))
--- End diff --

This is going to create some very heavy shuffle fetches, and it looks incorrect.
This merge should not happen here, but in `ShuffleBlockFetcherIterator`.
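
For illustration only, a self-contained sketch of doing the merge at the fetcher side; the types are simplified stand-ins, not the PR's classes:

```scala
// Simplified stand-ins for ShuffleBlockId / ContinuousShuffleBlockId.
case class Block(shuffleId: Int, mapId: Int, reduceId: Int, size: Long)
case class MergedBlock(shuffleId: Int, mapId: Int, startReduceId: Int, numBlocks: Int, size: Long)

// Folds consecutive reduce IDs from the same map output into one batched fetch.
def mergeContiguous(blocks: Seq[Block]): Seq[MergedBlock] =
  blocks.foldLeft(List.empty[MergedBlock]) {
    case (head :: tail, b)
        if b.shuffleId == head.shuffleId && b.mapId == head.mapId &&
          b.reduceId == head.startReduceId + head.numBlocks =>
      // contiguous with the previous run: extend it
      head.copy(numBlocks = head.numBlocks + 1, size = head.size + b.size) :: tail
    case (acc, b) =>
      MergedBlock(b.shuffleId, b.mapId, b.reduceId, 1, b.size) :: acc
  }.reverse
```

For example, blocks for reduce IDs 3, 4, 5 of the same map output collapse into a single MergedBlock with startReduceId = 3 and numBlocks = 3.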


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-12-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r158124746
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java ---
@@ -59,9 +59,9 @@ public int getSize() {
   /**
    * Get index offset for a particular reducer.
    */
-  public ShuffleIndexRecord getIndex(int reduceId) {
+  public ShuffleIndexRecord getIndex(int reduceId, int length) {
--- End diff --

perhaps `require` that `length` (the number of blocks) is >= 1


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-12-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r158124569
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, length);
+  }
+
+  public ManagedBuffer getBlockData(
+      String appId,
+      String execId,
+      int shuffleId,
+      int mapId,
+      int reduceId) {
--- End diff --

Remove this method? We don't need it anymore.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153117548
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -116,8 +117,8 @@ object BlockId {
   def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
-    case SHUFFLE(shuffleId, mapId, reduceId) =>
-      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case SHUFFLE(shuffleId, mapId, reduceId, n) =>
--- End diff --

Yes, good catch! I will change this after switching to `ContinuousShuffleBlockId`.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153117088
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -116,8 +117,8 @@ object BlockId {
   def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
-    case SHUFFLE(shuffleId, mapId, reduceId) =>
-      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case SHUFFLE(shuffleId, mapId, reduceId, n) =>
--- End diff --

nit: `length`?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153089584
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
     val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
+      ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I get your point, thanks for the explanation.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-25 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153057489
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, length);
+  }
+
+  public ManagedBuffer getBlockData(
--- End diff --

Thanks, will update.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-25 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153049711
  
--- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java ---
@@ -110,6 +110,13 @@ public void testSortShuffleBlocks() throws IOException {
         new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
     block1Stream.close();
     assertEquals(sortBlock1, block1);
+
+    InputStream block01Stream =
+        resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
+    String block01 = CharStreams.toString(
+        new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
--- End diff --

Thanks, updated!


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-25 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153049707
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -52,8 +52,9 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
-case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
-  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1)
+  extends BlockId {
+  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length
--- End diff --

`ContinuousShuffleBlockIds` looks like a good idea, let me try.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-25 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153049650
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
--- End diff --

Without this log it is hard to tell that a continuous shuffle block read really happened, and I found `getLocalBytes` has similar debug info:
```
logDebug(s"Getting local block $blockId as bytes")
```
How about keeping it?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152980151
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -52,8 +52,9 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
-case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
-  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1)
+  extends BlockId {
+  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length
--- End diff --

these are semi-public interfaces, can we create a new block id `ContinuousShuffleBlockIds`?
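
A sketch of what such a block id could look like; the name follows the review comment, and the fields are assumed from the surrounding discussion:

```scala
// Hypothetical sketch; BlockId is sealed, so this would live in BlockId.scala:
case class ContinuousShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, numBlocks: Int)
  extends BlockId {
  // encodes shuffle_<shuffleId>_<mapId>_<reduceId>_<numBlocks>
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}_$numBlocks"
}
```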


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152976816
  
--- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java ---
@@ -110,6 +110,13 @@ public void testSortShuffleBlocks() throws IOException {
         new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
     block1Stream.close();
     assertEquals(sortBlock1, block1);
+
+    InputStream block01Stream =
+        resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
+    String block01 = CharStreams.toString(
+        new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
--- End diff --

ditto


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152976135
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, length);
+  }
+
+  public ManagedBuffer getBlockData(
--- End diff --

nit: we should move the original comment here, and explain the different usages of these two functions.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152976754
  
--- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java ---
@@ -110,6 +110,13 @@ public void testSortShuffleBlocks() throws IOException {
         new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
     block1Stream.close();
     assertEquals(sortBlock1, block1);
+
+    InputStream block01Stream =
+        resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
--- End diff --

nit: please follow the above indents format.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152978408
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -52,8 +52,9 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
-case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
-  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1)
+  extends BlockId {
+  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length
--- End diff --

nit: maybe `s"shuffle_${shuffleId}_${mapId}_${reduceId}_$length"`?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-24 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152962372
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
--- End diff --

Ok, I will remove it.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-24 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152961094
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
     val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
+      ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

Sure. For example, when startPartition = 3 and endPartition = 8, we need [3, 8), so length = 5.

Line 204: ByteStreams.skipFully(3 * 8) skips the offsets for partitions 0, 1, 2
Line 205: offset = in.readLong() reads startPartition(3)'s offset
Line 206: ByteStreams.skipFully((5 - 1) * 8) skips the offsets for partitions 4, 5, 6, 7
Line 207: nextOffset = in.readLong() reads endPartition(8)'s offset

When length is 1, the skip is zero, which is correct: we don't need to skip anything, and line 207's readLong already returns endPartition's offset.
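
A self-contained sketch of that arithmetic, assuming the index-file layout from the diff above (one 8-byte offset per reduce partition):

```scala
import java.io.{ByteArrayInputStream, DataInputStream}
import java.nio.ByteBuffer

// Stand-in index file: offsets 0, 10, 20, ... 90 for ten partition boundaries.
val buf = ByteBuffer.allocate(8 * 10)
(0 to 9).foreach(i => buf.putLong(i * 10L))
val in = new DataInputStream(new ByteArrayInputStream(buf.array()))

val startPartition = 3
val length = 5                    // blocks [3, 8)
in.skipBytes(startPartition * 8)  // skip offsets for partitions 0, 1, 2
val offset = in.readLong()        // startPartition(3)'s offset: 30
in.skipBytes((length - 1) * 8)    // skip offsets for partitions 4, 5, 6, 7
val nextOffset = in.readLong()    // endPartition(8)'s offset: 80
assert(offset == 30 && nextOffset == 80)
```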


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891920
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
     val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
+      ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

Also, if `length` is 1, then this will always be zero.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891792
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,13 @@ private[spark] object MapOutputTracker extends Logging {
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
+        var totalSize = 0L
         for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+          totalSize += status.getSizeForBlock(part)
--- End diff --

This can be simplified to: `val totalSize = (startPartition until endPartition).map(status.getSizeForXXX).sum`.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891172
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
--- End diff --

Not necessary to add this; I guess it is mainly for your own debugging.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891438
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
     val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
+      ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I suspect this line is incorrect; it seems to change the semantics. For example, if startPartition is 3 and endPartition is 8, originally the skip should be 3 * 8, but now it changes to 4 * 8. Can you please explain more?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152193203
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,14 @@ private[spark] object MapOutputTracker extends Logging {
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
+        var n = 0
+        var totalSize = 0L
         for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+          n += 1
+          totalSize += status.getSizeForBlock(part)
         }
--- End diff --

`n` can be `numPartitions`, and computed directly as `endPartition - startPartition`?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-19 Thread yucai
GitHub user yucai opened a pull request:

https://github.com/apache/spark/pull/19788

[SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs

## What changes were proposed in this pull request?

Optimize shuffle fetch of contiguous partition IDs

## How was this patch tested?

Added new unit tests: AdaptiveSchedulingSuite, BlockIdSuite, etc.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yucai/spark shuffle_fetch_opt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19788.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19788


commit e947bcb39d1436751e68e033871baba5a77f6d0e
Author: yucai 
Date:   2017-11-20T03:04:28Z

[SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs




---
