Repository: spark
Updated Branches:
  refs/heads/master f2b3525c1 -> 14c4a62c1


[SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"

## What changes were proposed in this pull request?

This reverts commit 5fd0294ff8960982cfb3b901f84bc91a9f51bf28 because of a huge 
performance regression.
I manually fixed a minor conflict in `OneForOneBlockFetcher.java`.

`Files.newInputStream` returns a `sun.nio.ch.ChannelInputStream`. 
`ChannelInputStream` doesn't override `InputStream.skip`, so it falls back to 
the default `InputStream.skip`, which simply reads and discards the skipped 
bytes. This causes a huge performance regression when reading shuffle files, 
where readers routinely skip straight to a segment's offset.
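
For illustration, here is a minimal, self-contained sketch (not part of this 
patch) comparing the two skip paths. The file path and offset are arbitrary, 
and the comparison assumes a JDK (such as the JDK 8 builds Spark targeted at 
the time) where `sun.nio.ch.ChannelInputStream` does not override `skip`:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SkipComparison {

  public static void main(String[] args) throws IOException {
    String path = args[0];                 // any large local file (illustrative)
    long offset = 512L * 1024 * 1024;      // skip 512 MB into the file

    // FileInputStream.skip seeks the underlying file descriptor,
    // so the cost is independent of how far we skip.
    try (InputStream in = new FileInputStream(path)) {
      long t0 = System.nanoTime();
      skipFully(in, offset);
      System.out.printf("FileInputStream skip:      %.1f ms%n", (System.nanoTime() - t0) / 1e6);
    }

    // The channel-backed stream inherits the default InputStream.skip,
    // which reads the skipped bytes into a scratch buffer and discards them,
    // so the cost grows linearly with the offset.
    try (InputStream in = Files.newInputStream(Paths.get(path))) {
      long t0 = System.nanoTime();
      skipFully(in, offset);
      System.out.printf("Files.newInputStream skip: %.1f ms%n", (System.nanoTime() - t0) / 1e6);
    }
  }

  // skip() may skip fewer bytes than requested, so loop until done (or EOF).
  private static void skipFully(InputStream in, long n) throws IOException {
    while (n > 0) {
      long skipped = in.skip(n);
      if (skipped > 0) {
        n -= skipped;
      } else if (in.read() >= 0) {   // force progress one byte at a time
        n--;
      } else {
        break;                       // EOF
      }
    }
  }
}
```

In the shuffle read path (`FileSegmentManagedBuffer.createInputStream` calls 
`ByteStreams.skipFully(is, offset)`), that read-and-discard cost is paid on 
every segment read, which is what made the regression so visible.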

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #20119 from zsxwing/revert-SPARK-21475.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14c4a62c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14c4a62c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14c4a62c

Branch: refs/heads/master
Commit: 14c4a62c126ac5e542f7b82565b5f0fcf3e7fa5a
Parents: f2b3525
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Dec 29 22:33:29 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Dec 29 22:33:29 2017 -0800

----------------------------------------------------------------------
 .../buffer/FileSegmentManagedBuffer.java        |  9 ++++-----
 .../network/shuffle/OneForOneBlockFetcher.java  |  4 ++--
 .../shuffle/ShuffleIndexInformation.java        |  4 ++--
 .../sort/BypassMergeSortShuffleWriter.java      | 16 +++++++--------
 .../spark/shuffle/sort/UnsafeShuffleWriter.java |  9 ++++-----
 .../shuffle/IndexShuffleBlockResolver.scala     |  6 ++----
 .../util/collection/ExternalAppendOnlyMap.scala | 21 +++++++++-----------
 .../spark/util/collection/ExternalSorter.scala  | 17 +++++++---------
 8 files changed, 37 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index ea9b3ce..c20fab8 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -18,13 +18,12 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
 
   @Override
   public InputStream createInputStream() throws IOException {
-    InputStream is = null;
+    FileInputStream is = null;
     try {
-      is = Files.newInputStream(file.toPath());
+      is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -133,7 +132,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
       return new DefaultFileRegion(fileChannel, offset, length);
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 3f2f20b..9cac7d0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -165,7 +165,7 @@ public class OneForOneBlockFetcher {
 
     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempFileManager.createTempFile();
-      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index 386738e..eacf485 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -19,10 +19,10 @@ package org.apache.spark.network.shuffle;
 
 import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
-import java.nio.file.Files;
 
 /**
  * Keeps the index information for a particular map output
@@ -39,7 +39,7 @@ public class ShuffleIndexInformation {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+      dis = new DataInputStream(new FileInputStream(indexFile));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a9b5236..323a5d3 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;
 
 import scala.None$;
@@ -75,6 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
+  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -106,6 +107,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -186,21 +188,17 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       return lengths;
     }
 
-    // This file needs to opened in append mode in order to work around a Linux kernel bug that
-    // affects transferTo; see SPARK-3948 for more details.
-    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
+    final FileOutputStream out = new FileOutputStream(outputFile, true);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileChannel in = FileChannel.open(file.toPath(), READ);
+          final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
           try {
-            long size = in.size();
-            Utils.copyFileStreamNIO(in, out, 0, size);
-            lengths[i] = size;
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
             copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index e9c2a69..4839d04 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort;
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;
 
 import scala.Option;
@@ -291,7 +290,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
+        new FileOutputStream(outputFile).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -368,7 +367,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
     final OutputStream bos = new BufferedOutputStream(
-            java.nio.file.Files.newOutputStream(outputFile.toPath()),
+            new FileOutputStream(outputFile),
             outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -443,11 +442,11 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 94a3a78..1554048 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle
 
 import java.io._
-import java.nio.file.Files
 
 import com.google.common.io.ByteStreams
 
@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(
-        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f5b5bb..375f4a6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -461,7 +459,7 @@ class ExternalAppendOnlyMap[K, V, C](
     )
 
     private var batchIndex = 0  // Which batch we're in
-    private var fileChannel: FileChannel = null
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -478,14 +476,14 @@ class ExternalAppendOnlyMap[K, V, C](
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchIndex)
-        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
         batchIndex += 1
 
         val end = batchOffsets(batchIndex)
@@ -493,8 +491,7 @@ class ExternalAppendOnlyMap[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -554,9 +551,9 @@ class ExternalAppendOnlyMap[K, V, C](
         ds.close()
         deserializeStream = null
       }
-      if (fileChannel != null) {
-        fileChannel.close()
-        fileChannel = null
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
       }
       if (file.exists()) {
         if (!file.delete()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 3593cfd..176f84f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.mutable
@@ -494,7 +492,7 @@ private[spark] class ExternalSorter[K, V, C](
 
     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileChannel: FileChannel = null
+    var fileStream: FileInputStream = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream
 
     var nextItem: (K, C) = null
@@ -507,14 +505,14 @@ private[spark] class ExternalSorter[K, V, C](
       if (batchId < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchId)
-        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
         batchId += 1
 
         val end = batchOffsets(batchId)
@@ -522,8 +520,7 @@ private[spark] class ExternalSorter[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 
         val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
         serInstance.deserializeStream(wrappedStream)
@@ -613,7 +610,7 @@ private[spark] class ExternalSorter[K, V, C](
       batchId = batchOffsets.length  // Prevent reading any other batch
       val ds = deserializeStream
       deserializeStream = null
-      fileChannel = null
+      fileStream = null
       if (ds != null) {
         ds.close()
       }

