GitHub user dongjoon-hyun opened a pull request:
https://github.com/apache/spark/pull/20714
[SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in ParquetFileFormat
## What changes were proposed in this pull request?
ParquetFileFormat leaks opened files in some cases: the underlying stream is opened during reader initialization, but the task completion listener that closes it was registered only after initialization, so a failure during initialization (e.g. an OutOfMemoryError) leaves the stream open. This PR prevents the leak by registering the task completion listeners first, before initialization (see the sketch after the stack trace below).
- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
  at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
  at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
  at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
  at ...
```
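For context, the ordering the fix enforces looks roughly like the sketch below. This is an illustration only: `readerWithCleanup`, `newReader`, and `init` are hypothetical names standing in for the logic in `ParquetFileFormat.buildReaderWithPartitionValues`; the point is solely the listener-before-initialization ordering.
```scala
import org.apache.spark.TaskContext

// Hypothetical helper sketching the fix: register the close-on-task-completion
// listener BEFORE initialization, which is the step that opens the file.
def readerWithCleanup[R <: AutoCloseable](newReader: () => R)(init: R => Unit): R = {
  val reader = newReader()  // constructing the reader does not open the file
  Option(TaskContext.get()).foreach { ctx =>
    // If init(reader) throws (e.g. an OutOfMemoryError) after the file has
    // been opened, this listener still closes the reader at task completion.
    ctx.addTaskCompletionListener(_ => reader.close())
  }
  init(reader)              // may open the file and then fail
  reader
}
```
With this ordering, the `VectorizedParquetRecordReader` in the stack trace above is closed by the listener even though its `initialize` call failed.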
## How was this patch tested?
Manual. The following test case reproduces the leak: a vectorized reader batch size of `Int.MaxValue` makes reader initialization fail with an OutOfMemoryError after the Parquet file has already been opened, and the test harness's `DebugFilesystem` (visible in the stack trace above) then flags the unclosed stream.
```scala
test("SPARK-23457 Register task completion listeners first in
ParquetFileFormat") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key ->
s"${Int.MaxValue}") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
Seq(0).toDF("a").write.format("parquet").save(new Path(basePath,
"first").toString)
Seq(1).toDF("a").write.format("parquet").save(new Path(basePath,
"second").toString)
val df = spark.read.parquet(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString)
val e = intercept[SparkException] {
df.collect()
}
assert(e.getCause.isInstanceOf[OutOfMemoryError])
}
}
}
```
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dongjoon-hyun/spark SPARK-23457-2.3
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20714.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20714
----
commit d15eba754a59721bc7d9cdc7d374f2f323d21e41
Author: Dongjoon Hyun <dongjoon@...>
Date: 2018-02-20T05:33:03Z
[SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat
ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listeners first, before initialization.
- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
  at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
  at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
  at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
  at ...
```
Manual. The following test case reproduces the same leak.
```scala
test("SPARK-23457 Register task completion listeners first in
ParquetFileFormat") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key ->
s"${Int.MaxValue}") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
Seq(0).toDF("a").write.format("parquet").save(new Path(basePath,
"first").toString)
Seq(1).toDF("a").write.format("parquet").save(new Path(basePath,
"second").toString)
val df = spark.read.parquet(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString)
val e = intercept[SparkException] {
df.collect()
}
assert(e.getCause.isInstanceOf[OutOfMemoryError])
}
}
}
```
Author: Dongjoon Hyun <[email protected]>
Closes #20619 from dongjoon-hyun/SPARK-23390.
----