spark git commit: [SPARK-23524] Big local shuffle blocks should not be checked for corruption.

2018-03-07 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 ee6e79737 -> 86ca91551


[SPARK-23524] Big local shuffle blocks should not be checked for corruption.

## What changes were proposed in this pull request?

In the current code, all local blocks are checked for corruption, whether they are big or not. The reasons are as follows:

1. The size in FetchResult for a local block is set to 0 (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327).
2. SPARK-4105 meant to check only small blocks (size < maxBytesInFlight / 3). Because of (1), that size check never excludes local blocks: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420

Fixing this avoids the OOM that can occur when big local blocks are checked for corruption.

## How was this patch tested?

A unit test was added (ShuffleBlockFetcherIteratorSuite).

Author: jx158167 

Closes #20685 from jinxing64/SPARK-23524.

(cherry picked from commit 77c91cc746f93e609c412f3a220495d9e931f696)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86ca9155
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86ca9155
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86ca9155

Branch: refs/heads/branch-2.3
Commit: 86ca91551522832141aedc17ba1e47dbeb44d970
Parents: ee6e797
Author: jx158167 
Authored: Wed Mar 7 20:08:32 2018 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 7 20:08:43 2018 -0800

--
 .../storage/ShuffleBlockFetcherIterator.scala   | 14 +++---
 .../ShuffleBlockFetcherIteratorSuite.scala  | 45 
 2 files changed, 54 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/86ca9155/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 98b5a73..dd9df74 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -90,7 +90,7 @@ final class ShuffleBlockFetcherIterator(
   private[this] val startTime = System.currentTimeMillis
 
   /** Local blocks to fetch, excluding zero-sized blocks. */
-  private[this] val localBlocks = new ArrayBuffer[BlockId]()
+  private[this] val localBlocks = 
scala.collection.mutable.LinkedHashSet[BlockId]()
 
   /** Remote blocks to fetch, excluding zero-sized blocks. */
   private[this] val remoteBlocks = new HashSet[BlockId]()
@@ -316,6 +316,7 @@ final class ShuffleBlockFetcherIterator(
* track in-memory are the ManagedBuffer references themselves.
*/
   private[this] def fetchLocalBlocks() {
+logDebug(s"Start fetching local blocks: ${localBlocks.mkString(", ")}")
 val iter = localBlocks.iterator
 while (iter.hasNext) {
   val blockId = iter.next()
@@ -324,7 +325,8 @@ final class ShuffleBlockFetcherIterator(
 shuffleMetrics.incLocalBlocksFetched(1)
 shuffleMetrics.incLocalBytesRead(buf.size)
 buf.retain()
-results.put(new SuccessFetchResult(blockId, 
blockManager.blockManagerId, 0, buf, false))
+results.put(new SuccessFetchResult(blockId, 
blockManager.blockManagerId,
+  buf.size(), buf, false))
   } catch {
 case e: Exception =>
   // If we see an exception, stop immediately.
@@ -397,7 +399,9 @@ final class ShuffleBlockFetcherIterator(
 }
 shuffleMetrics.incRemoteBlocksFetched(1)
   }
-  bytesInFlight -= size
+  if (!localBlocks.contains(blockId)) {
+bytesInFlight -= size
+  }
   if (isNetworkReqDone) {
 reqsInFlight -= 1
 logDebug("Number of requests in flight " + reqsInFlight)
@@ -583,8 +587,8 @@ object ShuffleBlockFetcherIterator {
* Result of a fetch from a remote block successfully.
* @param blockId block id
* @param address BlockManager that the block was fetched from.
-   * @param size estimated size of the block, used to calculate bytesInFlight.
-   * Note that this is NOT the exact bytes.
+   * @param size estimated size of the block. Note that this is NOT the exact 
bytes.
+   * Size of remote block is used to calculate bytesInFlight.
* @param buf `ManagedBuffer` for the content.
* @param isNetworkReqDone Is this the last network request for this host in 
this fetch request.
*/

http://git-wip-us.apache.org/repos/asf/spark/blob/86ca9155/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

spark git commit: [SPARK-23524] Big local shuffle blocks should not be checked for corruption.

2018-03-07 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master ac76eff6a -> 77c91cc74


[SPARK-23524] Big local shuffle blocks should not be checked for corruption.

## What changes were proposed in this pull request?

In the current code, all local blocks are checked for corruption, whether they are big or not. The reasons are as follows:

1. The size in FetchResult for a local block is set to 0 (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327).
2. SPARK-4105 meant to check only small blocks (size < maxBytesInFlight / 3). Because of (1), that size check never excludes local blocks: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420

Fixing this avoids the OOM that can occur when big local blocks are checked for corruption.

## How was this patch tested?

A unit test was added (ShuffleBlockFetcherIteratorSuite).

Author: jx158167 

Closes #20685 from jinxing64/SPARK-23524.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77c91cc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77c91cc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77c91cc7

Branch: refs/heads/master
Commit: 77c91cc746f93e609c412f3a220495d9e931f696
Parents: ac76eff
Author: jx158167 
Authored: Wed Mar 7 20:08:32 2018 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 7 20:08:32 2018 -0800

--
 .../storage/ShuffleBlockFetcherIterator.scala   | 14 +++---
 .../ShuffleBlockFetcherIteratorSuite.scala  | 45 
 2 files changed, 54 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/77c91cc7/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 98b5a73..dd9df74 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -90,7 +90,7 @@ final class ShuffleBlockFetcherIterator(
   private[this] val startTime = System.currentTimeMillis
 
   /** Local blocks to fetch, excluding zero-sized blocks. */
-  private[this] val localBlocks = new ArrayBuffer[BlockId]()
+  private[this] val localBlocks = 
scala.collection.mutable.LinkedHashSet[BlockId]()
 
   /** Remote blocks to fetch, excluding zero-sized blocks. */
   private[this] val remoteBlocks = new HashSet[BlockId]()
@@ -316,6 +316,7 @@ final class ShuffleBlockFetcherIterator(
* track in-memory are the ManagedBuffer references themselves.
*/
   private[this] def fetchLocalBlocks() {
+logDebug(s"Start fetching local blocks: ${localBlocks.mkString(", ")}")
 val iter = localBlocks.iterator
 while (iter.hasNext) {
   val blockId = iter.next()
@@ -324,7 +325,8 @@ final class ShuffleBlockFetcherIterator(
 shuffleMetrics.incLocalBlocksFetched(1)
 shuffleMetrics.incLocalBytesRead(buf.size)
 buf.retain()
-results.put(new SuccessFetchResult(blockId, 
blockManager.blockManagerId, 0, buf, false))
+results.put(new SuccessFetchResult(blockId, 
blockManager.blockManagerId,
+  buf.size(), buf, false))
   } catch {
 case e: Exception =>
   // If we see an exception, stop immediately.
@@ -397,7 +399,9 @@ final class ShuffleBlockFetcherIterator(
 }
 shuffleMetrics.incRemoteBlocksFetched(1)
   }
-  bytesInFlight -= size
+  if (!localBlocks.contains(blockId)) {
+bytesInFlight -= size
+  }
   if (isNetworkReqDone) {
 reqsInFlight -= 1
 logDebug("Number of requests in flight " + reqsInFlight)
@@ -583,8 +587,8 @@ object ShuffleBlockFetcherIterator {
* Result of a fetch from a remote block successfully.
* @param blockId block id
* @param address BlockManager that the block was fetched from.
-   * @param size estimated size of the block, used to calculate bytesInFlight.
-   * Note that this is NOT the exact bytes.
+   * @param size estimated size of the block. Note that this is NOT the exact 
bytes.
+   * Size of remote block is used to calculate bytesInFlight.
* @param buf `ManagedBuffer` for the content.
* @param isNetworkReqDone Is this the last network request for this host in 
this fetch request.
*/

http://git-wip-us.apache.org/repos/asf/spark/blob/77c91cc7/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala