Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2024-01-07 Thread via GitHub


HyukjinKwon closed pull request #44163: [SPARK-46248][SQL] XML: Support for 
ignoreCorruptFiles and ignoreMissingFiles options
URL: https://github.com/apache/spark/pull/44163
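
For context, a minimal usage sketch of the behavior this PR wires up for the XML source, based on the PR title and the tests quoted below. The session-wide confs are the ones those tests flip; treat the snippet as illustrative, not as the merged API surface:

```scala
import org.apache.spark.sql.SparkSession

object XmlIgnoreFilesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("xml-ignore").getOrCreate()

    // Session-wide switches exercised by the tests below
    // (SQLConf.IGNORE_CORRUPT_FILES / SQLConf.IGNORE_MISSING_FILES):
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

    // Corrupt or vanished files are now skipped instead of failing the scan.
    val df = spark.read
      .option("rowTag", "ROW")
      .xml("/path/to/xml") // hypothetical input path
    df.show()

    spark.stop()
  }
}
```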





Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2024-01-07 Thread via GitHub


HyukjinKwon commented on PR #44163:
URL: https://github.com/apache/spark/pull/44163#issuecomment-1880226508

   Merged to master.





Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2024-01-07 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1443960176


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/TestXmlData.scala:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import java.io.{File, RandomAccessFile}
+
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
+
+private[xml] trait TestXmlData {
+  protected def spark: SparkSession
+
+  def sampledTestData: Dataset[String] = {
+    spark
+      .range(0, 100, 1)
+      .map { index =>
+        val predefinedSample = Set[Long](3, 18, 20, 24, 50, 60, 87, 99)
+        if (predefinedSample.contains(index)) {
+          index.toString
+        } else {
+          (index.toDouble + 0.1).toString
+        }
+      }(Encoders.STRING)
+  }
+
+  def withCorruptedFile(dir: File, format: String = "gz", numBytesToCorrupt: Int = 50)(
+      f: File => Unit): Unit = {
+    // find the targeted files and corrupt the first one
+    val files = dir.listFiles().filter(file => file.isFile && file.getName.endsWith(format))
+    val raf = new RandomAccessFile(files.head.getPath, "rw")
+
+    // disable checksum verification
+    import org.apache.hadoop.fs.Path
+    val fs = new Path(dir.getPath).getFileSystem(spark.sessionState.newHadoopConf())
+    fs.setVerifyChecksum(false)
+    // delete crc files
+    val crcFiles = dir.listFiles
+      .filter(file => file.isFile && file.getName.endsWith("crc"))
+    crcFiles.foreach { file =>
+      assert(file.exists())
+      file.delete()
+      assert(!file.exists())
+    }
+    fs.close()

Review Comment:
   remove
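
A plausible reading of this comment: `Path.getFileSystem` returns Hadoop's process-wide cached `FileSystem` instance, so closing it can break other code in the same JVM that holds the same handle. A hedged sketch of the distinction (`FsCloseExample` is a made-up name for illustration):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsCloseExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Path.getFileSystem normally returns a shared, JVM-wide cached instance;
    // configuring it is fine, but closing it affects every other user.
    val shared = new Path("/tmp").getFileSystem(conf)
    shared.setVerifyChecksum(false)

    // If a closable handle is genuinely needed, request a private instance.
    val priv = FileSystem.newInstance(new Path("/tmp").toUri, conf)
    try priv.setVerifyChecksum(false)
    finally priv.close()
  }
}
```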



##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/TestXmlData.scala:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import java.io.{File, RandomAccessFile}
+
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
+
+private[xml] trait TestXmlData {
+  protected def spark: SparkSession
+
+  def sampledTestData: Dataset[String] = {
+    spark
+      .range(0, 100, 1)
+      .map { index =>
+        val predefinedSample = Set[Long](3, 18, 20, 24, 50, 60, 87, 99)
+        if (predefinedSample.contains(index)) {
+          index.toString
+        } else {
+          (index.toDouble + 0.1).toString
+        }
+      }(Encoders.STRING)
+  }
+
+  def withCorruptedFile(dir: File, format: String = "gz", numBytesToCorrupt: Int = 50)(
+      f: File => Unit): Unit = {
+    // find the targeted files and corrupt the first one
+    val files = dir.listFiles().filter(file => file.isFile && file.getName.endsWith(format))
+    val raf = new RandomAccessFile(files.head.getPath, "rw")
+
+    // disable checksum verification
+    import org.apache.hadoop.fs.Path
+    val fs = new Path(dir.getPath).getFileSystem(spark.sessionState.newHadoopConf())
+    fs.setVerifyChecksum(false)
+    // delete crc files
+    val crcFiles = dir.listFiles
+      .filter(file => file.isFile && file.getName.endsWith("crc"))
+    crcFiles.foreach { file =>
+      assert(file.exists())
+      file.delete()
+      assert(!file.exists())
+    }
+    fs.close()
+
+    // corrupt the file
+    val fileSize = raf.length()
+    // avoid the last few bytes as it might contain crc
+    raf.seek(fileSize - numBytesToCorrupt - 100)

Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-21 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1434601695


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2371,4 +2400,104 @@ class XmlSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        val result = spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", false)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(result.isEmpty)
+      }
+    })
+    withTempPath { dir =>
+      import org.apache.hadoop.fs.Path
+      val xmlPath = new Path(dir.getCanonicalPath, "xml")
+      val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".xml does not exist"))
+      }
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
+  test("SPARK-46248: Read from a corrupted compressed file") {
+    withTempDir { dir =>
+      val format = "xml"
+      val numRecords = 1
+      // create data
+      val data =
+        spark.sparkContext.parallelize(
+          (0 until numRecords).map(i => Row(i.toString, (i * 2).toString)))
+      val schema = buildSchema(field("a1"), field("a2"))

Review Comment:
   ```suggestion
      val schema = buildSchema(
        field("a1", LongType),
        field("a2", LongType))
   ```
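
If this suggestion is taken, the rows built a few lines above would presumably also need numeric values (for example `Row(i.toLong, i * 2L)`) so the data matches the `LongType` schema; `createDataFrame` fails at run time when row values and schema types disagree.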






Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-21 Thread via GitHub


HyukjinKwon commented on PR #44163:
URL: https://github.com/apache/spark/pull/44163#issuecomment-1867064220

   I think the test failure is related ... dunno how and why though. They 
aren't known as flaky ones, and the tests fail after running XML tests. 





Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-21 Thread via GitHub


shujingyang-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1434431560


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")

Review Comment:
   These two cases have different multiline options






Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-21 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1434278718


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", false)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+            .isEmpty
+        )
+      }
+    })
+    withTempPath { dir =>
+      import org.apache.hadoop.fs.Path
+      val xmlPath = new Path(dir.getCanonicalPath, "xml")
+      val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".xml does not exist"))
+      }
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
+  test("SPARK-46248: Read from a corrupted compressed file") {
+    withTempDir { dir =>
+      val format = "xml"
+      val numRecords = 1
+      // create data
+      val data =
+        spark.sparkContext.parallelize(
+          (0 until numRecords).map(i => Row(i.toString, (i * 2).toString)))
+      val schema = buildSchema(field("a1"), field("a2"))
+      val df = spark.createDataFrame(data, schema)
+
+      df.coalesce(4)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(format)
+        .option("compression", "gZiP")
+        .option("rowTag", "row")
+        .save(dir.getCanonicalPath)
+
+      withCorruptedFile(dir) { corruptedDir =>
+        withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+          val dfCorrupted = spark.read
+            .format(format)
+            .option("multiline", "true")
+            .option("compression", "gzip")
+            .option("rowTag", "row")
+            .load(corruptedDir.getCanonicalPath)
+          assert(dfCorrupted.collect().length > 100)

Review Comment:
   ```suggestion
          val results = dfCorrupted.collect()
          assert(results(1) === Row(1, 2))
          assert(results.length > 100)
   ```




Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-20 Thread via GitHub


HyukjinKwon commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1433676674


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala:
##
@@ -130,10 +147,28 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
       parser.close()

Review Comment:
   didn't take a super close look but should we maybe close the `parser` with `finally`?
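
A minimal sketch of the pattern being asked about, assuming a StAX event reader created per record during schema inference (`inferFrom` and its callback are illustrative names, not the PR's API):

```scala
import java.io.StringReader
import javax.xml.stream.{XMLEventReader, XMLInputFactory}

object ParserCloseExample {
  // Runs `infer` over a freshly created parser and guarantees cleanup.
  def inferFrom[T](xml: String)(infer: XMLEventReader => T): T = {
    val parser = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml))
    try {
      infer(parser)
    } finally {
      parser.close() // reached on both the success and the exception path
    }
  }

  def main(args: Array[String]): Unit = {
    val events = inferFrom("<ROW><a>1</a></ROW>") { p =>
      Iterator.continually(p).takeWhile(_.hasNext).map(_.nextEvent()).size
    }
    println(s"saw $events events")
  }
}
```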






Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-12 Thread via GitHub


HyukjinKwon commented on PR #44163:
URL: https://github.com/apache/spark/pull/44163#issuecomment-1853261270

   Mind retriggering https://github.com/shujingyang-db/spark/runs/19583242928 
please?





Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-12 Thread via GitHub


shujingyang-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1424733989


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##
@@ -604,6 +606,24 @@ class XmlTokenizer(
        return Some(str)
      }
    } catch {

Review Comment:
   Thanks for putting this down! 








Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-11 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1423270340


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##
@@ -604,6 +606,24 @@ class XmlTokenizer(
        return Some(str)
      }
    } catch {

Review Comment:
   Consider this to avoid reader.close repetitions:
   
   ```
   def next(): Option[String] = {
     var nextString: Option[String] = None
     try {
       if (readUntilStartElement()) {
         buffer.append(currentStartTag)
         // Don't check whether the end element was found. Even if not, return
         // everything that was read, which will invariably cause a parse error later
         readUntilEndElement(currentStartTag.endsWith(">"))
         nextString = Some(buffer.toString())
         buffer = new StringBuilder()
       }
     } catch {
       case e: FileNotFoundException if options.ignoreMissingFiles =>
         logWarning(
           "Skipping the rest of the content in the missing file during schema inference", e)
       case NonFatal(e) =>
         ExceptionUtils.getRootCause(e) match {
           case _: RuntimeException | _: IOException if options.ignoreCorruptFiles =>
             logWarning(
               "Skipping the rest of the content in the corrupted file during schema inference", e)
           case _ =>
             throw e // anything else stays fatal to the caller
         }
     } finally {
       if (nextString.isEmpty) {
         // no more records (or an error): release the underlying reader once, here
         reader.close()
         reader = null
       }
     }
     nextString
   }
   ```
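
The point of the sketch above is that every exit funnels through a single `finally`, so `reader.close()` appears exactly once whether the tokenizer ran dry, swallowed a skippable error, or rethrew.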






Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-11 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1422984743


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##
@@ -750,7 +752,20 @@ object StaxXmlParser {
        throw QueryExecutionErrors.endOfStreamError()
      }
      val curRecord = convert(nextRecord.get)
-      nextRecord = xmlTokenizer.next()
+      try {
+        nextRecord = xmlTokenizer.next()
+      } catch {
+        case _: FileNotFoundException if options.ignoreMissingFiles =>

Review Comment:
   Move this try..catch into XmlTokenizer.next



##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2178,4 +2207,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
     )
     testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true"))
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", false)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+            .isEmpty
+        )
+      }
+    })
+    withTempPath { dir =>
+      import org.apache.hadoop.fs.Path
+      val xmlPath = new Path(dir.getCanonicalPath, "xml")
+      val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".xml does not exist"))
+      }
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
+  test("SPARK-46248: Read from a corrupted compressed file") {
+    withTempDir { dir =>
+      val format = "xml"
+      val numRecords = 1
+      // create data
+      val data =
+        spark.sparkContext.parallelize(
+          (0 until numRecords).map(i => Row(i.toString, (i * 2).toString)))
+      val schema = buildSchema(field("a1"), field("a2"))
+      val df = spark.createDataFrame(data, schema)
+
+      df.coalesce(4)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(format)
+        .option("compression", "gZiP")
+        .option("rowTag", "row")
+        .save(dir.getCanonicalPath)
+
+      withCorruptedFile(dir) { corruptedDir =>
+        withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+          val dfCorrupted = spark.read
+            .format(format)
+            .option("multiline", "true")
+            .option("compression", "gzip")
+            .option("rowTag", "row")
+            .load(corruptedDir.getCanonicalPath)
+          assert(!dfCorrupted.isEmpty)

Review Comment:
   ```suggestion
          assert(dfCorrupted.collect().length > 100)
   ```




Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-06 Thread via GitHub


shujingyang-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1418019453


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession {
     )
     testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {

Review Comment:
   Added this scenario in the test case: "SPARK-46248: Read from a corrupted 
compressed file"






Re: [PR] [SPARK-46248][SQL] XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-04 Thread via GitHub


shujingyang-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1414754388


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala:
##
@@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
       val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
       Some(inferObject(parser, rootAttributes))
     } catch {
-      case NonFatal(_) if options.parseMode == PermissiveMode =>
-        Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, StringType))))
-      case NonFatal(_) =>
+      case e @ (_: XMLStreamException | _: MalformedInputException | _: SAXException) =>
+        handleXmlErrorsByParseMode(options.parseMode, options.columnNameOfCorruptRecord, e)

Review Comment:
   yeah, we guard it by NonFatal errors at the end of this code block. Does it look good to you?
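
A condensed sketch of that ordering, with a hypothetical `handleByParseMode` standing in for the PR's `handleXmlErrorsByParseMode` helper:

```scala
import java.nio.charset.MalformedInputException
import javax.xml.stream.XMLStreamException
import org.xml.sax.SAXException
import scala.util.control.NonFatal

object NonFatalGuardExample {
  // Stand-in for handleXmlErrorsByParseMode(parseMode, corruptRecordName, e).
  def handleByParseMode(e: Throwable): Option[String] = None

  def inferOne(parse: () => String): Option[String] =
    try Some(parse())
    catch {
      // Known parse-level failures are routed by parse mode first...
      case e @ (_: XMLStreamException | _: MalformedInputException | _: SAXException) =>
        handleByParseMode(e)
      // ...and the trailing NonFatal case guards everything else, as described.
      case NonFatal(e) =>
        handleByParseMode(e)
    }

  def main(args: Array[String]): Unit =
    println(inferOne(() => throw new RuntimeException("boom"))) // prints None
}
```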





