Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]
sandip-db commented on code in PR #44163: URL: https://github.com/apache/spark/pull/44163#discussion_r1414522386 ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ## @@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { +withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { +val e = intercept[SparkException] { + spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect() +} +assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) +assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") +val e2 = intercept[SparkException] { + spark.read +.option("rowTag", "ROW") +.option("multiLine", true) +.xml(inputFile.toURI.toString) +.collect() +} +assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) +assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { +assert(spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect().isEmpty) +assert( Review Comment: Not sure how this is working without `ignoreCorruptFiles` handling in `XMLDataSource`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]
sandip-db commented on code in PR #44163: URL: https://github.com/apache/spark/pull/44163#discussion_r1414588409 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ## @@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource { parsedOptions: XmlOptions): StructType = { val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions) -val tokenRDD = xml.flatMap { portableDataStream => - StaxXmlParser.tokenizeStream( -CodecStreams.createInputStreamWithCloseResource( - portableDataStream.getConfiguration, - new Path(portableDataStream.getPath())), -parsedOptions) -} +val tokenRDD: RDD[String] = + xml.flatMap { portableDataStream => +try { + StaxXmlParser.tokenizeStream( +CodecStreams.createInputStreamWithCloseResource( + portableDataStream.getConfiguration, + new Path(portableDataStream.getPath())), +parsedOptions) +} catch { Review Comment: Ah. got it. Thx. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]
shujingyang-db commented on code in PR #44163: URL: https://github.com/apache/spark/pull/44163#discussion_r1414570468 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ## @@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource { parsedOptions: XmlOptions): StructType = { val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions) -val tokenRDD = xml.flatMap { portableDataStream => - StaxXmlParser.tokenizeStream( -CodecStreams.createInputStreamWithCloseResource( - portableDataStream.getConfiguration, - new Path(portableDataStream.getPath())), -parsedOptions) -} +val tokenRDD: RDD[String] = + xml.flatMap { portableDataStream => +try { + StaxXmlParser.tokenizeStream( +CodecStreams.createInputStreamWithCloseResource( + portableDataStream.getConfiguration, + new Path(portableDataStream.getPath())), +parsedOptions) +} catch { Review Comment: It's in `case _: RuntimeException | _: IOException if parsedOptions.ignoreCorruptFiles =>` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46248]XML: Support for ignoreCorruptFiles and ignoreMissingFiles options [spark]
sandip-db commented on code in PR #44163: URL: https://github.com/apache/spark/pull/44163#discussion_r1414443217 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala: ## @@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean) val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser) Some(inferObject(parser, rootAttributes)) } catch { - case NonFatal(_) if options.parseMode == PermissiveMode => -Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, StringType - case NonFatal(_) => + case e @ (_: XMLStreamException | _: MalformedInputException | _: SAXException) => +handleXmlErrorsByParseMode(options.parseMode, options.columnNameOfCorruptRecord, e) + case e: CharConversionException if options.charset.isEmpty => +val msg = + """XML parser cannot handle a character in its input. +|Specifying encoding as an input option explicitly might help to resolve the issue. +|""".stripMargin + e.getMessage +val wrappedCharException = new CharConversionException(msg) +wrappedCharException.initCause(e) +handleXmlErrorsByParseMode( + options.parseMode, + options.columnNameOfCorruptRecord, + wrappedCharException) + case e: FileNotFoundException if options.ignoreMissingFiles => +logWarning("Skipped missing file", e) +Some(StructType(Nil)) + case e: FileNotFoundException if !options.ignoreMissingFiles => throw e + case e @ (_: IOException | _: RuntimeException) if options.ignoreCorruptFiles => +logWarning("Skipped the rest of the content in the corrupted file", e) +Some(StructType(Nil)) + case NonFatal(e) => None Review Comment: ```suggestion handleXmlErrorsByParseMode(options.parseMode, options.columnNameOfCorruptRecord, e) ``` ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ## @@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { +withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { +val e = intercept[SparkException] { + spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect() +} +assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) +assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") +val e2 = intercept[SparkException] { + spark.read +.option("rowTag", "ROW") +.option("multiLine", true) +.xml(inputFile.toURI.toString) +.collect() +} +assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) +assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { +assert(spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect().isEmpty) Review Comment: `multiLine` is set to `true` by default for XML. ```suggestion spark.read.option("rowTag", "ROW").option("multiLine", false).xml(inputFile.toURI.toString).collect() ``` ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ## @@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource { parsedOptions: XmlOptions): StructType = { val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions) -val tokenRDD = xml.flatMap { portableDataStream => - StaxXmlParser.tokenizeStream( -CodecStreams.createInputStreamWithCloseResource( - portableDataStream.getConfiguration, - new Path(portableDataStream.getPath())), -parsedOptions) -} +val tokenRDD: RDD[String] = + xml.flatMap { portableDataStream => +try { + StaxXmlParser.tokenizeStream( +CodecStreams.createInputStreamWithCloseResource( + portableDataStream.getConfiguration, + new Path(portableDataStream.getPath())), +parsedOptions) +} catch { Review Comment: I don't see `ignoreCorruptFiles` handling here ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ## @@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { +withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_