yabola commented on code in PR #39950:
URL: https://github.com/apache/spark/pull/39950#discussion_r1121979988


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:
##########
@@ -296,41 +309,45 @@ class ParquetFileFormat
             throw e
         }
       } else {
-        logDebug(s"Falling back to parquet-mr")
-        // ParquetRecordReader returns InternalRow
-        val readSupport = new ParquetReadSupport(
-          convertTz,
-          enableVectorizedReader = false,
-          datetimeRebaseSpec,
-          int96RebaseSpec)
-        val reader = if (pushed.isDefined && enableRecordFilter) {
-          val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
-        } else {
-          new ParquetRecordReader[InternalRow](readSupport)
-        }
-        val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
-            requiredSchema)
-        val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
         try {
-          readerWithRowIndexes.initialize(split, hadoopAttemptContext)
-
-          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-          if (partitionSchema.length == 0) {
-            // There is no partition columns
-            iter.map(unsafeProjection)
+          logDebug(s"Falling back to parquet-mr")
+          // ParquetRecordReader returns InternalRow
+          val readSupport = new ParquetReadSupport(
+            convertTz,
+            enableVectorizedReader = false,
+            datetimeRebaseSpec,
+            int96RebaseSpec)
+          val reader = if (pushed.isDefined && enableRecordFilter) {
+            val parquetFilter = FilterCompat.get(pushed.get, null)
+            new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
           } else {
-            val joinedRow = new JoinedRow()
-            iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+            new ParquetRecordReader[InternalRow](readSupport)
           }
-        } catch {
-          case e: Throwable =>
-            // SPARK-23457: In case there is an exception in initialization, close the iterator to
-            // avoid leaking resources.
-            iter.close()
-            throw e
+          val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+            requiredSchema)
+          val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
+          try {
+            readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+            val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+            val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+            if (partitionSchema.length == 0) {
+              // There is no partition columns
+              iter.map(unsafeProjection)
+            } else {
+              val joinedRow = new JoinedRow()
+              iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+            }
+          } catch {
+            case e: Throwable =>
+              // SPARK-23457: In case there is an exception in initialization, close the iterator to
+              // avoid leaking resources.
+              iter.close()
+              throw e
+          }
+        } finally {
+          parquetReader.close()

Review Comment:
   I haven't changed the code logic; I only added closing `parquetReader`.
   Previously, the non-vectorized code path already closed the reader after reading `footerFileMetaData`, so this change is consistent with that behavior. Please see
   [the original implementation code](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java#L53)
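   
   For illustration, here is a minimal, self-contained sketch of the lifecycle this change enforces: the footer reader is opened, its metadata is consumed, and the reader is closed in a `finally` block so an exception cannot leak the file handle. `FooterReader` and `readFooter` are hypothetical stand-ins for illustration only, not Spark or parquet-mr APIs.
   
   ```scala
   import java.io.Closeable
   
   // Hypothetical stand-in for a Parquet footer reader; only close() and a
   // metadata accessor are modeled here.
   final class FooterReader(path: String) extends Closeable {
     def footerFileMetaData: String = s"metadata of $path"
     override def close(): Unit = println(s"closed reader for $path")
   }
   
   object FooterReadExample {
     // Read the footer metadata and always close the reader, mirroring the
     // finally block added in the diff: even if reading throws, the reader
     // is released and no handle leaks.
     def readFooter(path: String): String = {
       val reader = new FooterReader(path)
       try {
         reader.footerFileMetaData
       } finally {
         reader.close()
       }
     }
   
     def main(args: Array[String]): Unit =
       println(readFooter("part-00000.parquet"))
   }
   ```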
   


