This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ebb5238  [SPARK-37089][SQL] Do not register ParquetFileFormat completion listener lazily
ebb5238 is described below

commit ebb52383ea460c82d068d579dd180c223d523055
Author: Ankur Dave <ankurd...@gmail.com>
AuthorDate: Mon Oct 25 10:01:07 2021 +0900

    [SPARK-37089][SQL] Do not register ParquetFileFormat completion listener lazily
    
    ### What changes were proposed in this pull request?
    
    The previous PR https://github.com/apache/spark/pull/34245 assumed task completion listeners are registered bottom-up. `ParquetFileFormat#buildReaderWithPartitionValues()` violates this assumption by registering a task completion listener to close its output iterator lazily. Since task completion listeners are executed in reverse order of registration, this listener always runs before other listeners. When the downstream operator contains a Python UDF and the off-heap vectorized Parqu [...]
    
    The fix is to close the output iterator using FileScanRDD's task completion listener.
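    
    The ordering problem can be illustrated without Spark. Below is a minimal, standalone Scala sketch in which a toy listener registry (`ToyTaskContext`, an illustrative stand-in rather than Spark's real `TaskContext` API) runs its listeners in reverse order of registration, mirroring the behavior described above:
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    object ListenerOrdering {
      // Toy stand-in for TaskContext: listeners are stored in registration order
      // but invoked in reverse (LIFO) order when the "task" completes.
      final class ToyTaskContext {
        private val listeners = ArrayBuffer.empty[() => Unit]
        def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
        def markTaskCompleted(): Unit = listeners.reverseIterator.foreach(f => f())
      }
    
      def main(args: Array[String]): Unit = {
        val ctx = new ToyTaskContext
        // A downstream exec node (e.g. the Python UDF runner) registers its cleanup first.
        ctx.addTaskCompletionListener(() => println("downstream operator cleanup"))
        // The Parquet reader registers its listener lazily, i.e. later, while producing rows.
        ctx.addTaskCompletionListener(() => println("close reader, free off-heap buffers"))
        // LIFO execution: the reader's buffers are freed before the downstream cleanup runs,
        // so a consumer that is still reading those buffers can hit freed memory.
        ctx.markTaskCompleted()
        // Output:
        //   close reader, free off-heap buffers
        //   downstream operator cleanup
      }
    }
    ```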
    
    ### Why are the changes needed?
    
    Without this PR, the Python tests introduced in https://github.com/apache/spark/pull/34245 are flaky ([see details in thread](https://github.com/apache/spark/pull/34245#issuecomment-948713545)). They intermittently fail with a segfault.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Repeatedly ran one of the Python tests introduced in https://github.com/apache/spark/pull/34245 using the commands below. Previously, the test was flaky and failed after about 50 runs. With this PR, the test has not failed after 1000 runs.
    
    ```sh
    ./build/sbt -Phive clean package && ./build/sbt test:compile
    seq 1000 | parallel -j 8 --halt now,fail=1 'echo {#}; python/run-tests --testnames pyspark.sql.tests.test_udf'
    ```
    
    Closes #34369 from ankurdave/SPARK-37089.
    
    Authored-by: Ankur Dave <ankurd...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 1fc1d072c31ddac7a7b627e8fceba7929d1c6c7c)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/datasources/FileScanRDD.scala    | 34 +++++++++--
 .../datasources/RecordReaderIterator.scala         |  7 +++
 .../datasources/parquet/ParquetFileFormat.scala    | 65 ++++++++++++++--------
 3 files changed, 78 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 33aa212..02815b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.{Closeable, FileNotFoundException, IOException}
 
 import org.apache.parquet.io.ParquetDecodingException
 
@@ -85,6 +85,17 @@ class FileScanRDD(
       private[this] var currentFile: PartitionedFile = null
       private[this] var currentIterator: Iterator[Object] = null
 
+      private def resetCurrentIterator(): Unit = {
+        currentIterator match {
+          case iter: NextIterator[_] =>
+            iter.closeIfNeeded()
+          case iter: Closeable =>
+            iter.close()
+          case _ => // do nothing
+        }
+        currentIterator = null
+      }
+
       def hasNext: Boolean = {
         // Kill the task in case it has been marked as killed. This logic is from
         // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
@@ -128,15 +139,21 @@ class FileScanRDD(
           // Sets InputFileBlockHolder for the file block's information
           InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
 
+          resetCurrentIterator()
           if (ignoreMissingFiles || ignoreCorruptFiles) {
             currentIterator = new NextIterator[Object] {
               // The readFunction may read some bytes before consuming the iterator, e.g.,
-              // vectorized Parquet reader. Here we use lazy val to delay the creation of
-              // iterator so that we will throw exception in `getNext`.
-              private lazy val internalIter = readCurrentFile()
+              // vectorized Parquet reader. Here we use a lazily initialized variable to delay the
+              // creation of iterator so that we will throw exception in `getNext`.
+              private var internalIter: Iterator[InternalRow] = null
 
               override def getNext(): AnyRef = {
                 try {
+                  // Initialize `internalIter` lazily.
+                  if (internalIter == null) {
+                    internalIter = readCurrentFile()
+                  }
+
                   if (internalIter.hasNext) {
                     internalIter.next()
                   } else {
@@ -158,7 +175,13 @@ class FileScanRDD(
                 }
               }
 
-              override def close(): Unit = {}
+              override def close(): Unit = {
+                internalIter match {
+                  case iter: Closeable =>
+                    iter.close()
+                  case _ => // do nothing
+                }
+              }
             }
           } else {
             currentIterator = readCurrentFile()
@@ -188,6 +211,7 @@ class FileScanRDD(
       override def close(): Unit = {
         incTaskInputMetricsBytesRead()
         InputFileBlockHolder.unset()
+        resetCurrentIterator()
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
index d8e30e6..563337c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -56,6 +56,13 @@ class RecordReaderIterator[T](
     rowReader.getCurrentValue
   }
 
+  override def map[B](f: (T) => B): Iterator[B] with Closeable =
+    new Iterator[B] with Closeable {
+      override def hasNext: Boolean = RecordReaderIterator.this.hasNext
+      override def next(): B = f(RecordReaderIterator.this.next())
+      override def close(): Unit = RecordReaderIterator.this.close()
+    }
+
   override def close(): Unit = {
     if (rowReader != null) {
       try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d3ac077..7cc80a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -327,18 +327,31 @@ class ParquetFileFormat
           int96RebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
           capacity)
+        // SPARK-37089: We cannot register a task completion listener to close this iterator here
+        // because downstream exec nodes have already registered their listeners. Since listeners
+        // are executed in reverse order of registration, a listener registered here would close the
+        // iterator while downstream exec nodes are still running. When off-heap column vectors are
+        // enabled, this can cause a use-after-free bug leading to a segfault.
+        //
+        // Instead, we use FileScanRDD's task completion listener to close this iterator.
         val iter = new RecordReaderIterator(vectorizedReader)
-        // SPARK-23457 Register a task completion listener before `initialization`.
-        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-        vectorizedReader.initialize(split, hadoopAttemptContext)
-        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-        if (returningBatch) {
-          vectorizedReader.enableReturningBatches()
-        }
+        try {
+          vectorizedReader.initialize(split, hadoopAttemptContext)
+          logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+          if (returningBatch) {
+            vectorizedReader.enableReturningBatches()
+          }
 
-        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-        iter.asInstanceOf[Iterator[InternalRow]]
+          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to
+            // avoid leaking resources.
+            iter.close()
+            throw e
+        }
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns InternalRow
@@ -354,19 +367,25 @@ class ParquetFileFormat
           new ParquetRecordReader[InternalRow](readSupport)
         }
         val iter = new RecordReaderIterator[InternalRow](reader)
-        // SPARK-23457 Register a task completion listener before `initialization`.
-        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-        reader.initialize(split, hadoopAttemptContext)
-
-        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-        if (partitionSchema.length == 0) {
-          // There is no partition columns
-          iter.map(unsafeProjection)
-        } else {
-          val joinedRow = new JoinedRow()
-          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        try {
+          reader.initialize(split, hadoopAttemptContext)
+
+          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+          if (partitionSchema.length == 0) {
+            // There is no partition columns
+            iter.map(unsafeProjection)
+          } else {
+            val joinedRow = new JoinedRow()
+            iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+          }
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to
+            // avoid leaking resources.
+            iter.close()
+            throw e
         }
       }
     }
