sandip-db commented on code in PR #54647:
URL: https://github.com/apache/spark/pull/54647#discussion_r2899242613


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala:
##########
@@ -39,6 +42,34 @@ abstract class ParquetFileFormatSuite
 
   override protected def dataSourceFormat = "parquet"
 
+  private def checkCannotReadFooterError(body: => Unit): Unit = {
+    checkErrorMatchPVals(
+      exception = intercept[SparkException] { body 
}.getCause.asInstanceOf[SparkException],
+      condition = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER",

Review Comment:
   Maintain the previous error for ignoreMissingFiles = false



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:
##########
@@ -515,13 +519,20 @@ object ParquetFileFormat extends Logging {
         Some(new Footer(currentFile.getPath,
           ParquetFooterReader.readFooter(
             HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS)))
-      } catch { case e: RuntimeException =>
-        if (ignoreCorruptFiles) {
+      } catch {
+        case e: Exception
+            if 
ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) =>
+          if (ignoreMissingFiles) {
+            logWarning(log"Skipped missing file: ${MDC(PATH, currentFile)}", e)
+            None
+          } else {
+            throw 
QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e)
+          }

Review Comment:
   ```suggestion
          case e: Exception if ignoreMissingFiles &&
             
ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) =>
               logWarning(log"Skipped missing file: ${MDC(PATH, currentFile)}", 
e)
               None
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala:
##########
@@ -75,6 +106,81 @@ abstract class ParquetFileFormatSuite
     )
   }
 
+  test("SPARK-55857: read parquet footers in parallel - ignoreMissingFiles") {

Review Comment:
   Check test("Enabling/disabling ignoreCorruptFiles") in ParquetQuerySuite. 
Add a similar test with both conf and option in ParquetQuerySuite for 
ignoreMissingFiles.
   Same for OrcQuerySuite.scala



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala:
##########
@@ -461,6 +461,45 @@ abstract class OrcSuite
     }
   }
 
+  test("SPARK-55857: test schema merging with missing files") {
+    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImp) {
+      withTempDir { dir =>
+        val fs = FileSystem.get(spark.sessionState.newHadoopConf())
+        val basePath = dir.getCanonicalPath
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        spark.range(0, 10).toDF("a").coalesce(1).write.orc(path1.toString)
+        spark.range(0, 10).toDF("b").coalesce(1).write.orc(path2.toString)
+
+        // Collect FileStatuses before deleting, to simulate the race 
condition where a file
+        // is listed for schema inference but then disappears before it can be 
read.
+        // Filter to actual ORC data files (exclude _SUCCESS and directories).
+        val allStatuses = Seq(fs.listStatus(path1), 
fs.listStatus(path2)).flatten
+        val fileStatuses = allStatuses.filter(s => s.isFile && 
!s.getPath.getName.startsWith("_"))
+        val deletedFile = fileStatuses.head
+        fs.delete(deletedFile.getPath, false)
+
+        val hadoopConf = spark.sessionState.newHadoopConf()
+
+        // ignoreMissingFiles=true: skips the missing file, returns schema 
from remaining files
+        val schemas = OrcUtils.readOrcSchemasInParallel(
+          fileStatuses, hadoopConf, ignoreCorruptFiles = false, 
ignoreMissingFiles = true)
+        assert(schemas.nonEmpty)
+
+        // ignoreMissingFiles=false: throws on the missing file.
+        // ThreadUtils.parmap wraps the exception one level deep, so unwrap 
via getCause.
+        checkErrorMatchPVals(
+          exception = intercept[SparkException] {
+            OrcUtils.readOrcSchemasInParallel(
+              fileStatuses, hadoopConf, ignoreCorruptFiles = false, 
ignoreMissingFiles = false)
+          }.getCause.asInstanceOf[SparkException],
+          condition = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER",

Review Comment:
   Shouldn't it be `FileNotFoundException`? Maintain the previous error for 
ignoreMissingFiles = false



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala:
##########
@@ -86,6 +87,13 @@ object OrcUtils extends Logging {
         Some(schema)
       }
     } catch {
+      case e: FileNotFoundException =>
+        if (ignoreMissingFiles) {
+          logWarning(log"Skipped missing file: ${MDC(PATH, file)}", e)
+          None
+        } else {
+          throw QueryExecutionErrors.cannotReadFooterForFileError(file, e)
+        }

Review Comment:
   ```suggestion
         case e: Exception if ignoreMissingFiles &&
             
ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) =>
           logWarning(log"Skipped missing file: ${MDC(PATH, file)}", e)
           None
   ```



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala:
##########
@@ -76,6 +77,13 @@ private[hive] object OrcFileOperator extends Logging {
       val reader = try {
         Some(OrcFile.createReader(fs, path))
       } catch {
+        case e: FileNotFoundException =>
+          if (ignoreMissingFiles) {
+            logWarning(log"Skipped missing file: ${MDC(PATH, path)}", e)
+            None
+          } else {
+            throw QueryExecutionErrors.cannotReadFooterForFileError(path, e)
+          }

Review Comment:
   ```suggestion
         case e: Exception if ignoreMissingFiles &&
             
ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) =>
               logWarning(log"Skipped missing file: ${MDC(PATH, path)}", e)
               None
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to