cloud-fan commented on code in PR #56572: URL: https://github.com/apache/spark/pull/56572#discussion_r3432290450
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/XMLArchiveReadBase.scala: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.types.StringType +import org.apache.spark.util.Utils + +/** + * Binds [[ArchiveReadSuiteBase]]'s file-format hooks to XML. XML opts into the shared + * schema-inference and complex-type tests (see `supportsSchemaInference`/`supportsComplexTypes`), + * and adds the XML-specific tests with no format-agnostic analogue: multi-line records, + * element-union across entries, and attributes. Records are delimited by a `rowTag` (here `row`), + * so a single `rowTag` is used for both writing and reading. Reusable across archive formats: a + * `XML<Container>ArchiveReadSuite` mixes this in alongside the archive-format trait. + */ +trait XMLArchiveReadBase extends ArchiveReadSuiteBase { + + private val rowTag = "row" + + override protected def format: String = "xml" + + override protected def fileExtension: String = "xml" + + override protected def readOptions: Map[String, String] = Map("rowTag" -> rowTag) + + override protected def readSchema: String = "id INT, name STRING" + + // XML infers from record content, unions fields across inputs by name, and represents nested + // elements as structs, so it keeps all three `supports*` defaults (inference, schema-merge, + // complex types) and runs the full shared test set. Inference needs no trigger option, so + // `inferenceOptions` keeps its empty default. + + override protected def encodeFile( + df: DataFrame, + writeOptions: Map[String, String]): Array[Byte] = { + val dir = Utils.createTempDir(namePrefix = "archive-test-encode") + try { + df.coalesce(1).write.format("xml") + .options(Map("rowTag" -> rowTag) ++ writeOptions) + .mode("overwrite").save(dir.getCanonicalPath) + val parts = dir.listFiles().filter { f => + f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") && + !f.getName.endsWith(".crc") + } + assert(parts.length == 1, + s"expected exactly one data file, got: ${parts.map(_.getName).toList}") + Files.readAllBytes(parts.head.toPath) + } finally Utils.deleteRecursively(dir) + } + + /** Raw XML bytes, for tests that need precise control over the record layout. */ + protected def xmlBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8) + + // ----- XML-specific tests -------------------------------------------------- + + test("XML: records spanning multiple lines match a directory read") { + assertArchiveMatchesDir( + Seq( + "a.xml" -> xmlBytes( + "<rows>\n <row>\n <id>1</id>\n <name>Alice</name>\n </row>\n</rows>\n"), + "b.xml" -> xmlBytes( + "<rows>\n <row>\n <id>2</id>\n <name>Bob</name>\n </row>\n</rows>\n"))) + } + + test("XML: attributes match a directory read") { + assertArchiveMatchesDir( + Seq( + "a.xml" -> xmlBytes("<rows><row id=\"1\"><name>Alice</name></row></rows>"), + "b.xml" -> xmlBytes("<rows><row id=\"2\"><name>Bob</name></row></rows>")), + schema = "_id INT, name STRING") + } + + test("XML: inference widens a null archive field against a typed loose file like a directory") { + // `c` is empty (NullType) in the archive entry and an integer in the loose file. A single + // inference pass widens `c` to the integer type, exactly as a directory read does. Inferring + // the archive and the loose file separately would canonicalize the archive's `c` to string + // first, then merge to string -- diverging from the directory read. + val inArchive = xmlBytes("<rows><row><id>1</id><c></c></row></rows>") + val loose = xmlBytes("<rows><row><id>2</id><c>5</c></row></rows>") + withTempDir { dir => + writeArchive( + new File(dir, s"data.${archiveExtensions.head}"), Seq(entryName(0) -> inArchive)) + Files.write(new File(dir, s"loose.$fileExtension").toPath, loose) + val schema = inferredSchema(Seq(dir.getCanonicalPath)) + assert(schema.find(_.name == "c").exists(_.dataType != StringType), + s"expected `c` to widen to its real type, not collapse to string; got $schema") + withTempDir { looseDir => + Files.write(new File(looseDir, entryName(0)).toPath, inArchive) + Files.write(new File(looseDir, s"loose.$fileExtension").toPath, loose) + assert(schema == inferredSchema(Seq(looseDir.getCanonicalPath)), + s"archive+loose inference diverged from a directory read; got $schema") + } + } + } + + test("XML: single-line mode reads and infers an archive like a directory") { Review Comment: Non-blocking: no test exercises a malformed XML *record* through the archive read paths. The single-line `readArchive` override wires its own `FailureSafeParser` specifically for per-record corrupt-record handling, and the multi-line path buffers bytes to echo the corrupt record — but the shared `ArchiveReadSuiteBase` only covers corrupt archive *files* (file-granular), not corrupt records. The JSON peer has a direct analogue (`JSONArchiveReadBase`: "a malformed record in an archive entry matches a directory read (both modes)"). Consider adding the XML equivalent: a malformed single-line record and a malformed whole document in `multiLine`, each asserted against a directory read with a `_corrupt_record` column. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ########## @@ -59,6 +59,24 @@ abstract class XmlDataSource extends Serializable with Logging { parser: StaxXmlParser, schema: StructType): Iterator[InternalRow] + /** + * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry through the XML parser without + * unpacking it to disk. The whole archive is a single split (see `XmlFileFormat.isSplitable`); + * each entry's bytes are parsed exactly like a standalone XML file. Single-line and multi-line + * parse an entry's bytes differently (mirroring [[readFile]]), so each data source overrides it. + * + * Kept separate from [[readFile]] (rather than dispatched inside it) because only the V1 Review Comment: Non-blocking: the JSON archive support you're porting keeps `readArchive` concrete in the base — it calls an abstract `readStream(in, parser(), schema)` that each mode overrides, so the `ArchiveReader.readEntries` wiring lives in one place. Here `readArchive` is left abstract and both modes (`TextInputXmlDataSource` :239, `MultiLineXmlDataSource` :332) re-implement the `ArchiveReader(file.toPath).readEntries(conf) { ... }` wrapper, differing only in the per-entry body. Mirroring JSON — a concrete base `readArchive` over an abstract `readStream` (single-line `readStream` = lines + `FailureSafeParser`; multi-line `readStream` = the legacy/optimized branch) — would centralize that wiring and match the peer. Behavior is identical either way, so this is purely a maintainability/consistency call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
