sandip-db commented on code in PR #51287: URL: https://github.com/apache/spark/pull/51287#discussion_r2186202853
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala: ########## @@ -186,6 +174,94 @@ class StaxXmlParser( } } + /** + * The optimized version of the XML stream parser that reads XML records from the input file + * stream sequentially without loading each individual XML record string into memory. + */ + def parseStreamOptimized( + inputStream: InputStream, + schema: StructType, + streamLiteral: () => UTF8String): Iterator[InternalRow] = { + // XSD validation would require converting to string first, which defeats the purpose + // For now, skip XSD validation in the optimized parsing mode to maintain memory efficiency + if (Option(options.rowValidationXSDPath).isDefined) { + logWarning("XSD validation is not supported in streaming mode and will be skipped") + } + val safeParser = new FailureSafeParser[XMLEventReader]( + input => { + // The first event is guaranteed to be a StartElement, so we can read attributes from it + // without using StaxXmlParserUtils.skipUntil. + val attributes = input.nextEvent().asStartElement().getAttributes.asScala.toArray Review Comment: Move this inside `doParseColumn`. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ########## @@ -175,15 +179,34 @@ object MultiLineXmlDataSource extends XmlDataSource { file: PartitionedFile, parser: StaxXmlParser, requiredSchema: StructType): Iterator[InternalRow] = { - parser.parseStream( - CodecStreams.createInputStreamWithCloseResource(conf, file.toPath), - requiredSchema) + if (SQLConf.get.enableOptimizedXmlParser) { + parser.parseStreamOptimized( + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath), + requiredSchema, + () => { + Utils.tryWithResource( + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath) + ) { is => + UTF8String.fromBytes(ByteStreams.toByteArray(is)) + } + } + ) + } else { + parser.parseStream( + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath), + requiredSchema + ) + } } override def infer( sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: XmlOptions): StructType = { + if (SQLConf.get.enableOptimizedXmlParser) { + return inferOptimized(sparkSession, inputPaths, parsedOptions) Review Comment: Lets just support this instead of two different implementations ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParserUtils.scala: ########## @@ -35,26 +36,33 @@ object StaxXmlParserUtils { factory } + val filter = new EventFilter { + override def accept(event: XMLEvent): Boolean = + event.getEventType match { + // Ignore comments and processing instructions + case XMLStreamConstants.COMMENT | XMLStreamConstants.PROCESSING_INSTRUCTION => false + // unsupported events + case XMLStreamConstants.DTD | + XMLStreamConstants.ENTITY_DECLARATION | + XMLStreamConstants.ENTITY_REFERENCE | + XMLStreamConstants.NOTATION_DECLARATION => false + case _ => true + } + } + def filteredReader(xml: String): XMLEventReader = { - val filter = new EventFilter { - override def accept(event: XMLEvent): Boolean = - event.getEventType match { - // Ignore 
comments and processing instructions - case XMLStreamConstants.COMMENT | XMLStreamConstants.PROCESSING_INSTRUCTION => false - // unsupported events - case XMLStreamConstants.DTD | - XMLStreamConstants.ENTITY_DECLARATION | - XMLStreamConstants.ENTITY_REFERENCE | - XMLStreamConstants.NOTATION_DECLARATION => false - case _ => true - } - } // It does not have to skip for white space, since `XmlInputFormat` // always finds the root tag without a heading space. val eventReader = factory.createXMLEventReader(new StringReader(xml)) factory.createFilteredReader(eventReader, filter) } + def filteredReader(inputStream: java.io.InputStream, options: XmlOptions): XMLEventReader = { + val inputStreamReader = new InputStreamReader(inputStream, Charset.forName(options.charset)) + val eventReader = factory.createXMLEventReader(inputStreamReader) Review Comment: Any issue with this? ```suggestion val eventReader = factory.createXMLEventReader(inputStream, Charset.forName(options.charset)) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/OptimizedXMLParserSuite.scala: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.xml + +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +import org.apache.spark.SparkException +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.{DropMalformedMode, PermissiveMode} +import org.apache.spark.sql.functions.{col, variant_get} +import org.apache.spark.sql.internal.SQLConf + +//import org.scalactic.source.Position +//import org.scalatest.Tag + +class OptimizedXMLParserSuite extends XmlSuite with XmlVariantTests with XmlInferSchemaTests { + override protected def sparkConf = { + val conf = super.sparkConf + conf.set(SQLConf.ENABLE_OPTIMIZED_XML_PARSER.key, "true") + conf + } + +// override protected def test(testName: String, testTags: Tag*)(testFun: => Any)( +// implicit pos: Position): Unit = { +// if (testName == "DSL test for permissive mode for corrupt records - optimized XML parser") { +// super.test(testName, testTags: _*)(testFun) +// } +// } + + override def excluded: Seq[String] = super.excluded ++ Seq( + // XSD validation is not supported in optimized XML parser + "test XSD validation with validation error", + "test XSD validation with addFile() with validation error", + "DSL: test XSD validation", + // Malformed recording handling is slightly different in optimized XML parser + "DSL test for parsing a malformed XML file", + "DSL test for permissive mode for corrupt records", + "DSL test with malformed attributes", + "DSL test for dropping malformed rows", + "DSL: handle malformed record in singleVariantColumn mode", + // No valid row will be found in `unclosed_tag.xml` by the OptimizedXMLTokenizer + "test FAILFAST with unclosed tag", Review Comment: Lets update some of these tests to have some valid rowTags in the beginning and a corrupt one towards the end. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ########## @@ -175,15 +179,34 @@ object MultiLineXmlDataSource extends XmlDataSource { file: PartitionedFile, parser: StaxXmlParser, requiredSchema: StructType): Iterator[InternalRow] = { - parser.parseStream( - CodecStreams.createInputStreamWithCloseResource(conf, file.toPath), - requiredSchema) + if (SQLConf.get.enableOptimizedXmlParser) { + parser.parseStreamOptimized( + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath), + requiredSchema, + () => { + Utils.tryWithResource( + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath) + ) { is => + UTF8String.fromBytes(ByteStreams.toByteArray(is)) Review Comment: This may hit java byte array limit that this PR is trying to address. Limit it to 1GB. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/OptimizedXMLParserSuite.scala: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.xml + +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +import org.apache.spark.SparkException +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.{DropMalformedMode, PermissiveMode} +import org.apache.spark.sql.functions.{col, variant_get} +import org.apache.spark.sql.internal.SQLConf + +//import org.scalactic.source.Position +//import org.scalatest.Tag + +class OptimizedXMLParserSuite extends XmlSuite with XmlVariantTests with XmlInferSchemaTests { + override protected def sparkConf = { + val conf = super.sparkConf + conf.set(SQLConf.ENABLE_OPTIMIZED_XML_PARSER.key, "true") + conf + } + +// override protected def test(testName: String, testTags: Tag*)(testFun: => Any)( +// implicit pos: Position): Unit = { +// if (testName == "DSL test for permissive mode for corrupt records - optimized XML parser") { +// super.test(testName, testTags: _*)(testFun) +// } +// } + + override def excluded: Seq[String] = super.excluded ++ Seq( + // XSD validation is not supported in optimized XML parser + "test XSD validation with validation error", + "test XSD validation with addFile() with validation error", + "DSL: test XSD validation", + // Malformed recording handling is slightly different in optimized XML parser + "DSL test for parsing a malformed XML file", + "DSL test for permissive mode for corrupt records", + "DSL test with malformed attributes", + "DSL test for dropping malformed rows", + "DSL: handle malformed record in singleVariantColumn mode", + // No valid row will be found in `unclosed_tag.xml` by the OptimizedXMLTokenizer + "test FAILFAST with unclosed tag", + // The file `fias_house.xml` can't be directly read by the XMLEventReader in the + // optimized XML parser Review Comment: Why? 
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/OptimizedXMLParserSuite.scala: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.xml + +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +import org.apache.spark.SparkException +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.{DropMalformedMode, PermissiveMode} +import org.apache.spark.sql.functions.{col, variant_get} +import org.apache.spark.sql.internal.SQLConf + +//import org.scalactic.source.Position +//import org.scalatest.Tag + +class OptimizedXMLParserSuite extends XmlSuite with XmlVariantTests with XmlInferSchemaTests { + override protected def sparkConf = { + val conf = super.sparkConf + conf.set(SQLConf.ENABLE_OPTIMIZED_XML_PARSER.key, "true") + conf + } + +// override protected def test(testName: String, testTags: Tag*)(testFun: => Any)( +// implicit pos: Position): Unit = { +// if (testName == "DSL test for permissive mode for corrupt records - optimized XML parser") { +// super.test(testName, testTags: _*)(testFun) +// } +// } + + override def excluded: Seq[String] = super.excluded ++ Seq( + // 
XSD validation is not supported in optimized XML parser + "test XSD validation with validation error", + "test XSD validation with addFile() with validation error", + "DSL: test XSD validation", Review Comment: Add support for XSD validation by creating a second reader. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ########## @@ -175,9 +178,24 @@ object MultiLineXmlDataSource extends XmlDataSource { file: PartitionedFile, parser: StaxXmlParser, requiredSchema: StructType): Iterator[InternalRow] = { - parser.parseStream( - CodecStreams.createInputStreamWithCloseResource(conf, file.toPath), - requiredSchema) + if (SQLConf.get.enableOptimizedXmlParser) { Review Comment: Even for the old implementation to scavenge all valid records deterministically. The new optimized version will fail at the first corrupt record, but return all the valid ones before it. I suggest adding support for XSD in the new version and remove the old tokenizer. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala: ########## @@ -156,31 +145,104 @@ class StaxXmlParser( parser.close() result } + } + } + + /** + * The optimized version of the XML stream parser that reads XML records from the input file + * stream sequentially without loading each individual XML record string into memory. + */ + def parseStreamOptimized( + inputStream: InputStream, + schema: StructType, + streamLiteral: () => UTF8String): Iterator[InternalRow] = { + // XSD validation would require converting to string first, which defeats the purpose + // For now, skip XSD validation in the optimized parsing mode to maintain memory efficiency + if (Option(options.rowValidationXSDPath).isDefined) { + logWarning("XSD validation is not supported in streaming mode and will be skipped") + } Review Comment: `Validator::validate` function can take as input a `StreamSource` or `StAXSource`. 
Either one can be extended to pass individual `rowTag` elements to the `validate` function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org