cloud-fan commented on code in PR #56572:
URL: https://github.com/apache/spark/pull/56572#discussion_r3432124504


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala:
##########
@@ -116,14 +120,25 @@ case class XmlFileFormat() extends TextBasedFileFormat with DataSourceRegister {
     }
 
     (file: PartitionedFile) => {
-      val parser = new StaxXmlParser(
+      def parser() = new StaxXmlParser(

Review Comment:
   The parser-factory change (`() => StaxXmlParser` / `() => JacksonParser`, 
fresh parser per entry) looks unnecessary. On master `readArchive` took a 
single shared parser and reused it for every entry, and that shipped via 
SPARK-57419. `StaxXmlParser` holds no per-stream mutable state — only `lazy 
val`/`val` parsing utilities and a reusable `val parse` ("intentionally a val 
to create a function once and reuse") — and 
`parseStream`/`parseStreamOptimized` build their reader and `FailureSafeParser` 
locally per call without mutating `this`; the same is true of `JacksonParser`. 
So a shared parser is safe across entries (just as one parser already parses 
many records within a single file), and the factory just adds a per-entry 
allocation and a `() => parser()` double-indirection here.
   
   This is the XML side of @HyukjinKwon's comment on `JsonFileFormat.scala`. 
One nuance for that thread: a plain `lazy val parser` would *not* preserve the 
stated "fresh per entry" semantics — the `() => parser` closure would hand the 
single memoized instance to every entry — so if you take the simplification, 
the clean form is to revert `readArchive` to a shared parser (as on master) 
rather than change `def` to `lazy val`. Non-blocking; the current code is 
correct either way.
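
To make the `def` versus `lazy val` nuance concrete, here is a minimal, self-contained sketch. It is not Spark code: `FakeParser`, `parserIdsPerEntry`, `withDef`, and `withLazyVal` are hypothetical names introduced only for illustration, and the helper merely imitates a reader that asks its factory for a parser once per entry.

```scala
object ParserFactorySketch {
  // Hypothetical stand-in for a parser; the id only identifies the instance.
  final class FakeParser(val id: Int)

  // Imitates a reader that calls the factory once per archive entry and
  // returns the set of distinct parser ids it was handed.
  def parserIdsPerEntry(entries: Int, factory: () => FakeParser): Set[Int] =
    (1 to entries).map(_ => factory().id).toSet

  // `def parser()`: every call builds a new instance, so each entry sees a
  // fresh parser.
  def withDef(entries: Int): Int = {
    var next = 0
    def parser() = { next += 1; new FakeParser(next) }
    parserIdsPerEntry(entries, () => parser()).size
  }

  // `lazy val parser`: the initializer runs once, so the closure hands the
  // same memoized instance to every entry.
  def withLazyVal(entries: Int): Int = {
    var next = 0
    lazy val parser = { next += 1; new FakeParser(next) }
    parserIdsPerEntry(entries, () => parser).size
  }
}
```

Under these assumptions, `withDef(3)` counts three distinct parsers while `withLazyVal(3)` counts one, which is why switching `def` to `lazy val` would quietly turn the factory into a shared parser.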



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala:
##########
@@ -185,6 +319,27 @@ object MultiLineXmlDataSource extends XmlDataSource {
     }
   }
 
+  /**
+   * Parses each archive entry as a single XML document, mirroring [[readFile]]: the optimized
+   * parser re-reads its input (to echo the corrupt-record text on a parse failure), which a
+   * single-use entry stream cannot do, so the entry's bytes are buffered and re-opened over; the
+   * legacy parser reads the entry stream directly.
+   */
+  override def readArchive(
+      conf: Configuration,
+      file: PartitionedFile,
+      parser: () => StaxXmlParser,
+      schema: StructType): Iterator[InternalRow] =
+    ArchiveReader(file.toPath).readEntries(conf) { (_, in) =>
+      val entryParser = parser()
+      if (entryParser.options.useLegacyXMLParser) {
+        entryParser.parseStream(in, schema)
+      } else {
+        val bytes = in.readAllBytes()

Review Comment:
   The default (optimized) multi-line parser re-reads its input to echo the 
corrupt-record text on a parse failure, which a single-use archive entry stream 
can't do, so the entry is fully buffered here and re-opened over a 
`ByteArrayInputStream`. A non-archive multi-line read instead streams from disk 
and re-opens the file, and the legacy-parser branch above streams the entry 
directly without buffering — so this is the one read path that holds a whole 
entry in memory at once.
   
   This is inherent to single-use entry streams + the optimized parser's 
re-read requirement, so likely an accepted trade-off, but worth confirming it's 
fine for a large single XML document packed in an archive, and maybe noting the 
limitation in the `readArchive` Scaladoc. Non-blocking.
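
For readers following the memory trade-off, a rough sketch of the buffering pattern described above, under stated assumptions: `reopenable` and `readTwice` are hypothetical helpers, not Spark APIs, and a plain `ByteArrayInputStream` stands in for the single-use archive entry stream. Only `InputStream.readAllBytes` (Java 9+) and `ByteArrayInputStream` are real library calls.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

object BufferedEntrySketch {
  // Drains a single-use stream into memory and returns a function that opens
  // a fresh stream over the buffered bytes each time it is called.
  // The whole entry is held in memory, which is the cost discussed above.
  def reopenable(in: InputStream): () => InputStream = {
    val bytes = in.readAllBytes()
    () => new ByteArrayInputStream(bytes)
  }

  // Reads the same entry twice, as a parser might when it re-reads its input
  // to echo the corrupt-record text after a parse failure.
  def readTwice(in: InputStream): (String, String) = {
    val open = reopenable(in)
    val first = new String(open().readAllBytes(), StandardCharsets.UTF_8)
    val second = new String(open().readAllBytes(), StandardCharsets.UTF_8)
    (first, second)
  }
}
```

The original stream is exhausted after the first drain, so every later read goes through the in-memory copy; peak memory therefore scales with the size of the largest entry rather than with a streaming window.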



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

