[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20590

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167925515

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
        val batchReader = new OrcColumnarBatchReader(
          enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+       val iter = new RecordReaderIterator(batchReader)
+       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
--- End diff --

Sure!
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167904770

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
        val batchReader = new OrcColumnarBatchReader(
          enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+       val iter = new RecordReaderIterator(batchReader)
+       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
--- End diff --

Could you please add a comment explaining why we register this listener here, referencing `SPARK-23399`? Since we are likely to forget this investigation in the future :), the comment will help us and will remind us to run the test case manually.
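A comment along the lines kiszk requests might look like the following. This is an illustrative sketch, not the wording that actually landed in the PR; the rationale is taken from the discussion below (the file is opened by `initialize` and `initBatch` can OOM before the listener exists):

```scala
// SPARK-23399: Register the task completion listener first, before calling
// initialize()/initBatch(). initialize() opens the ORC file, and initBatch()
// allocates the columnar batch, which may OOM or be interrupted by a task
// kill. If that happens before the listener is registered, the opened file
// is never closed and a filesystem connection leaks.
val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
```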
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167901068

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala ---
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  // This should be tested manually because it raises OOM intentionally
+  // in order to cause `Leaked filesystem connection`. The test suite dies, too.
--- End diff --

Sure!
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167828374

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala ---
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  // This should be tested manually because it raises OOM intentionally
+  // in order to cause `Leaked filesystem connection`. The test suite dies, too.
--- End diff --

Ah, nice trick to fail the reader midway! But it's a little weird to have it as a unit test. Shall we just put it in the PR description and say it's manually tested? This test needs to be run manually anyway...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167803208

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala ---
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  // This should be tested manually because it raises OOM intentionally
+  // in order to cause `Leaked filesystem connection`. The test suite dies, too.
+  ignore("SPARK-23399 Register a task completion listner first for OrcColumnarBatchReader") {
+    withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
+        Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[OutOfMemoryError])
+      }
+    }
+  }
--- End diff --

Hi, all. The above test case reproduces the same leakage reported in the JIRA, and this PR fixes it. Please try it in IntelliJ against the original code.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167762763

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
        val batchReader = new OrcColumnarBatchReader(
          enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+       val iter = new RecordReaderIterator(batchReader)
+       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+       batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

I tried to verify it manually on my local machine, and it seems `close` is called even before this change. Maybe I am missing something, or this is environment-dependent.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167762591

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
        val batchReader = new OrcColumnarBatchReader(
          enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+       val iter = new RecordReaderIterator(batchReader)
+       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+       batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

@dongjoon-hyun Thanks for this fix! My question is: how do we know that `close` was not called before and is called now? Have you verified it?
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167700392

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
        val batchReader = new OrcColumnarBatchReader(
          enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+       val iter = new RecordReaderIterator(batchReader)
+       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+       batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

@cloud-fan and @gatorsmile, could you take a look at this? For the ORC library, it looks okay as long as we call `close` correctly.
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20590#discussion_r167699657

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
        val batchReader = new OrcColumnarBatchReader(
          enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+       val iter = new RecordReaderIterator(batchReader)
+       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+       batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

According to the reported case, the ORC file is opened here, but it seems the task is killed (`TaskKilled (Stage cancelled)`) during `initBatch`, before its listener is registered.
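To make the window concrete: with the original ordering, the reader opens the file before any completion listener exists, so a kill or OOM during `initBatch` leaks the open reader. A simplified sketch of the broken ordering follows; names come from the diff above, and the `initBatch` argument list is elided since it is not shown in this thread:

```scala
// Original (leaky) ordering in OrcFileFormat:
batchReader.initialize(fileSplit, taskAttemptContext)  // opens the ORC file
batchReader.initBatch(/* ... */)                       // may OOM or be killed here
// Only now is close() wired up -- too late if the task already died above.
val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
```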
GitHub user dongjoon-hyun opened a pull request: https://github.com/apache/spark/pull/20590

[SPARK-23399][SQL] Register a task completion listner first for OrcColumnarBatchReader

## What changes were proposed in this pull request?

This PR aims to resolve an open file leakage issue reported at SPARK-23390 by moving the listener registration position. Currently, the sequence is the following.

1. Create `batchReader`.
2. `batchReader.initialize` opens an ORC file.
3. `batchReader.initBatch` may take a long time to allocate memory in some environments and cause errors.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))`

This PR moves step 4 before steps 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.

## How was this patch tested?

Currently, I couldn't find a way to add a test case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dongjoon-hyun/spark SPARK-23399

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20590.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20590

commit 198f1861cfe4d2cd544cb3a09d3a271de1b656ab
Author: Dongjoon Hyun
Date: 2018-02-12T21:46:49Z

    [SPARK-23399][SQL] Register a task completion listner first for OrcColumnarBatchReader
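The fixed 1 -> 4 -> 2 -> 3 sequence from the description can be sketched as follows. This is illustrative only; the `initBatch` argument list is elided because it does not appear in this thread:

```scala
// (1) Create the reader.
val batchReader = new OrcColumnarBatchReader(
  enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
val iter = new RecordReaderIterator(batchReader)
// (4, moved up) Register close() before any file is opened, so the
// completion listener fires even if a later step OOMs or the task is killed.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
batchReader.initialize(fileSplit, taskAttemptContext)  // (2) opens the ORC file
batchReader.initBatch(/* ... */)                       // (3) may OOM; now safe to fail
```

The design point is that a task completion listener runs regardless of how the task ends, so registering it before any resource is acquired guarantees cleanup for every subsequent failure mode.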