spark git commit: [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3bfad9d42 -> 40fd0ce7f


[SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

When `RetryingBlockFetcher` retries fetching blocks, there could be two
`DownloadCallback`s downloading the same content to the same target file. This could
cause `ShuffleBlockFetcherIterator` to read a partial result.

This PR proposes creating and deleting the temporary files in `OneForOneBlockFetcher`.

Author: jinxing 
Author: Shixiong Zhu 

Closes #18565 from jinxing64/SPARK-21342.

(cherry picked from commit 6a06c4b03c4dd86241fb9d11b4360371488f0e53)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40fd0ce7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40fd0ce7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40fd0ce7

Branch: refs/heads/branch-2.2
Commit: 40fd0ce7f2c2facb96fc5d613bc7b6e4b573d9f7
Parents: 3bfad9d
Author: jinxing 
Authored: Mon Jul 10 21:06:58 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Jul 10 21:10:02 2017 +0800

--
 .../network/shuffle/ExternalShuffleClient.java  |  7 ++--
 .../network/shuffle/OneForOneBlockFetcher.java  | 34 +++---
 .../spark/network/shuffle/ShuffleClient.java| 13 +--
 .../network/shuffle/TempShuffleFileManager.java | 36 
 .../network/sasl/SaslIntegrationSuite.java  |  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java |  2 +-
 .../spark/network/BlockTransferService.scala|  8 ++---
 .../netty/NettyBlockTransferService.scala   |  9 +++--
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++-
 .../spark/storage/BlockManagerSuite.scala   |  5 ++-
 .../ShuffleBlockFetcherIteratorSuite.scala  | 10 +++---
 11 files changed, 108 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 269fa72..39af9d5 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -88,15 +87,15 @@ public class ExternalShuffleClient extends ShuffleClient {
   String execId,
   String[] blockIds,
   BlockFetchingListener listener,
-  File[] shuffleFiles) {
+  TempShuffleFileManager tempShuffleFileManager) {
 checkInit();
 logger.debug("External shuffle fetch from {}:{} (executor id {})", host, 
port, execId);
 try {
   RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
   (blockIds1, listener1) -> {
 TransportClient client = clientFactory.createClient(host, port);
-new OneForOneBlockFetcher(client, appId, execId, blockIds1, 
listener1, conf,
-  shuffleFiles).start();
+new OneForOneBlockFetcher(client, appId, execId,
+  blockIds1, listener1, conf, tempShuffleFileManager).start();
   };
 
   int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index d46ce2e..2f160d1 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -57,30 +57,36 @@ public class OneForOneBlockFetcher {
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
-  private TransportConf transportConf = null;
-  private File[] shuffleFiles = null;
+  private final TransportConf transportConf;
+  private final TempShuffleFileManager tempShuffleFileManager;
 
   private StreamHandle streamHandle = null;
 
   public 

spark git commit: [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 647963a26 -> 6a06c4b03


[SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

## What changes were proposed in this pull request?

When `RetryingBlockFetcher` retries fetching blocks, there could be two
`DownloadCallback`s downloading the same content to the same target file. This could
cause `ShuffleBlockFetcherIterator` to read a partial result.

This PR proposes creating and deleting the temporary files in `OneForOneBlockFetcher`.

Author: jinxing 
Author: Shixiong Zhu 

Closes #18565 from jinxing64/SPARK-21342.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a06c4b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a06c4b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a06c4b0

Branch: refs/heads/master
Commit: 6a06c4b03c4dd86241fb9d11b4360371488f0e53
Parents: 647963a
Author: jinxing 
Authored: Mon Jul 10 21:06:58 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Jul 10 21:06:58 2017 +0800

--
 .../network/shuffle/ExternalShuffleClient.java  |  7 ++--
 .../network/shuffle/OneForOneBlockFetcher.java  | 34 +++---
 .../spark/network/shuffle/ShuffleClient.java| 13 +--
 .../network/shuffle/TempShuffleFileManager.java | 36 
 .../network/sasl/SaslIntegrationSuite.java  |  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java |  2 +-
 .../spark/network/BlockTransferService.scala|  8 ++---
 .../netty/NettyBlockTransferService.scala   |  9 +++--
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++-
 .../spark/storage/BlockManagerSuite.scala   |  4 +--
 .../ShuffleBlockFetcherIteratorSuite.scala  | 10 +++---
 11 files changed, 108 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 6ac9302..31bd24e 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -91,15 +90,15 @@ public class ExternalShuffleClient extends ShuffleClient {
   String execId,
   String[] blockIds,
   BlockFetchingListener listener,
-  File[] shuffleFiles) {
+  TempShuffleFileManager tempShuffleFileManager) {
 checkInit();
 logger.debug("External shuffle fetch from {}:{} (executor id {})", host, 
port, execId);
 try {
   RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
   (blockIds1, listener1) -> {
 TransportClient client = clientFactory.createClient(host, port);
-new OneForOneBlockFetcher(client, appId, execId, blockIds1, 
listener1, conf,
-  shuffleFiles).start();
+new OneForOneBlockFetcher(client, appId, execId,
+  blockIds1, listener1, conf, tempShuffleFileManager).start();
   };
 
   int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index d46ce2e..2f160d1 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -57,30 +57,36 @@ public class OneForOneBlockFetcher {
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
-  private TransportConf transportConf = null;
-  private File[] shuffleFiles = null;
+  private final TransportConf transportConf;
+  private final TempShuffleFileManager tempShuffleFileManager;
 
   private StreamHandle streamHandle = null;
 
   public OneForOneBlockFetcher(
+TransportClient client,
+String appId,
+String execId,
+