[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans

2016-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15245



[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans

2016-09-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15245#discussion_r80568308
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,7 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
  */
-class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
+class HadoopFileLinesReader(
+    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
   private val iterator = {
--- End diff --

This is a `RecordReaderIterator`, whose memory footprint should become practically nothing after `close()` is called, hence my decision not to null things out here.
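For context, a hedged sketch of the shape under discussion: `HadoopFileLinesReader` wraps a `RecordReaderIterator` and can simply delegate `close()`, because the inner iterator drops its reader reference when closed. The class name and constructor below are simplified stand-ins, not the actual Spark code:

```scala
import java.io.Closeable

import org.apache.hadoop.io.Text

// Simplified stand-in: the real HadoopFileLinesReader builds a Hadoop
// LineRecordReader from a PartitionedFile; here we take the inner
// RecordReaderIterator-like iterator directly.
class FileLinesReaderSketch(inner: Iterator[Text] with Closeable)
  extends Iterator[Text] with Closeable {

  override def hasNext: Boolean = inner.hasNext
  override def next(): Text = inner.next()

  // Plain delegation suffices: once the inner iterator closes its
  // RecordReader and nulls the reference out, this wrapper retains almost
  // no memory, so there is nothing to null out at this level.
  override def close(): Unit = inner.close()
}
```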



[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans

2016-09-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15245#discussion_r80568112
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala ---
@@ -27,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
  * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
  * column batches by pretending they are rows.
  */
-class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
+class RecordReaderIterator[T](
+    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
--- End diff --

By nulling out `rowReader`, I think this will prevent memory consumption from becoming too high when closed iterators remain referenced in the task's list of completion callbacks.
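A minimal sketch of the null-out pattern being described, assuming the reader is held in a `private[this] var` (the class name is hypothetical and the Iterator methods are omitted):

```scala
import java.io.Closeable

import org.apache.hadoop.mapreduce.RecordReader

// Because rowReader is a var, close() can drop the reference: a closed
// iterator that is still reachable from the task's completion-callback
// list then pins only a null field, not the reader's buffers.
class ReaderHolderSketch[T](
    private[this] var rowReader: RecordReader[_, T]) extends Closeable {

  override def close(): Unit = {
    if (rowReader != null) {
      try {
        rowReader.close()
      } finally {
        rowReader = null
      }
    }
  }
}
```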



[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans

2016-09-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15245#discussion_r80567995
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala ---
@@ -52,4 +55,18 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
     havePair = false
     rowReader.getCurrentValue
   }
+
+  override def close(): Unit = {
+    if (rowReader != null) {
+      try {
+        // Close the reader and release it. Note: it's very important that we don't close the
--- End diff --

This comment is copied from `NewHadoopRDD`, which contains similar defensive programming.
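A sketch of that defensive pattern, under the assumption that close() failures should never fail the task doing the cleanup (`closeQuietly` is a hypothetical helper; the real `NewHadoopRDD` code logs a warning and also checks whether the JVM is shutting down first):

```scala
import java.io.Closeable

object ReaderCleanup {
  // Guard against a null (already-released) reader and swallow exceptions
  // thrown by close(), since a failure to release resources at the end of
  // a task should not fail the task itself.
  def closeQuietly(reader: Closeable): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } catch {
        case e: Exception =>
          System.err.println(s"Exception while closing reader: $e")
      }
    }
  }
}
```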



[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans

2016-09-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15245#discussion_r80567861
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala ---
@@ -38,7 +41,7 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
         // Close and release the reader here; close() will also be called when the task
         // completes, but for tasks that read from many files, it helps to release the
         // resources early.
-        rowReader.close()
--- End diff --

This is `CompletionIterator`-style cleanup.
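For readers unfamiliar with the pattern: Spark's `CompletionIterator` utility runs a cleanup action the moment the wrapped iterator is exhausted, rather than waiting for the end of the task. A simplified stand-in (not Spark's actual implementation) showing the style the diff inlines into `hasNext`:

```scala
// Simplified sketch of the CompletionIterator idea: invoke a completion
// function exactly once, as soon as the underlying iterator runs out.
class CompletionIteratorSketch[A](sub: Iterator[A], completion: () => Unit)
  extends Iterator[A] {

  private[this] var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion() // e.g. () => rowReader.close()
    }
    more
  }

  override def next(): A = sub.next()
}
```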



[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are closed by data source file scans

2016-09-26 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/15245

[SPARK-17666] Ensure that RecordReaders are closed by data source file scans

## What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which do not fully consume their input may leak file handles or network connections (e.g. S3 connections). Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans only close record readers once their iterators have been fully consumed.

This patch adds `close()` methods to `RecordReaderIterator` and `HadoopFileLinesReader`, and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks, guaranteeing that cleanup is eventually performed.
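A sketch of the registration pattern described above, as it might look inside a `FileFormat.buildReader()` implementation (the helper name is hypothetical; `TaskContext.addTaskCompletionListener` is the real Spark API):

```scala
import java.io.Closeable

import org.apache.spark.TaskContext

object ScanCleanup {
  // Tie the iterator's close() to task completion so the underlying
  // RecordReader is released even when the iterator is never fully
  // consumed (e.g. a query with a LIMIT). TaskContext.get() can return
  // null off the executors, hence the Option guard.
  def registerCleanup(iter: Closeable): Unit = {
    Option(TaskContext.get()).foreach { ctx =>
      ctx.addTaskCompletionListener((_: TaskContext) => iter.close())
    }
  }
}
```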

## How was this patch tested?

Tested manually for now.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-17666-close-recordreader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15245.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15245


commit d804e025c2b4a8799f38f2f67beba1d12e224180
Author: Josh Rosen 
Date:   2016-09-26T20:28:12Z

Add close() to RecordReaderIterator and HadoopLinesReader

commit e4b8577ed71a30f4ad220cd1a2f19a8edd596c64
Author: Josh Rosen 
Date:   2016-09-26T20:29:24Z

Register close() callbacks in all implementations of FileFormat.buildReader()



