ankurdave commented on a change in pull request #34369:
URL: https://github.com/apache/spark/pull/34369#discussion_r734608343



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -327,18 +327,31 @@ class ParquetFileFormat
           int96RebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
           capacity)
+        // SPARK-37089: We cannot register a task completion listener to close this iterator here
+        // because downstream exec nodes have already registered their listeners. Since listeners
+        // are executed in reverse order of registration, a listener registered here would close the
+        // iterator while downstream exec nodes are still running. When off-heap column vectors are
+        // enabled, this can cause a use-after-free bug leading to a segfault.
+        //
+        // Instead, we use FileScanRDD's task completion listener to close this iterator.
         val iter = new RecordReaderIterator(vectorizedReader)
-        // SPARK-23457 Register a task completion listener before `initialization`.
-        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-        vectorizedReader.initialize(split, hadoopAttemptContext)
-        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-        if (returningBatch) {
-          vectorizedReader.enableReturningBatches()
-        }
+        try {
+          vectorizedReader.initialize(split, hadoopAttemptContext)
+          logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+          if (returningBatch) {
+            vectorizedReader.enableReturningBatches()
+          }
 
-        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-        iter.asInstanceOf[Iterator[InternalRow]]
+          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to
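
For context on the new comment above: task completion listeners run in reverse order of registration, which can be observed with a small standalone job. This is a minimal sketch assuming a local SparkSession; the object name, partition count, and println messages are illustrative only and are not part of this PR:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object ListenerOrderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("listener-order").getOrCreate()
    try {
      spark.sparkContext.parallelize(Seq(1), numSlices = 1).mapPartitions { iter =>
        val ctx = TaskContext.get()
        // Registered first (as a downstream exec node's listener would be), so it runs last.
        ctx.addTaskCompletionListener[Unit](_ => println("downstream listener: runs second"))
        // Registered second (as a listener added here in ParquetFileFormat would be), so it
        // runs first. If this listener closed the columnar iterator, the downstream listener
        // above would run afterwards against already-freed memory.
        ctx.addTaskCompletionListener[Unit](_ => println("data source listener: runs first"))
        iter
      }.count()
    } finally {
      spark.stop()
    }
  }
}
```

Run on a single partition, this prints the second-registered listener's message before the first-registered one's, which is exactly the ordering the new comment is guarding against.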

Review comment:
       In general, I think FileScanRDD does close the iterator when hitting 
exceptions, because it uses a task completion listener to do so. The only case 
where it will not close the iterator is when the exception prevents FileScanRDD 
from getting a reference to the iterator, as is the case here.
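
The case mentioned here, an exception thrown during initialization before FileScanRDD ever receives the iterator, is what the new try/catch handles. A minimal sketch of that close-on-failure pattern; `initOrClose` is a hypothetical helper standing in for the reader setup in ParquetFileFormat, not an API from this PR:

```scala
import java.io.Closeable

object CloseOnInitFailureSketch {
  // `reader` stands in for the vectorized reader wrapped by RecordReaderIterator;
  // `init` stands in for initialize()/initBatch().
  def initOrClose[R <: Closeable](reader: R)(init: R => Unit): R = {
    try {
      init(reader) // may throw before any downstream code holds a reference to `reader`
      reader
    } catch {
      case e: Throwable =>
        // Nothing downstream (e.g. FileScanRDD's task completion listener) has seen
        // `reader` yet, so the only place it can be closed is right here.
        reader.close()
        throw e
    }
  }
}
```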



