This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ebb5238  [SPARK-37089][SQL] Do not register ParquetFileFormat completion listener lazily
ebb5238 is described below

commit ebb52383ea460c82d068d579dd180c223d523055
Author: Ankur Dave <ankurd...@gmail.com>
AuthorDate: Mon Oct 25 10:01:07 2021 +0900

    [SPARK-37089][SQL] Do not register ParquetFileFormat completion listener lazily

    ### What changes were proposed in this pull request?

    The previous PR https://github.com/apache/spark/pull/34245 assumed task completion listeners are registered bottom-up. `ParquetFileFormat#buildReaderWithPartitionValues()` violates this assumption by registering a task completion listener to close its output iterator lazily. Since task completion listeners are executed in reverse order of registration, this listener always runs before other listeners. When the downstream operator contains a Python UDF and the off-heap vectorized Parquet reader is enabled, this can cause a use-after-free bug leading to a segfault.

    The fix is to close the output iterator using FileScanRDD's task completion listener.

    ### Why are the changes needed?

    Without this PR, the Python tests introduced in https://github.com/apache/spark/pull/34245 are flaky ([see details in thread](https://github.com/apache/spark/pull/34245#issuecomment-948713545)). They intermittently fail with a segfault.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Repeatedly ran one of the Python tests introduced in https://github.com/apache/spark/pull/34245 using the commands below. Previously, the test was flaky and failed after about 50 runs. With this PR, the test has not failed after 1000 runs.

    ```sh
    ./build/sbt -Phive clean package && ./build/sbt test:compile
    seq 1000 | parallel -j 8 --halt now,fail=1 'echo {#}; python/run-tests --testnames pyspark.sql.tests.test_udf'
    ```

    Closes #34369 from ankurdave/SPARK-37089.

Authored-by: Ankur Dave <ankurd...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 1fc1d072c31ddac7a7b627e8fceba7929d1c6c7c) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/execution/datasources/FileScanRDD.scala | 34 +++++++++-- .../datasources/RecordReaderIterator.scala | 7 +++ .../datasources/parquet/ParquetFileFormat.scala | 65 ++++++++++++++-------- 3 files changed, 78 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 33aa212..02815b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources -import java.io.{FileNotFoundException, IOException} +import java.io.{Closeable, FileNotFoundException, IOException} import org.apache.parquet.io.ParquetDecodingException @@ -85,6 +85,17 @@ class FileScanRDD( private[this] var currentFile: PartitionedFile = null private[this] var currentIterator: Iterator[Object] = null + private def resetCurrentIterator(): Unit = { + currentIterator match { + case iter: NextIterator[_] => + iter.closeIfNeeded() + case iter: Closeable => + iter.close() + case _ => // do nothing + } + currentIterator = null + } + def hasNext: Boolean = { // Kill the task in case it has been marked as killed. 
This logic is from // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order @@ -128,15 +139,21 @@ class FileScanRDD( // Sets InputFileBlockHolder for the file block's information InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + resetCurrentIterator() if (ignoreMissingFiles || ignoreCorruptFiles) { currentIterator = new NextIterator[Object] { // The readFunction may read some bytes before consuming the iterator, e.g., - // vectorized Parquet reader. Here we use lazy val to delay the creation of - // iterator so that we will throw exception in `getNext`. - private lazy val internalIter = readCurrentFile() + // vectorized Parquet reader. Here we use a lazily initialized variable to delay the + // creation of iterator so that we will throw exception in `getNext`. + private var internalIter: Iterator[InternalRow] = null override def getNext(): AnyRef = { try { + // Initialize `internalIter` lazily. + if (internalIter == null) { + internalIter = readCurrentFile() + } + if (internalIter.hasNext) { internalIter.next() } else { @@ -158,7 +175,13 @@ class FileScanRDD( } } - override def close(): Unit = {} + override def close(): Unit = { + internalIter match { + case iter: Closeable => + iter.close() + case _ => // do nothing + } + } } } else { currentIterator = readCurrentFile() @@ -188,6 +211,7 @@ class FileScanRDD( override def close(): Unit = { incTaskInputMetricsBytesRead() InputFileBlockHolder.unset() + resetCurrentIterator() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala index d8e30e6..563337c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala @@ -56,6 +56,13 @@ class 
RecordReaderIterator[T]( rowReader.getCurrentValue } + override def map[B](f: (T) => B): Iterator[B] with Closeable = + new Iterator[B] with Closeable { + override def hasNext: Boolean = RecordReaderIterator.this.hasNext + override def next(): B = f(RecordReaderIterator.this.next()) + override def close(): Unit = RecordReaderIterator.this.close() + } + override def close(): Unit = { if (rowReader != null) { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index d3ac077..7cc80a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -327,18 +327,31 @@ class ParquetFileFormat int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) + // SPARK-37089: We cannot register a task completion listener to close this iterator here + // because downstream exec nodes have already registered their listeners. Since listeners + // are executed in reverse order of registration, a listener registered here would close the + // iterator while downstream exec nodes are still running. When off-heap column vectors are + // enabled, this can cause a use-after-free bug leading to a segfault. + // + // Instead, we use FileScanRDD's task completion listener to close this iterator. val iter = new RecordReaderIterator(vectorizedReader) - // SPARK-23457 Register a task completion listener before `initialization`. 
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - vectorizedReader.initialize(split, hadoopAttemptContext) - logDebug(s"Appending $partitionSchema ${file.partitionValues}") - vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } + try { + vectorizedReader.initialize(split, hadoopAttemptContext) + logDebug(s"Appending $partitionSchema ${file.partitionValues}") + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns InternalRow @@ -354,19 +367,25 @@ class ParquetFileFormat new ParquetRecordReader[InternalRow](readSupport) } val iter = new RecordReaderIterator[InternalRow](reader) - // SPARK-23457 Register a task completion listener before `initialization`. 
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - reader.initialize(split, hadoopAttemptContext) - - val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) - } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + try { + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org