cloud-fan commented on a change in pull request #34369:
URL: https://github.com/apache/spark/pull/34369#discussion_r734587904
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -327,18 +327,31 @@ class ParquetFileFormat
int96RebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
+ // SPARK-37089: We cannot register a task completion listener to close
this iterator here
+ // because downstream exec nodes have already registered their
listeners. Since listeners
+ // are executed in reverse order of registration, a listener
registered here would close the
+ // iterator while downstream exec nodes are still running. When
off-heap column vectors are
+ // enabled, this can cause a use-after-free bug leading to a segfault.
+ //
+ // Instead, we use FileScanRDD's task completion listener to close
this iterator.
val iter = new RecordReaderIterator(vectorizedReader)
- // SPARK-23457 Register a task completion listener before
`initialization`.
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
- vectorizedReader.initialize(split, hadoopAttemptContext)
- logDebug(s"Appending $partitionSchema ${file.partitionValues}")
- vectorizedReader.initBatch(partitionSchema, file.partitionValues)
- if (returningBatch) {
- vectorizedReader.enableReturningBatches()
- }
+ try {
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
- // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
- iter.asInstanceOf[Iterator[InternalRow]]
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
Review comment:
shall we let the caller `FileScanRDD` close the iterator when hitting
errors?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]