GitHub user jerryshao opened a pull request:
https://github.com/apache/spark/pull/18684
[SPARK-21475][Core] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths
## What changes were proposed in this pull request?
Java's `FileInputStream` and `FileOutputStream` override `finalize()`, so even when such a stream is closed correctly and promptly, it still leaves a memory footprint that only gets cleaned up during a Full GC. This introduces two side effects:
1. Lots of Finalizer-related objects are kept in memory, which increases memory overhead. In our use case of the external shuffle service, a busy shuffle service accumulates a bunch of these objects and can potentially hit an OOM (see the sketch after this list).
2. The finalizers only run during a Full GC, which increases the overhead of Full GC and leads to long GC pauses.
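The pressure described in (1) can be observed with a minimal sketch like the following (hypothetical demo code, not part of the patch; the temp file and loop count are illustrative):
```scala
import java.io.{File, FileInputStream}

object FinalizerPressure {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("finalizer-demo", ".bin")
    file.deleteOnExit()
    // Open and promptly close many streams. Because FileInputStream
    // overrides finalize(), every instance is registered with
    // java.lang.ref.Finalizer and lingers on the heap until it is finalized.
    (1 to 100000).foreach { _ =>
      val in = new FileInputStream(file)
      in.close()
    }
    // While this sleeps, `jmap -histo <pid> | grep Finalizer` shows
    // the accumulated Finalizer entries.
    Thread.sleep(60000)
  }
}
```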
To fix this potential issue, this PR proposes using NIO's `Files#newInputStream`/`Files#newOutputStream` instead in some critical paths such as shuffle.
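In code, the replacement pattern looks roughly like this (a minimal sketch, not a diff from the patch; the file path is illustrative). The streams returned by `java.nio.file.Files` do not override `finalize()`, so `close()` releases everything immediately:
```scala
import java.io.File
import java.nio.file.Files

val file = new File("/tmp/shuffle-example.data") // illustrative path

// Before: new FileInputStream(file) -- the instance is tracked by the
// JVM's Finalizer until garbage collection finalizes it.
// After: the stream returned by Files.newInputStream carries no finalizer.
val in = Files.newInputStream(file.toPath)
try {
  val buf = new Array[Byte](8192)
  var n = in.read(buf)
  while (n != -1) {
    // ... consume buf(0 until n) ...
    n = in.read(buf)
  }
} finally {
  in.close()
}
```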
`FileInputStream` usages left unchanged in core, which I think are not so critical:
```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467: val file = new DataInputStream(new FileInputStream(filename))
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942: val in = new FileInputStream(new File(path))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76: val fileIn = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248: val fis = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910: input = new FileInputStream(new File(t))
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132: case Some(f) => new FileInputStream(f)
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77: val fis = new FileInputStream(f)
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94: new DataInputStream(new NioBufferedFileInputStream(index))
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111: val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219: val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:106: val in = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89: inputStream = new FileInputStream(activeFile)
./core/src/main/scala/org/apache/spark/util/Utils.scala:329: if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:332: val inChannel = in.asInstanceOf[FileInputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:1533: gzInputStream = new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1560: new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1562: new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/Utils.scala:2090: val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
```
`FileOutputStream` usages left unchanged in core:
```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957: val out = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131: val dos = new DataOutputStream(new FileOutputStream(f))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62: val fileOut = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160: val outStream = new FileOutputStream(outPath)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239: val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949: val out = new FileOutputStream(tempFile)
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106: val out = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109: * Therefore, for local files, use FileOutputStream instead. */
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112: new FileOutputStream(uri.getPath)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71: private var fos: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102: fos = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213: var truncateStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215: truncateStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153: val out = new FileOutputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:81: val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
./core/src/main/scala/org/apache/spark/TestUtils.scala:96: val jarFileStream = new FileOutputStream(jarFile)
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31: @volatile private var outputStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97: outputStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90: gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
./core/src/main/scala/org/apache/spark/util/Utils.scala:329: if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:333: val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:527: val out = new FileOutputStream(tempFile)
```
`DiskBlockObjectWriter` uses the stream's `FileDescriptor`, so it is not easy to change it to the NIO Files API; see the sketch below.
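To illustrate why (a hedged sketch, not the actual `DiskBlockObjectWriter` code; the path is made up): `FileOutputStream#getFD` exposes the underlying `FileDescriptor`, and the stream returned by `Files.newOutputStream` has no equivalent accessor.
```scala
import java.io.{File, FileOutputStream}

val fos = new FileOutputStream(new File("/tmp/block.data"), true) // append mode
val fd = fos.getFD // FileDescriptor; Files.newOutputStream offers no such accessor
fd.sync()          // force buffered writes down to the physical device
fos.close()
```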
For the `FileInputStream` and `FileOutputStream` usages in common/shuffle*, I changed them all.
## How was this patch tested?
Existing tests and manual verification.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jerryshao/apache-spark SPARK-21475
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/18684.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #18684
----
commit b9dad5ac976261359623fafbbfa9389310272238
Author: jerryshao <[email protected]>
Date: 2017-07-19T18:30:15Z
Use NIO's file API to replace FileInputStream/FileOutputStream
Change-Id: I0f11b9e0cbe62ca3d0bac7bfe0e2df838da80b48
----