spark git commit: [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.
Repository: spark Updated Branches: refs/heads/branch-2.2 3bfad9d42 -> 40fd0ce7f [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher. When `RetryingBlockFetcher` retries fetching blocks, there could be two `DownloadCallback`s downloading the same content to the same target file. This could cause `ShuffleBlockFetcherIterator` to read a partial result. This PR proposes to create and delete the tmp files in `OneForOneBlockFetcher`. Author: jinxing Author: Shixiong Zhu Closes #18565 from jinxing64/SPARK-21342. (cherry picked from commit 6a06c4b03c4dd86241fb9d11b4360371488f0e53) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40fd0ce7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40fd0ce7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40fd0ce7 Branch: refs/heads/branch-2.2 Commit: 40fd0ce7f2c2facb96fc5d613bc7b6e4b573d9f7 Parents: 3bfad9d Author: jinxing Authored: Mon Jul 10 21:06:58 2017 +0800 Committer: Wenchen Fan Committed: Mon Jul 10 21:10:02 2017 +0800 -- .../network/shuffle/ExternalShuffleClient.java | 7 ++-- .../network/shuffle/OneForOneBlockFetcher.java | 34 +++--- .../spark/network/shuffle/ShuffleClient.java| 13 +-- .../network/shuffle/TempShuffleFileManager.java | 36 .../network/sasl/SaslIntegrationSuite.java | 2 +- .../shuffle/OneForOneBlockFetcherSuite.java | 2 +- .../spark/network/BlockTransferService.scala| 8 ++--- .../netty/NettyBlockTransferService.scala | 9 +++-- .../storage/ShuffleBlockFetcherIterator.scala | 28 ++- .../spark/storage/BlockManagerSuite.scala | 5 ++- .../ShuffleBlockFetcherIteratorSuite.scala | 10 +++--- 11 files changed, 108 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java -- diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 269fa72..39af9d5 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -17,7 +17,6 @@ package org.apache.spark.network.shuffle; -import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -88,15 +87,15 @@ public class ExternalShuffleClient extends ShuffleClient { String execId, String[] blockIds, BlockFetchingListener listener, - File[] shuffleFiles) { + TempShuffleFileManager tempShuffleFileManager) { checkInit(); logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (blockIds1, listener1) -> { TransportClient client = clientFactory.createClient(host, port); -new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf, - shuffleFiles).start(); +new OneForOneBlockFetcher(client, appId, execId, + blockIds1, listener1, conf, tempShuffleFileManager).start(); }; int maxRetries = conf.maxIORetries(); http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java -- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java index d46ce2e..2f160d1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -57,30 +57,36 @@ public class OneForOneBlockFetcher { 
private final String[] blockIds; private final BlockFetchingListener listener; private final ChunkReceivedCallback chunkCallback; - private TransportConf transportConf = null; - private File[] shuffleFiles = null; + private final TransportConf transportConf; + private final TempShuffleFileManager tempShuffleFileManager; private StreamHandle streamHandle = null; public
spark git commit: [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.
Repository: spark Updated Branches: refs/heads/master 647963a26 -> 6a06c4b03 [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher. ## What changes were proposed in this pull request? When `RetryingBlockFetcher` retries fetching blocks, there could be two `DownloadCallback`s downloading the same content to the same target file. This could cause `ShuffleBlockFetcherIterator` to read a partial result. This PR proposes to create and delete the tmp files in `OneForOneBlockFetcher`. Author: jinxing Author: Shixiong Zhu Closes #18565 from jinxing64/SPARK-21342. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a06c4b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a06c4b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a06c4b0 Branch: refs/heads/master Commit: 6a06c4b03c4dd86241fb9d11b4360371488f0e53 Parents: 647963a Author: jinxing Authored: Mon Jul 10 21:06:58 2017 +0800 Committer: Wenchen Fan Committed: Mon Jul 10 21:06:58 2017 +0800 -- .../network/shuffle/ExternalShuffleClient.java | 7 ++-- .../network/shuffle/OneForOneBlockFetcher.java | 34 +++--- .../spark/network/shuffle/ShuffleClient.java| 13 +-- .../network/shuffle/TempShuffleFileManager.java | 36 .../network/sasl/SaslIntegrationSuite.java | 2 +- .../shuffle/OneForOneBlockFetcherSuite.java | 2 +- .../spark/network/BlockTransferService.scala| 8 ++--- .../netty/NettyBlockTransferService.scala | 9 +++-- .../storage/ShuffleBlockFetcherIterator.scala | 28 ++- .../spark/storage/BlockManagerSuite.scala | 4 +-- .../ShuffleBlockFetcherIteratorSuite.scala | 10 +++--- 11 files changed, 108 insertions(+), 45 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java -- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 6ac9302..31bd24e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -17,7 +17,6 @@ package org.apache.spark.network.shuffle; -import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -91,15 +90,15 @@ public class ExternalShuffleClient extends ShuffleClient { String execId, String[] blockIds, BlockFetchingListener listener, - File[] shuffleFiles) { + TempShuffleFileManager tempShuffleFileManager) { checkInit(); logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (blockIds1, listener1) -> { TransportClient client = clientFactory.createClient(host, port); -new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf, - shuffleFiles).start(); +new OneForOneBlockFetcher(client, appId, execId, + blockIds1, listener1, conf, tempShuffleFileManager).start(); }; int maxRetries = conf.maxIORetries(); http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java -- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java index d46ce2e..2f160d1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -57,30 +57,36 @@ public class OneForOneBlockFetcher { private final String[] blockIds; private final BlockFetchingListener listener; private final 
ChunkReceivedCallback chunkCallback; - private TransportConf transportConf = null; - private File[] shuffleFiles = null; + private final TransportConf transportConf; + private final TempShuffleFileManager tempShuffleFileManager; private StreamHandle streamHandle = null; public OneForOneBlockFetcher( +TransportClient client, +String appId, +String execId, +