Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-04 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1414522386


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
 )
 testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> 
"true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+withCorruptFile(inputFile => {
+  withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+val e = intercept[SparkException] {
+  spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect()
+}
+assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end 
of input stream")
+val e2 = intercept[SparkException] {
+  spark.read
+.option("rowTag", "ROW")
+.option("multiLine", true)
+.xml(inputFile.toURI.toString)
+.collect()
+}
+assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end 
of input stream")
+  }
+  withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+assert(spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect().isEmpty)
+assert(

Review Comment:
   Not sure how this is working without `ignoreCorruptFiles` handling in 
`XMLDataSource`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-04 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1414588409


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala:
##
@@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource {
   parsedOptions: XmlOptions): StructType = {
 val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions)
 
-val tokenRDD = xml.flatMap { portableDataStream =>
-  StaxXmlParser.tokenizeStream(
-CodecStreams.createInputStreamWithCloseResource(
-  portableDataStream.getConfiguration,
-  new Path(portableDataStream.getPath())),
-parsedOptions)
-}
+val tokenRDD: RDD[String] =
+  xml.flatMap { portableDataStream =>
+try {
+  StaxXmlParser.tokenizeStream(
+CodecStreams.createInputStreamWithCloseResource(
+  portableDataStream.getConfiguration,
+  new Path(portableDataStream.getPath())),
+parsedOptions)
+} catch {

Review Comment:
   Ah. got it. Thx.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-04 Thread via GitHub


shujingyang-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1414570468


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala:
##
@@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource {
   parsedOptions: XmlOptions): StructType = {
 val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions)
 
-val tokenRDD = xml.flatMap { portableDataStream =>
-  StaxXmlParser.tokenizeStream(
-CodecStreams.createInputStreamWithCloseResource(
-  portableDataStream.getConfiguration,
-  new Path(portableDataStream.getPath())),
-parsedOptions)
-}
+val tokenRDD: RDD[String] =
+  xml.flatMap { portableDataStream =>
+try {
+  StaxXmlParser.tokenizeStream(
+CodecStreams.createInputStreamWithCloseResource(
+  portableDataStream.getConfiguration,
+  new Path(portableDataStream.getPath())),
+parsedOptions)
+} catch {

Review Comment:
   It's in `case _: RuntimeException | _: IOException if 
parsedOptions.ignoreCorruptFiles =>`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]

2023-12-04 Thread via GitHub


sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1414443217


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala:
##
@@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: 
Boolean)
   val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
   Some(inferObject(parser, rootAttributes))
 } catch {
-  case NonFatal(_) if options.parseMode == PermissiveMode =>
-Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, 
StringType
-  case NonFatal(_) =>
+  case e @ (_: XMLStreamException | _: MalformedInputException | _: 
SAXException) =>
+handleXmlErrorsByParseMode(options.parseMode, 
options.columnNameOfCorruptRecord, e)
+  case e: CharConversionException if options.charset.isEmpty =>
+val msg =
+  """XML parser cannot handle a character in its input.
+|Specifying encoding as an input option explicitly might help to 
resolve the issue.
+|""".stripMargin + e.getMessage
+val wrappedCharException = new CharConversionException(msg)
+wrappedCharException.initCause(e)
+handleXmlErrorsByParseMode(
+  options.parseMode,
+  options.columnNameOfCorruptRecord,
+  wrappedCharException)
+  case e: FileNotFoundException if options.ignoreMissingFiles =>
+logWarning("Skipped missing file", e)
+Some(StructType(Nil))
+  case e: FileNotFoundException if !options.ignoreMissingFiles => throw e
+  case e @ (_: IOException | _: RuntimeException) if 
options.ignoreCorruptFiles =>
+logWarning("Skipped the rest of the content in the corrupted file", e)
+Some(StructType(Nil))
+  case NonFatal(e) =>
 None

Review Comment:
   ```suggestion
   handleXmlErrorsByParseMode(options.parseMode, 
options.columnNameOfCorruptRecord, e)
   ```



##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
 )
 testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> 
"true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+withCorruptFile(inputFile => {
+  withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+val e = intercept[SparkException] {
+  spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect()
+}
+assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end 
of input stream")
+val e2 = intercept[SparkException] {
+  spark.read
+.option("rowTag", "ROW")
+.option("multiLine", true)
+.xml(inputFile.toURI.toString)
+.collect()
+}
+assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end 
of input stream")
+  }
+  withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+assert(spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect().isEmpty)

Review Comment:
   `multiLine` is set to `true` by default for XML.
   ```suggestion
 spark.read.option("rowTag", "ROW").option("multiLine", 
false).xml(inputFile.toURI.toString).collect()
   ```



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala:
##
@@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource {
   parsedOptions: XmlOptions): StructType = {
 val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions)
 
-val tokenRDD = xml.flatMap { portableDataStream =>
-  StaxXmlParser.tokenizeStream(
-CodecStreams.createInputStreamWithCloseResource(
-  portableDataStream.getConfiguration,
-  new Path(portableDataStream.getPath())),
-parsedOptions)
-}
+val tokenRDD: RDD[String] =
+  xml.flatMap { portableDataStream =>
+try {
+  StaxXmlParser.tokenizeStream(
+CodecStreams.createInputStreamWithCloseResource(
+  portableDataStream.getConfiguration,
+  new Path(portableDataStream.getPath())),
+parsedOptions)
+} catch {

Review Comment:
   I don't see `ignoreCorruptFiles` handling here



##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
 )
 testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> 
"true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+withCorruptFile(inputFile => {
+  withSQLConf(SQLConf.IGNORE_