GitHub user dongjoon-hyun opened a pull request:

    https://github.com/apache/spark/pull/20714

    [SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in ParquetFileFormat

    ## What changes were proposed in this pull request?
    
    ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listeners before initialization.
    
    - [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
    - [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
    
    ```
    Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
        at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
        at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
        at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
        at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
        at
    ```
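    
    The `java.lang.Throwable` at the top of the trace suggests that the test filesystem records a throwable where each stream is opened, so that a leaked stream can be traced back to its open site. A minimal sketch of that idea, assuming a hypothetical `OpenStreamTracker` (this is not Spark's `DebugFilesystem` code, and all names below are illustrative):
    
    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for a test filesystem that tracks open streams.
    object OpenStreamTracker {
      // Maps each open stream to a Throwable captured where it was opened.
      private val openStreams = mutable.Map.empty[AutoCloseable, Throwable]

      def addOpenStream(stream: AutoCloseable): Unit = synchronized {
        openStreams.put(stream, new Throwable())
      }

      def removeOpenStream(stream: AutoCloseable): Unit = synchronized {
        openStreams.remove(stream)
      }

      // Fails, with one recorded open site as the cause, if any stream is still open.
      def assertNoOpenStreams(): Unit = synchronized {
        openStreams.values.headOption.foreach { openSite =>
          throw new IllegalStateException(
            s"There are ${openStreams.size} possibly leaked streams.", openSite)
        }
      }
    }
    ```
    
    A test harness built this way would call `assertNoOpenStreams()` after each test, so any stream opened but never closed fails the test with the stack trace of its open site.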
    
    ## How was this patch tested?
    
    Manual. The following test case generates the same leakage.
    
    ```scala
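      // Imports needed by the test.
      import org.apache.hadoop.fs.Path
      import org.apache.spark.SparkException
      import org.apache.spark.sql.internal.SQLConf

      test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
        // An enormous batch size makes the vectorized reader run out of memory.
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
          withTempDir { dir =>
            val basePath = dir.getCanonicalPath
            // Write two single-row Parquet datasets and read both back in one DataFrame.
            Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
            Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
            val df = spark.read.parquet(
              new Path(basePath, "first").toString,
              new Path(basePath, "second").toString)
            val e = intercept[SparkException] {
              df.collect()
            }
            // The job must fail with the OutOfMemoryError itself.
            assert(e.getCause.isInstanceOf[OutOfMemoryError])
          }
        }
      }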
    ```
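    
    Presumably the oversized batch size makes the reader fail after the file is already open but before any cleanup has been registered. The ordering issue can be sketched in isolation as follows; `FakeTaskContext`, `FakeReader`, and `ListenerOrdering` are hypothetical stand-ins, not Spark classes, and the failure is simulated with an `IllegalStateException` rather than a real `OutOfMemoryError`:
    
    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in for a task context that runs listeners when the task ends.
    final class FakeTaskContext {
      private val listeners = ArrayBuffer.empty[() => Unit]
      def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
      def markTaskCompleted(): Unit = listeners.foreach(f => f())
    }

    // Hypothetical reader: initialize() opens a resource and may then fail.
    final class FakeReader(failAfterOpen: Boolean) extends AutoCloseable {
      var isOpen = false
      def initialize(): Unit = {
        isOpen = true // the file handle now exists
        if (failAfterOpen) throw new IllegalStateException("simulated failure after open")
      }
      override def close(): Unit = isOpen = false
    }

    object ListenerOrdering {
      // Leaky ordering: the listener is only registered if initialize() succeeds.
      def initializeThenRegister(ctx: FakeTaskContext, reader: FakeReader): Unit = {
        reader.initialize()
        ctx.addTaskCompletionListener(() => reader.close())
      }

      // Safe ordering: the listener is registered first, so a failure in
      // initialize() still leads to close() when the task completes.
      def registerThenInitialize(ctx: FakeTaskContext, reader: FakeReader): Unit = {
        ctx.addTaskCompletionListener(() => reader.close())
        reader.initialize()
      }

      // Runs one ordering against a failing reader; returns whether it is still open.
      def leaksOnFailure(setup: (FakeTaskContext, FakeReader) => Unit): Boolean = {
        val ctx = new FakeTaskContext
        val reader = new FakeReader(failAfterOpen = true)
        try setup(ctx, reader) catch { case _: IllegalStateException => () }
        ctx.markTaskCompleted()
        reader.isOpen
      }
    }
    ```
    
    With a reader that fails after opening, `ListenerOrdering.leaksOnFailure(ListenerOrdering.initializeThenRegister)` returns `true`, while the same call with `registerThenInitialize` returns `false`.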

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dongjoon-hyun/spark SPARK-23457-2.3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20714.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20714
    
----
commit d15eba754a59721bc7d9cdc7d374f2f323d21e41
Author: Dongjoon Hyun <dongjoon@...>
Date:   2018-02-20T05:33:03Z

    [SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat
    
    Author: Dongjoon Hyun <[email protected]>
    
    Closes #20619 from dongjoon-hyun/SPARK-23390.

----

