cloud-fan commented on code in PR #56572:
URL: https://github.com/apache/spark/pull/56572#discussion_r3432290450


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/XMLArchiveReadBase.scala:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.Utils
+
+/**
+ * Binds [[ArchiveReadSuiteBase]]'s file-format hooks to XML. XML opts into the shared
+ * schema-inference and complex-type tests (see `supportsSchemaInference`/`supportsComplexTypes`),
+ * and adds the XML-specific tests with no format-agnostic analogue: multi-line records,
+ * element-union across entries, and attributes. Records are delimited by a `rowTag` (here `row`),
+ * so a single `rowTag` is used for both writing and reading. Reusable across archive formats: a
+ * `XML<Container>ArchiveReadSuite` mixes this in alongside the archive-format trait.
+ */
+trait XMLArchiveReadBase extends ArchiveReadSuiteBase {
+
+  private val rowTag = "row"
+
+  override protected def format: String = "xml"
+
+  override protected def fileExtension: String = "xml"
+
+  override protected def readOptions: Map[String, String] = Map("rowTag" -> rowTag)
+
+  override protected def readSchema: String = "id INT, name STRING"
+
+  // XML infers from record content, unions fields across inputs by name, and represents nested
+  // elements as structs, so it keeps all three `supports*` defaults (inference, schema-merge,
+  // complex types) and runs the full shared test set. Inference needs no trigger option, so
+  // `inferenceOptions` keeps its empty default.
+
+  override protected def encodeFile(
+      df: DataFrame,
+      writeOptions: Map[String, String]): Array[Byte] = {
+    val dir = Utils.createTempDir(namePrefix = "archive-test-encode")
+    try {
+      df.coalesce(1).write.format("xml")
+        .options(Map("rowTag" -> rowTag) ++ writeOptions)
+        .mode("overwrite").save(dir.getCanonicalPath)
+      val parts = dir.listFiles().filter { f =>
+        f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") &&
+          !f.getName.endsWith(".crc")
+      }
+      assert(parts.length == 1,
+        s"expected exactly one data file, got: ${parts.map(_.getName).toList}")
+      Files.readAllBytes(parts.head.toPath)
+    } finally Utils.deleteRecursively(dir)
+  }
+
+  /** Raw XML bytes, for tests that need precise control over the record layout. */
+  protected def xmlBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
+
+  // ----- XML-specific tests --------------------------------------------------
+
+  test("XML: records spanning multiple lines match a directory read") {
+    assertArchiveMatchesDir(
+      Seq(
+        "a.xml" -> xmlBytes(
+          "<rows>\n  <row>\n    <id>1</id>\n    <name>Alice</name>\n  </row>\n</rows>\n"),
+        "b.xml" -> xmlBytes(
+          "<rows>\n  <row>\n    <id>2</id>\n    <name>Bob</name>\n  </row>\n</rows>\n")))
+  }
+
+  test("XML: attributes match a directory read") {
+    assertArchiveMatchesDir(
+      Seq(
+        "a.xml" -> xmlBytes("<rows><row id=\"1\"><name>Alice</name></row></rows>"),
+        "b.xml" -> xmlBytes("<rows><row id=\"2\"><name>Bob</name></row></rows>")),
+      schema = "_id INT, name STRING")
+  }
+
+  test("XML: inference widens a null archive field against a typed loose file like a directory") {
+    // `c` is empty (NullType) in the archive entry and an integer in the loose file. A single
+    // inference pass widens `c` to the integer type, exactly as a directory read does. Inferring
+    // the archive and the loose file separately would canonicalize the archive's `c` to string
+    // first, then merge to string -- diverging from the directory read.
+    val inArchive = xmlBytes("<rows><row><id>1</id><c></c></row></rows>")
+    val loose = xmlBytes("<rows><row><id>2</id><c>5</c></row></rows>")
+    withTempDir { dir =>
+      writeArchive(
+        new File(dir, s"data.${archiveExtensions.head}"), Seq(entryName(0) -> inArchive))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, loose)
+      val schema = inferredSchema(Seq(dir.getCanonicalPath))
+      assert(schema.find(_.name == "c").exists(_.dataType != StringType),
+        s"expected `c` to widen to its real type, not collapse to string; got $schema")
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, inArchive)
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, loose)
+        assert(schema == inferredSchema(Seq(looseDir.getCanonicalPath)),
+          s"archive+loose inference diverged from a directory read; got $schema")
+      }
+    }
+  }
+
+  test("XML: single-line mode reads and infers an archive like a directory") {

Review Comment:
   Non-blocking: no test exercises a malformed XML *record* through the archive 
read paths. The single-line `readArchive` override wires its own 
`FailureSafeParser` specifically for per-record corrupt-record handling, and 
the multi-line path buffers bytes to echo the corrupt record — but the shared 
`ArchiveReadSuiteBase` only covers corrupt archive *files* (file-granular), not 
corrupt records.
   
   The JSON peer has a direct analogue (`JSONArchiveReadBase`: "a malformed 
record in an archive entry matches a directory read (both modes)"). Consider 
adding the XML equivalent: a malformed single-line record and a malformed whole 
document in `multiLine`, each asserted against a directory read with a 
`_corrupt_record` column.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala:
##########
@@ -59,6 +59,24 @@ abstract class XmlDataSource extends Serializable with Logging {
       parser: StaxXmlParser,
       schema: StructType): Iterator[InternalRow]
 
+  /**
+   * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry through the XML parser without
+   * unpacking it to disk. The whole archive is a single split (see `XmlFileFormat.isSplitable`);
+   * each entry's bytes are parsed exactly like a standalone XML file. Single-line and multi-line
+   * parse an entry's bytes differently (mirroring [[readFile]]), so each data source overrides it.
+   *
+   * Kept separate from [[readFile]] (rather than dispatched inside it) because only the V1

Review Comment:
   Non-blocking: the JSON archive support you're porting keeps `readArchive` 
concrete in the base — it calls an abstract `readStream(in, parser(), schema)` 
that each mode overrides, so the `ArchiveReader.readEntries` wiring lives in 
one place. Here `readArchive` is left abstract and both modes 
(`TextInputXmlDataSource` :239, `MultiLineXmlDataSource` :332) re-implement the 
`ArchiveReader(file.toPath).readEntries(conf) { ... }` wrapper, differing only 
in the per-entry body.
   
   Mirroring JSON — a concrete base `readArchive` over an abstract `readStream` 
(single-line `readStream` = lines + `FailureSafeParser`; multi-line 
`readStream` = the legacy/optimized branch) — would centralize that wiring and 
match the peer. Behavior is identical either way, so this is purely a 
maintainability/consistency call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to