GitHub user jerryshao opened a pull request:

    https://github.com/apache/spark/pull/18684

    [SPARK-21475][Core] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths

    ## What changes were proposed in this pull request?
    
    Java's `FileInputStream` and `FileOutputStream` override `finalize()`. Even when such a stream is closed correctly and promptly, it still leaves a memory footprint that is only cleaned up during a Full GC. This introduces two side effects:
    
    1. Many `Finalizer` references are kept in memory, increasing the memory overhead. In our use case of the external shuffle service, a busy shuffle service accumulates a large number of these objects, which can potentially lead to OOM.
    2. Finalizers only run during a Full GC, which increases the Full GC overhead and leads to long GC pauses.
    
    To fix this potential issue, this PR proposes using NIO's `Files#newInputStream`/`Files#newOutputStream` instead in some critical paths such as shuffle.
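
    For illustration, a minimal sketch of the pattern this change moves to (not code from the patch itself; the file path is a hypothetical placeholder):

    ```scala
    import java.io.File
    import java.nio.file.Files

    object NioStreamSketch extends App {
      val file = new File("/tmp/example.dat") // hypothetical path for illustration

      // Old pattern: FileInputStream/FileOutputStream override finalize(), so
      // every instance is tracked by the JVM's Finalizer and only fully
      // reclaimed in a Full GC, even when the stream is closed promptly:
      //   val out = new java.io.FileOutputStream(file)

      // New pattern: the streams returned by Files.newInputStream/newOutputStream
      // do not override finalize(), so close() releases everything promptly.
      val out = Files.newOutputStream(file.toPath)
      out.write(Array[Byte](1, 2, 3))
      out.close()

      val in = Files.newInputStream(file.toPath)
      val firstByte = in.read() // reads back the first byte written above
      in.close()
    }
    ```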
    
    The `FileInputStream` usages left unchanged in core, which I think are not so critical:
    
    ```
    ./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467:    val file = new DataInputStream(new FileInputStream(filename))
    ./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942:    val in = new FileInputStream(new File(path))
    ./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76:    val fileIn = new FileInputStream(file)
    ./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248:        val fis = new FileInputStream(file)
    ./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910:        input = new FileInputStream(new File(t))
    ./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
    ./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132:      case Some(f) => new FileInputStream(f)
    ./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
    ./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77:        val fis = new FileInputStream(f)
    ./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
    ./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94:      new DataInputStream(new NioBufferedFileInputStream(index))
    ./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111:        val channel = new FileInputStream(file).getChannel()
    ./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219:    val channel = new FileInputStream(file).getChannel()
    ./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
    ./core/src/main/scala/org/apache/spark/TestUtils.scala:106:      val in = new FileInputStream(file)
    ./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89:        inputStream = new FileInputStream(activeFile)
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:332:        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:1533:      gzInputStream = new GZIPInputStream(new FileInputStream(file))
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:1560:      new GZIPInputStream(new FileInputStream(file))
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:1562:      new FileInputStream(file)
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:2090:    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
    ```
    
    The `FileOutputStream` usages left unchanged in core:
    
    ```
    ./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957:    val out = new FileOutputStream(file)
    ./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
    ./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131:      val dos = new DataOutputStream(new FileOutputStream(f))
    ./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62:    val fileOut = new FileOutputStream(file)
    ./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160:          val outStream = new FileOutputStream(outPath)
    ./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239:    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
    ./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949:        val out = new FileOutputStream(tempFile)
    ./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
    ./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106:    val out = new FileOutputStream(file, true)
    ./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109:     * Therefore, for local files, use FileOutputStream instead. */
    ./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112:        new FileOutputStream(uri.getPath)
    ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
    ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71:  private var fos: FileOutputStream = null
    ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102:    fos = new FileOutputStream(file, true)
    ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213:      var truncateStream: FileOutputStream = null
    ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215:        truncateStream = new FileOutputStream(file, true)
    ./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153:    val out = new FileOutputStream(file).getChannel()
    ./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
    ./core/src/main/scala/org/apache/spark/TestUtils.scala:81:    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
    ./core/src/main/scala/org/apache/spark/TestUtils.scala:96:    val jarFileStream = new FileOutputStream(jarFile)
    ./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
    ./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31:  @volatile private var outputStream: FileOutputStream = null
    ./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97:    outputStream = new FileOutputStream(file, true)
    ./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90:        gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:333:        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
    ./core/src/main/scala/org/apache/spark/util/Utils.scala:527:      val out = new FileOutputStream(tempFile)
    ```
    
    `DiskBlockObjectWriter` uses the underlying `FileDescriptor`, so it is not easy to switch it to the NIO Files API.
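
    For context, a rough sketch of why the `FileDescriptor` dependency matters (illustrative only; the `sync()` call here stands in for whatever the writer actually does with the descriptor, and the path is a placeholder):

    ```scala
    import java.io.FileOutputStream

    object FileDescriptorSketch extends App {
      // FileOutputStream exposes its underlying FileDescriptor via getFD(); the
      // plain OutputStream returned by Files.newOutputStream has no equivalent.
      val fos = new FileOutputStream("/tmp/example.dat", true) // append mode
      val fd = fos.getFD
      fos.write(Array[Byte](1, 2, 3))
      fd.sync() // force buffered data to the storage device via the descriptor
      fos.close()
    }
    ```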
    
    For the `FileInputStream` and `FileOutputStream` usages in common/shuffle*, I changed them all.
    
    ## How was this patch tested?
    
    Existing tests and manual verification.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jerryshao/apache-spark SPARK-21475

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18684.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18684
    
----
commit b9dad5ac976261359623fafbbfa9389310272238
Author: jerryshao <[email protected]>
Date:   2017-07-19T18:30:15Z

    Use NIO's file API to replace FileInputStream/FileOutputStream
    
    Change-Id: I0f11b9e0cbe62ca3d0bac7bfe0e2df838da80b48

----


