sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1422984743


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##########
@@ -750,7 +752,20 @@ object StaxXmlParser {
         throw QueryExecutionErrors.endOfStreamError()
       }
       val curRecord = convert(nextRecord.get)
-      nextRecord = xmlTokenizer.next()
+      try {
+        nextRecord = xmlTokenizer.next()
+      } catch {
+        case _: FileNotFoundException if options.ignoreMissingFiles =>

Review Comment:
   Move this try..catch into XmlTokenizer.next
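
   A rough sketch of what that relocation could look like, assuming the tokenizer's `next()` already returns `Option[String]` and can see the ignore flags. The class name, `readNextRecord`, and the two boolean parameters below are illustrative placeholders for the tokenizer internals and the corresponding `XmlOptions` fields, not the actual Spark code:

   ```scala
   import java.io.{FileNotFoundException, IOException}

   // Illustrative stand-in only: the real XmlTokenizer wraps a StAX reader;
   // `readNextRecord` and the two flags are hypothetical placeholders.
   class TokenizerSketch(
       readNextRecord: () => Option[String],
       ignoreMissingFiles: Boolean,
       ignoreCorruptFiles: Boolean) {

     def next(): Option[String] = {
       try {
         readNextRecord()
       } catch {
         case _: FileNotFoundException if ignoreMissingFiles =>
           // A missing underlying file just ends this iterator.
           None
         case _: IOException | _: RuntimeException if ignoreCorruptFiles =>
           // Likewise, a corrupted file is skipped rather than failing the task.
           None
       }
     }
   }
   ```

   The caller shown in the diff above would then reduce to a plain `nextRecord = xmlTokenizer.next()` with no local try..catch.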



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2178,4 +2207,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
     )
    testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true"))
   }
+
+  test("SPARK-46248: Enabling/disabling 
ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", false)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+            .isEmpty
+        )
+      }
+    })
+    withTempPath { dir =>
+      import org.apache.hadoop.fs.Path
+      val xmlPath = new Path(dir.getCanonicalPath, "xml")
+      val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".xml does not exist"))
+      }
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
+  test("SPARK-46248: Read from a corrupted compressed file") {
+    withTempDir { dir =>
+      val format = "xml"
+      val numRecords = 10000
+      // create data
+      val data =
+        spark.sparkContext.parallelize(
+          (0 until numRecords).map(i => Row(i.toString, (i * 2).toString)))
+      val schema = buildSchema(field("a1"), field("a2"))
+      val df = spark.createDataFrame(data, schema)
+
+      df.coalesce(4)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(format)
+        .option("compression", "gZiP")
+        .option("rowTag", "row")
+        .save(dir.getCanonicalPath)
+
+      withCorruptedFile(dir) { corruptedDir =>
+        withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+          val dfCorrupted = spark.read
+            .format(format)
+            .option("multiline", "true")
+            .option("compression", "gzip")
+            .option("rowTag", "row")
+            .load(corruptedDir.getCanonicalPath)
+          assert(!dfCorrupted.isEmpty)

Review Comment:
   ```suggestion
             assert(dfCorrupted.collect().length > 100)
   ```
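
   A likely reason to prefer `collect().length > 100` over `!dfCorrupted.isEmpty`: `isEmpty` only needs to fetch a single row, so it can succeed without ever touching the corrupted split, while `collect()` forces a full read and actually exercises `ignoreCorruptFiles` on every file.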



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

