[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are close...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/15245

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are close...
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15245#discussion_r80568308

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
    @@ -30,7 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
      * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
      * in that file.
      */
    -class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
    +class HadoopFileLinesReader(
    +    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
       private val iterator = {
    --- End diff --

    This is a `RecordReaderIterator`, whose memory footprint should become practically nothing after `close()` is called, hence my decision to not null things out here.
[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are close...
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15245#discussion_r80568112

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala ---
    @@ -27,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
      * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
      * column batches by pretending they are rows.
      */
    -class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
    +class RecordReaderIterator[T](
    +    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
    --- End diff --

    By nulling out the `rowReader` I think that this will prevent memory consumption from becoming too high in the list of task completion callbacks.
[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are close...
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15245#discussion_r80567995

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala ---
    @@ -52,4 +55,18 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
         havePair = false
         rowReader.getCurrentValue
       }
    +
    +  override def close(): Unit = {
    +    if (rowReader != null) {
    +      try {
    +        // Close the reader and release it. Note: it's very important that we don't close the
    --- End diff --

    This comment is copied from `NewHadoopRDD`, which contains similar defensive programming.
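The defensive `close()` discussed in the two comments above can be illustrated with a minimal, self-contained sketch. It is not the code from the pull request: `CountingReader` and `ClosingHolder` are hypothetical stand-ins (no Hadoop or Spark types are used), and the sketch only assumes what the diff shows, namely a `var` reference that is checked for `null`, closed inside a `try`, and then released.

```scala
import java.io.Closeable

// Hypothetical stand-in for a resource-holding reader (not Hadoop's RecordReader).
final class CountingReader extends Closeable {
  var closeCalls = 0
  override def close(): Unit = closeCalls += 1
}

// Keeps the reader in a var so that close() can drop the reference afterwards.
final class ClosingHolder(private[this] var reader: CountingReader) extends Closeable {
  def isReleased: Boolean = reader == null

  override def close(): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } finally {
        // Null out even if close() throws, so that a later call is a no-op and
        // the reader can be garbage collected while this holder is still
        // referenced (for example from a list of completion callbacks).
        reader = null
      }
    }
  }
}

object IdempotentCloseDemo {
  def main(args: Array[String]): Unit = {
    val reader = new CountingReader
    val holder = new ClosingHolder(reader)
    holder.close()
    holder.close() // second call does nothing
    // prints "close calls: 1, released: true"
    println(s"close calls: ${reader.closeCalls}, released: ${holder.isReleased}")
  }
}
```

Because the second `close()` sees a `null` reference, the underlying reader is closed exactly once even when cleanup is triggered from more than one place.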
[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are close...
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15245#discussion_r80567861

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala ---
    @@ -38,7 +41,7 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
           // Close and release the reader here; close() will also be called when the task
           // completes, but for tasks that read from many files, it helps to release the
           // resources early.
    -      rowReader.close()
    --- End diff --

    This is `CompletionIterator`-style cleanup.
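As a rough illustration of the early-release cleanup mentioned above, the following sketch closes its source as soon as `hasNext` discovers that the input is exhausted. `LineSource` and `SelfClosingIterator` are hypothetical names invented for this example; they do not appear in the pull request and only mimic the shape of an iterator wrapped around a reader.

```scala
import java.io.Closeable

// Hypothetical line source standing in for a record reader.
final class LineSource(lines: List[String]) extends Closeable {
  private var remaining = lines
  var closed = false

  def nextLine(): Option[String] = remaining match {
    case head :: tail =>
      remaining = tail
      Some(head)
    case Nil => None
  }

  override def close(): Unit = closed = true
}

final class SelfClosingIterator(source: LineSource)
  extends Iterator[String] with Closeable {

  private var buffered: Option[String] = None
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && buffered.isEmpty) {
      buffered = source.nextLine()
      if (buffered.isEmpty) {
        finished = true
        close() // release the source early, as soon as the input is exhausted
      }
    }
    buffered.isDefined
  }

  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    val line = buffered.get
    buffered = None
    line
  }

  override def close(): Unit = source.close()
}

object SelfClosingDemo {
  def main(args: Array[String]): Unit = {
    val source = new LineSource(List("a", "b"))
    val it = new SelfClosingIterator(source)
    it.next()              // consume one element; the source is still open
    println(source.closed) // prints "false"
    it.foreach(_ => ())    // drain the rest
    println(source.closed) // prints "true"
  }
}
```

The first `println` shows the gap this style of cleanup leaves on its own: a consumer that stops after one element never triggers the close.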
[GitHub] spark pull request #15245: [SPARK-17666] Ensure that RecordReaders are close...
GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/15245

    [SPARK-17666] Ensure that RecordReaders are closed by data source file scans

    ## What changes were proposed in this pull request?

    This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks that do not fully consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully consumed.

    This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

    ## How was this patch tested?

    Tested manually for now.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark SPARK-17666-close-recordreader

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15245.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #15245

commit d804e025c2b4a8799f38f2f67beba1d12e224180
Author: Josh Rosen
Date: 2016-09-26T20:28:12Z

    Add close() to RecordReaderIterator and HadoopLinesReader

commit e4b8577ed71a30f4ad220cd1a2f19a8edd596c64
Author: Josh Rosen
Date: 2016-09-26T20:29:24Z

    Register close() callbacks in all implementations of FileFormat.buildReader()
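The callback registration described in the pull request can be sketched without Spark on the classpath. Everything below is an assumption made for illustration: `FakeTaskContext`, its two methods, `TrackedIterator`, and this `buildReader` are hypothetical and are not Spark's API. In Spark itself the corresponding hook is `TaskContext.addTaskCompletionListener`, and the real `FileFormat.buildReader()` implementations differ in detail.

```scala
import java.io.Closeable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a task context; this is NOT Spark's TaskContext.
final class FakeTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]

  def addCompletionListener(f: () => Unit): Unit = {
    listeners += f
    ()
  }

  // Runs every registered callback, as would happen when a task finishes.
  def markTaskCompleted(): Unit = listeners.foreach(f => f())
}

// Hypothetical closeable iterator that records whether it was closed.
final class TrackedIterator(data: Iterator[Int]) extends Iterator[Int] with Closeable {
  var closed = false
  override def hasNext: Boolean = !closed && data.hasNext
  override def next(): Int = data.next()
  override def close(): Unit = closed = true
}

object CallbackDemo {
  // Builds an iterator and registers its cleanup with the task context, so
  // that close() runs even if the caller never drains the iterator.
  def buildReader(ctx: FakeTaskContext): TrackedIterator = {
    val iter = new TrackedIterator(Iterator(1, 2, 3))
    ctx.addCompletionListener(() => iter.close())
    iter
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeTaskContext
    val iter = buildReader(ctx)
    iter.next()             // consume only the first element
    println(iter.closed)    // prints "false"
    ctx.markTaskCompleted() // task ends; registered callbacks run
    println(iter.closed)    // prints "true"
  }
}
```

Registering the callback at construction time is what makes cleanup independent of how much of the input the task reads.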