Repository: spark
Updated Branches:
  refs/heads/master f2b3525c1 -> 14c4a62c1
[SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"

## What changes were proposed in this pull request?

This reverts commit 5fd0294ff8960982cfb3b901f84bc91a9f51bf28 because of a huge performance regression. I manually fixed a minor conflict in `OneForOneBlockFetcher.java`.

`Files.newInputStream` returns `sun.nio.ch.ChannelInputStream`. `ChannelInputStream` doesn't override `InputStream.skip`, so it falls back to the default `InputStream.skip`, which simply reads and discards the skipped bytes. This causes a huge performance regression when reading shuffle files.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #20119 from zsxwing/revert-SPARK-21475.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14c4a62c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14c4a62c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14c4a62c

Branch: refs/heads/master
Commit: 14c4a62c126ac5e542f7b82565b5f0fcf3e7fa5a
Parents: f2b3525
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Dec 29 22:33:29 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Dec 29 22:33:29 2017 -0800

----------------------------------------------------------------------
 .../buffer/FileSegmentManagedBuffer.java        |  9 ++++-----
 .../network/shuffle/OneForOneBlockFetcher.java  |  4 ++--
 .../shuffle/ShuffleIndexInformation.java        |  4 ++--
 .../sort/BypassMergeSortShuffleWriter.java      | 16 +++++++--------
 .../spark/shuffle/sort/UnsafeShuffleWriter.java |  9 ++++-----
 .../shuffle/IndexShuffleBlockResolver.scala     |  6 ++----
 .../util/collection/ExternalAppendOnlyMap.scala | 21 +++++++++-----------
 .../spark/util/collection/ExternalSorter.scala  | 17 +++++++---------
 8 files changed, 37 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
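The root cause described above is easy to reproduce outside Spark. Below is a minimal micro-benchmark sketch (class name and file path are placeholders, not part of this patch): `FileInputStream.skip` is a constant-time seek, while on the JDKs this commit targets the stream returned by `Files.newInputStream` inherits the default `InputStream.skip`, which read()s and discards every skipped byte.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class SkipRegressionDemo {
      public static void main(String[] args) throws IOException {
        String path = "/tmp/big-shuffle-file";  // placeholder: any large local file
        long offset = 1L << 30;                 // skip 1 GiB into the file

        // FileInputStream overrides skip() with a native seek: O(1).
        try (InputStream in = new FileInputStream(path)) {
          time("FileInputStream     ", in, offset);
        }

        // sun.nio.ch.ChannelInputStream (as returned here on JDK 8-era runtimes)
        // inherits InputStream.skip(), which reads and discards bytes: O(offset).
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
          time("Files.newInputStream", in, offset);
        }
      }

      private static void time(String label, InputStream in, long n) throws IOException {
        long start = System.nanoTime();
        long remaining = n;
        while (remaining > 0) {
          long skipped = in.skip(remaining);
          if (skipped <= 0) break;  // hit EOF or made no progress
          remaining -= skipped;
        }
        System.out.printf("%s skipped %d bytes in %.1f ms%n",
            label, n - remaining, (System.nanoTime() - start) / 1e6);
      }
    }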
http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index ea9b3ce..c20fab8 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -18,13 +18,12 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
 
   @Override
   public InputStream createInputStream() throws IOException {
-    InputStream is = null;
+    FileInputStream is = null;
     try {
-      is = Files.newInputStream(file.toPath());
+      is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -133,7 +132,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 3f2f20b..9cac7d0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -165,7 +165,7 @@ public class OneForOneBlockFetcher {
 
     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempFileManager.createTempFile();
-      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index 386738e..eacf485 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -19,10 +19,10 @@ package org.apache.spark.network.shuffle;
 
 import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
-import java.nio.file.Files;
 
 /**
  * Keeps the index information for a particular map output
@@ -39,7 +39,7 @@ public class ShuffleIndexInformation {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+      dis = new DataInputStream(new FileInputStream(indexFile));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {
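The `createInputStream` change above is the heart of the revert: `ByteStreams.skipFully` is only cheap if the underlying stream can seek. A condensed sketch of the restored pattern follows (class and method names are ours; Guava's `ByteStreams.limit` stands in for Spark's `LimitedInputStream`):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import com.google.common.io.ByteStreams;

    // Open the segment with FileInputStream so that skipFully() resolves to an
    // O(1) seek, then bound the stream to the segment length.
    public final class FileSegmentStreams {
      public static InputStream openSegment(File file, long offset, long length)
          throws IOException {
        FileInputStream is = new FileInputStream(file);
        try {
          ByteStreams.skipFully(is, offset);     // delegates to FileInputStream.skip -> seek
          return ByteStreams.limit(is, length);  // caller reads at most `length` bytes
        } catch (IOException e) {
          is.close();                            // don't leak the descriptor on failure
          throw e;
        }
      }
    }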
http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a9b5236..323a5d3 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;
 
 import scala.None$;
@@ -75,6 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
+  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -106,6 +107,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -186,21 +188,17 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       return lengths;
     }
 
-    // This file needs to opened in append mode in order to work around a Linux kernel bug that
-    // affects transferTo; see SPARK-3948 for more details.
-    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
+    final FileOutputStream out = new FileOutputStream(outputFile, true);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileChannel in = FileChannel.open(file.toPath(), READ);
+          final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
           try {
-            long size = in.size();
-            Utils.copyFileStreamNIO(in, out, 0, size);
-            lengths[i] = size;
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
            copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);
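`Utils.copyStream(in, out, false, transferToEnabled)` restores the pre-NIO concatenation path, where `spark.file.transferTo` selects between zero-copy `FileChannel.transferTo` and a plain read/write loop. A simplified sketch of that choice (names and buffer size are illustrative, not Spark's exact implementation):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.channels.FileChannel;

    public final class CopySketch {
      static long copy(FileInputStream in, FileOutputStream out, boolean transferToEnabled)
          throws IOException {
        if (transferToEnabled) {
          // Zero-copy path: the kernel moves the bytes, no user-space buffer.
          FileChannel inCh = in.getChannel();
          FileChannel outCh = out.getChannel();
          long size = inCh.size();
          long copied = 0;
          while (copied < size) {  // transferTo may copy less than requested
            copied += inCh.transferTo(copied, size - copied, outCh);
          }
          return copied;
        } else {
          // Fallback: ordinary buffered read/write loop.
          byte[] buf = new byte[8192];
          long count = 0;
          int n;
          while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            count += n;
          }
          return count;
        }
      }
    }

Note that the writer above still opens the destination in append mode (`new FileOutputStream(outputFile, true)`): the SPARK-3948 workaround for a Linux kernel bug affecting transferTo, whose explanatory comment this revert drops here but keeps in UnsafeShuffleWriter below.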
http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index e9c2a69..4839d04 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort;
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;
 
 import scala.Option;
@@ -291,7 +290,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
+        new FileOutputStream(outputFile).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -368,7 +367,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
     final OutputStream bos = new BufferedOutputStream(
-        java.nio.file.Files.newOutputStream(outputFile.toPath()),
+        new FileOutputStream(outputFile),
         outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -443,11 +442,11 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
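The context lines above also show the counting-output-stream trick that `mergeSpillsWithFileStream` relies on: each partition's length is read off the stream itself rather than flushing and asking the file system for the output file's size after every partition. A toy sketch of that idea using Guava's `CountingOutputStream` (file name and the single-byte write are placeholders for the real partition copy):

    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import com.google.common.io.CountingOutputStream;

    public final class CountingMergeSketch {
      public static void main(String[] args) throws IOException {
        File out = new File("merged.data");  // placeholder output file
        CountingOutputStream counting =
            new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(out)));
        long[] partitionLengths = new long[3];
        try (OutputStream os = counting) {
          for (int p = 0; p < partitionLengths.length; p++) {
            long before = counting.getCount();
            os.write(new byte[]{(byte) p});  // stand-in for copying one partition
            // Length comes from the counter, not from stat()ing the file.
            partitionLengths[p] = counting.getCount() - before;
          }
        }
      }
    }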
http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 94a3a78..1554048 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle
 
 import java.io._
-import java.nio.file.Files
 
 import com.google.common.io.ByteStreams
 
@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(
-        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()

http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f5b5bb..375f4a6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -461,7 +459,7 @@ class ExternalAppendOnlyMap[K, V, C](
     )
 
     private var batchIndex = 0  // Which batch we're in
-    private var fileChannel: FileChannel = null
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -478,14 +476,14 @@ class ExternalAppendOnlyMap[K, V, C](
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchIndex)
-        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
         batchIndex += 1
 
         val end = batchOffsets(batchIndex)
@@ -493,8 +491,7 @@ class ExternalAppendOnlyMap[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -554,9 +551,9 @@ class ExternalAppendOnlyMap[K, V, C](
         ds.close()
         deserializeStream = null
       }
-      if (fileChannel != null) {
-        fileChannel.close()
-        fileChannel = null
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
       }
       if (file.exists()) {
         if (!file.delete()) {
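The `nextBatchStream` hunks above lean on the fact that a `FileInputStream` and the `FileChannel` it exposes share one file position, so `getChannel.position(start)` seeks the stream without reading a byte; `ByteStreams.limit` then keeps higher-level buffering from reading across a batch boundary. A minimal Java sketch of the same pattern (names are illustrative); the identical pattern reappears in `ExternalSorter` below:

    import java.io.BufferedInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import com.google.common.io.ByteStreams;

    public final class BatchStreamSketch {
      // Returns a stream over exactly the bytes [start, end) of the spill file.
      static InputStream openBatch(File spillFile, long start, long end) throws IOException {
        FileInputStream fileStream = new FileInputStream(spillFile);
        fileStream.getChannel().position(start);  // seek without reading
        return new BufferedInputStream(ByteStreams.limit(fileStream, end - start));
      }
    }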
http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 3593cfd..176f84f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.mutable
@@ -494,7 +492,7 @@ private[spark] class ExternalSorter[K, V, C](
 
     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileChannel: FileChannel = null
+    var fileStream: FileInputStream = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream
 
     var nextItem: (K, C) = null
@@ -507,14 +505,14 @@ private[spark] class ExternalSorter[K, V, C](
       if (batchId < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchId)
-        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
         batchId += 1
 
         val end = batchOffsets(batchId)
@@ -522,8 +520,7 @@ private[spark] class ExternalSorter[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 
         val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
         serInstance.deserializeStream(wrappedStream)
@@ -613,7 +610,7 @@ private[spark] class ExternalSorter[K, V, C](
       batchId = batchOffsets.length  // Prevent reading any other batch
       val ds = deserializeStream
       deserializeStream = null
-      fileChannel = null
+      fileStream = null
       if (ds != null) {
         ds.close()
       }