spark git commit: [SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader

2018-02-13 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 4f6a457d4 -> bb26bdb55


[SPARK-23399][SQL] Register a task completion listener first for 
OrcColumnarBatchReader

This PR aims to resolve an open file leakage issue reported at 
[SPARK-23390](https://issues.apache.org/jira/browse/SPARK-23390) by moving the 
listener registration earlier. Currently, the sequence is as follows.

1. Create `batchReader`
2. `batchReader.initialize` opens an ORC file.
3. `batchReader.initBatch` may take a long time to allocate memory in some environments and can cause errors.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))`

This PR moves step 4 before steps 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.
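The reordering above can be sketched with a minimal, framework-free Scala analogue (the names `FakeReader` and `listeners` are hypothetical stand-ins for Spark's `OrcColumnarBatchReader`, `RecordReaderIterator`, and `TaskContext` listener machinery): registering the close callback before the failure-prone initialization steps guarantees the resource is released even when initialization throws.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's ORC batch reader.
class FakeReader {
  var closed = false
  def initialize(): Unit = ()                          // step 2: opens the file
  def initBatch(): Unit = throw new OutOfMemoryError() // step 3: may fail (e.g. OOM)
  def close(): Unit = { closed = true }
}

object Demo {
  // Simulates one task; `registerFirst` toggles old vs. new listener ordering.
  def run(registerFirst: Boolean): FakeReader = {
    val listeners = ArrayBuffer.empty[() => Unit]      // stands in for TaskContext listeners
    val reader = new FakeReader                        // step 1
    def register(): Unit = listeners += (() => reader.close()) // step 4
    try {
      if (registerFirst) register()                    // new order: 1 -> 4 -> 2 -> 3
      reader.initialize()
      reader.initBatch()
      if (!registerFirst) register()                   // old order: 1 -> 2 -> 3 -> 4
    } catch {
      case _: OutOfMemoryError => ()                   // the task fails here
    }
    listeners.foreach(_.apply())                       // task completion fires listeners
    reader
  }

  def main(args: Array[String]): Unit = {
    assert(!run(registerFirst = false).closed) // old order: listener never registered, file leaks
    assert(run(registerFirst = true).closed)   // new order: listener fires, file is closed
  }
}
```

With the old ordering, the OOM aborts the task before step 4 ever runs, so nothing closes the reader; registering first makes the cleanup unconditional.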

Manual. The following test case intentionally triggers an OOM to cause a leaked 
filesystem connection in the current code base. With this patch, the leakage 
no longer occurs.

```scala
  // This should be tested manually because it raises OOM intentionally
  // in order to cause `Leaked filesystem connection`.
  test("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") {
    withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
        val df = spark.read.orc(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```

Author: Dongjoon Hyun 

Closes #20590 from dongjoon-hyun/SPARK-23399.

(cherry picked from commit 357babde5a8eb9710de7016d7ae82dee21fa4ef3)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb26bdb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb26bdb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb26bdb5

Branch: refs/heads/branch-2.3
Commit: bb26bdb55fdf84c4e36fd66af9a15e325a3982d6
Parents: 4f6a457
Author: Dongjoon Hyun 
Authored: Wed Feb 14 10:55:24 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Feb 14 10:56:37 2018 +0800

--
 .../spark/sql/execution/datasources/orc/OrcFileFormat.scala  | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb26bdb5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2dd314d..94403c3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -187,6 +187,12 @@ class OrcFileFormat
     if (enableVectorizedReader) {
       val batchReader = new OrcColumnarBatchReader(
         enableOffHeapColumnVector && taskContext.isDefined, copyToSpark)
+      // SPARK-23399 Register a task completion listener first to call `close()` in all cases.
+      // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
+      // after opening a file.
+      val iter = new RecordReaderIterator(batchReader)
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
       batchReader.initialize(fileSplit, taskAttemptContext)
       batchReader.initBatch(
         reader.getSchema,
@@ -195,8 +201,6 @@ class OrcFileFormat
         partitionSchema,
         file.partitionValues)
 
-      val iter = new RecordReaderIterator(batchReader)
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
       iter.asInstanceOf[Iterator[InternalRow]]
     } else {
       val orcRecordReader = new OrcInputFormat[OrcStruct]


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader

2018-02-13 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master d6f5e172b -> 357babde5


[SPARK-23399][SQL] Register a task completion listener first for 
OrcColumnarBatchReader

## What changes were proposed in this pull request?

This PR aims to resolve an open file leakage issue reported at 
[SPARK-23390](https://issues.apache.org/jira/browse/SPARK-23390) by moving the 
listener registration earlier. Currently, the sequence is as follows.

1. Create `batchReader`
2. `batchReader.initialize` opens an ORC file.
3. `batchReader.initBatch` may take a long time to allocate memory in some environments and can cause errors.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))`

This PR moves step 4 before steps 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.

## How was this patch tested?

Manual. The following test case intentionally triggers an OOM to cause a leaked 
filesystem connection in the current code base. With this patch, the leakage 
no longer occurs.

```scala
  // This should be tested manually because it raises OOM intentionally
  // in order to cause `Leaked filesystem connection`.
  test("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") {
    withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
        val df = spark.read.orc(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```

Author: Dongjoon Hyun 

Closes #20590 from dongjoon-hyun/SPARK-23399.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/357babde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/357babde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/357babde

Branch: refs/heads/master
Commit: 357babde5a8eb9710de7016d7ae82dee21fa4ef3
Parents: d6f5e17
Author: Dongjoon Hyun 
Authored: Wed Feb 14 10:55:24 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Feb 14 10:55:24 2018 +0800

--
 .../spark/sql/execution/datasources/orc/OrcFileFormat.scala  | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/357babde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index dbf3bc6..1de2ca2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -188,6 +188,12 @@ class OrcFileFormat
     if (enableVectorizedReader) {
       val batchReader = new OrcColumnarBatchReader(
         enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+      // SPARK-23399 Register a task completion listener first to call `close()` in all cases.
+      // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
+      // after opening a file.
+      val iter = new RecordReaderIterator(batchReader)
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
       batchReader.initialize(fileSplit, taskAttemptContext)
       batchReader.initBatch(
         reader.getSchema,
@@ -196,8 +202,6 @@ class OrcFileFormat
         partitionSchema,
         file.partitionValues)
 
-      val iter = new RecordReaderIterator(batchReader)
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
       iter.asInstanceOf[Iterator[InternalRow]]
     } else {
       val orcRecordReader = new OrcInputFormat[OrcStruct]

