[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20590


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167925515
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
         val batchReader = new OrcColumnarBatchReader(
           enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+        val iter = new RecordReaderIterator(batchReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
--- End diff --

Sure!


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167904770
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
         val batchReader = new OrcColumnarBatchReader(
           enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+        val iter = new RecordReaderIterator(batchReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
--- End diff --

Could you please add a comment explaining why we register this listener here, referencing `SPARK-23399`? Since we would forget this investigation in the future :), the comment will help us and remind us to run the test case manually.


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167901068
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala ---
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  // This should be tested manually because it raises OOM intentionally
+  // in order to cause `Leaked filesystem connection`. The test suite dies, too.
--- End diff --

Sure!


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167828374
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala ---
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  // This should be tested manually because it raises OOM intentionally
+  // in order to cause `Leaked filesystem connection`. The test suite dies, too.
--- End diff --

ah, nice trick to fail the reader midway!

But it's a little weird to have it as a unit test. Shall we just put it in the PR description and say it's manually tested? This test needs to be run manually anyway...


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167803208
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala ---
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  // This should be tested manually because it raises OOM intentionally
+  // in order to cause `Leaked filesystem connection`. The test suite dies, too.
+  ignore("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") {
+    withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
+        Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[OutOfMemoryError])
+      }
+    }
+  }
--- End diff --

Hi, all.
The above test case reproduces the same leakage reported in the JIRA, and this PR fixes it. Please try it in IntelliJ with the original code.


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167762763
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
         val batchReader = new OrcColumnarBatchReader(
           enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+        val iter = new RecordReaderIterator(batchReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
         batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

I tried to verify it manually on my local machine, and it seems `close` is called even before this change. Maybe I am missing something, or this is environment-dependent.


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167762591
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
         val batchReader = new OrcColumnarBatchReader(
           enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+        val iter = new RecordReaderIterator(batchReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
         batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

@dongjoon-hyun Thanks for this fix! My question is: how do we know that `close` was not called before and is called now? Have you verified it?


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-12 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167700392
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
         val batchReader = new OrcColumnarBatchReader(
           enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+        val iter = new RecordReaderIterator(batchReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
         batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

@cloud-fan and @gatorsmile, could you take a look at this?
For the ORC library, it looks okay as long as we call `close` correctly.


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-12 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20590#discussion_r167699657
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -188,6 +188,9 @@ class OrcFileFormat
       if (enableVectorizedReader) {
         val batchReader = new OrcColumnarBatchReader(
           enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+        val iter = new RecordReaderIterator(batchReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
         batchReader.initialize(fileSplit, taskAttemptContext)
--- End diff --

According to the reported case, the ORC file is opened here.
But it seems the task is killed (`TaskKilled (Stage cancelled)`) before its listener is registered during `initBatch`.
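The failure mode described above can be sketched with a toy model. These class names (`ToyTaskContext`, `ToyReader`) are illustrative stand-ins, not Spark's actual API: if the close listener is only registered later, during `initBatch`, a task killed right after `initialize` never releases the open file.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy task context: runs registered cleanup callbacks on task completion.
class ToyTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def markTaskCompleted(): Unit = listeners.foreach(_())
}

// Toy reader: `initialize` opens the file handle, `close` releases it.
class ToyReader {
  var opened = false
  var closed = false
  def initialize(): Unit = { opened = true }
  def close(): Unit = { closed = true }
}

// Old ordering: the close listener would only be registered during
// `initBatch`. If the task is killed right after `initialize`, nothing
// ever closes the file -- the leaked filesystem connection.
val ctx = new ToyTaskContext
val reader = new ToyReader
reader.initialize()      // file handle is now open
// ... task killed here: `TaskKilled (Stage cancelled)` ...
ctx.markTaskCompleted()  // no listener registered yet, close() never runs
```

In this sketch `reader.opened` is true but `reader.closed` stays false after task completion, which is exactly the leak being discussed.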


---




[GitHub] spark pull request #20590: [SPARK-23399][SQL] Register a task completion lis...

2018-02-12 Thread dongjoon-hyun
GitHub user dongjoon-hyun opened a pull request:

https://github.com/apache/spark/pull/20590

[SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader

## What changes were proposed in this pull request?

This PR aims to resolve an open file leakage issue reported at SPARK-23390 by moving the listener registration position. Currently, the sequence is as follows.

1. Create `batchReader`.
2. `batchReader.initialize` opens an ORC file.
3. `batchReader.initBatch` may take a long time to allocate memory in some environments and cause errors.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))`

This PR moves step 4 before steps 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.
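The reordered sequence can be sketched with a minimal, self-contained model. `SimpleTaskContext` and `FakeBatchReader` below are hypothetical stand-ins for Spark's `TaskContext` and `OrcColumnarBatchReader`, not the real classes:

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for Spark's TaskContext: collects completion listeners and
// runs them when the task finishes, successfully or not.
class SimpleTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def markTaskCompleted(): Unit = listeners.foreach(f => f())
}

// Stand-in for the batch reader: `initialize` opens a file handle,
// `initBatch` may fail (e.g. OOM while allocating column vectors).
class FakeBatchReader(failInInitBatch: Boolean) {
  var opened = false
  var closed = false
  def initialize(): Unit = { opened = true }
  def initBatch(): Unit =
    if (failInInitBatch) throw new RuntimeException("simulated OOM")
  def close(): Unit = { closed = true }
}

// The fixed ordering: register the close listener (step 4) before
// initialize (step 2) and initBatch (step 3), so the handle is
// released even when initBatch fails.
def readWithFix(ctx: SimpleTaskContext, reader: FakeBatchReader): Unit = {
  ctx.addTaskCompletionListener(() => reader.close()) // step 4, moved first
  try {
    reader.initialize() // step 2: opens the file
    reader.initBatch()  // step 3: may throw
  } finally {
    ctx.markTaskCompleted() // Spark runs completion listeners here
  }
}
```

With `failInInitBatch = true`, the reader is still closed on task completion, because the listener was registered before the failing step.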

## How was this patch tested?

Currently, I couldn't find a way to add a test case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dongjoon-hyun/spark SPARK-23399

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20590.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20590


commit 198f1861cfe4d2cd544cb3a09d3a271de1b656ab
Author: Dongjoon Hyun 
Date:   2018-02-12T21:46:49Z

[SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader




---
