cloud-fan commented on code in PR #56572:
URL: https://github.com/apache/spark/pull/56572#discussion_r3432124504
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala:
##########
@@ -116,14 +120,25 @@ case class XmlFileFormat() extends TextBasedFileFormat
with DataSourceRegister {
}
(file: PartitionedFile) => {
- val parser = new StaxXmlParser(
+ def parser() = new StaxXmlParser(
Review Comment:
The parser-factory change (`() => StaxXmlParser` / `() => JacksonParser`,
fresh parser per entry) looks unnecessary. On master, `readArchive` took a
single shared parser and reused it for every entry, and that shipped via
SPARK-57419. `StaxXmlParser` holds no per-stream mutable state — only `lazy
val`/`val` parsing utilities and a reusable `val parse` ("intentionally a val
to create a function once and reuse") — and
`parseStream`/`parseStreamOptimized` build their reader and `FailureSafeParser`
locally per call without mutating `this`; the same is true of `JacksonParser`.
So a shared parser is safe across entries (just as one parser already parses
many records within a single file), and the factory just adds a per-entry
allocation and a `() => parser()` double-indirection here.
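A minimal sketch of why reuse is safe, assuming a hypothetical `LineParser` that stands in for `StaxXmlParser`/`JacksonParser`. The instance holds only immutable configuration and a reusable function `val`, while each `parseStream` call builds and closes its own reader, so one instance can serve every entry. `SharedParserDemo`, `LineParser`, and `readEntries` are illustrative names, not Spark APIs.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.regex.Pattern
import scala.io.Source

object SharedParserDemo {
  // Hypothetical stand-in for StaxXmlParser / JacksonParser: `this` holds only
  // immutable configuration, never per-stream state.
  final class LineParser(val delimiter: String) {
    // Like StaxXmlParser's reusable `val parse`: built once, reused on every call.
    private val splitLine: String => Seq[String] =
      line => line.split(Pattern.quote(delimiter)).toSeq

    // The reader is created and closed inside the call, so nothing carries
    // over from one stream (archive entry) to the next.
    def parseStream(in: InputStream): List[Seq[String]] = {
      val source = Source.fromInputStream(in, StandardCharsets.UTF_8.name)
      try source.getLines().map(splitLine).toList
      finally source.close()
    }
  }

  // Mirrors the master shape: one parser instance shared across all entries.
  def readEntries(entries: Seq[String], parser: LineParser): Seq[Seq[String]] =
    entries.flatMap { text =>
      parser.parseStream(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)))
    }
}
```

Because no call mutates the shared instance, parsing several entries with it behaves exactly like parsing several records within one file.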
This is the XML side of @HyukjinKwon's comment on `JsonFileFormat.scala`.
One nuance for that thread: a plain `lazy val parser` would *not* preserve the
stated "fresh per entry" semantics — the `() => parser` closure would hand the
single memoized instance to every entry — so if you take the simplification,
the clean form is to revert `readArchive` to a shared parser (as on master)
rather than change `def` to `lazy val`. Non-blocking; the current code is
correct either way.
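To make the `def` vs `lazy val` nuance concrete, here is a small sketch with a hypothetical `Parser` and `ParserSource` (not Spark classes). The same `() => ...` factory shape yields a distinct instance per entry when it wraps a `def`, but the one memoized instance when it wraps a `lazy val`.

```scala
import java.util.concurrent.atomic.AtomicInteger

object FactoryVsLazyVal {
  // Hypothetical parser; `id` records creation order so instances can be told apart.
  final class Parser(val id: Int)

  // Hypothetical owner of the parser, playing the role of the per-file closure.
  final class ParserSource {
    private val created = new AtomicInteger(0)

    // `def`: each call constructs a new Parser.
    def freshParser(): Parser = new Parser(created.incrementAndGet())

    // `lazy val`: constructed on first access, then the same instance every time.
    lazy val memoizedParser: Parser = new Parser(created.incrementAndGet())

    def createdCount: Int = created.get()
  }

  // Stand-in for readArchive: invokes the factory once per entry.
  def parserIdsPerEntry(entries: Int, factory: () => Parser): Seq[Int] =
    (1 to entries).map(_ => factory().id)
}
```

With three entries, `() => src.freshParser()` produces ids 1, 2, 3, while `() => src.memoizedParser` produces the same id three times. A factory over a `lazy val` is effectively the shared parser plus an extra indirection, which is why reverting to the master shape is the cleaner simplification.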
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala:
##########
@@ -185,6 +319,27 @@ object MultiLineXmlDataSource extends XmlDataSource {
}
}
+ /**
+ * Parses each archive entry as a single XML document, mirroring [[readFile]]: the optimized
+ * parser re-reads its input (to echo the corrupt-record text on a parse failure), which a
+ * single-use entry stream cannot do, so the entry's bytes are buffered and re-opened over; the
+ * legacy parser reads the entry stream directly.
+ */
+ override def readArchive(
+ conf: Configuration,
+ file: PartitionedFile,
+ parser: () => StaxXmlParser,
+ schema: StructType): Iterator[InternalRow] =
+ ArchiveReader(file.toPath).readEntries(conf) { (_, in) =>
+ val entryParser = parser()
+ if (entryParser.options.useLegacyXMLParser) {
+ entryParser.parseStream(in, schema)
+ } else {
+ val bytes = in.readAllBytes()
Review Comment:
The default (optimized) multi-line parser re-reads its input to echo the
corrupt-record text on a parse failure, which a single-use archive entry stream
can't do, so the entry is fully buffered here and re-opened over a
`ByteArrayInputStream`. A non-archive multi-line read instead streams from disk
and re-opens the file, and the legacy-parser branch above streams the entry
directly without buffering — so this is the one read path that holds a whole
entry in memory at once.
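A rough sketch of the buffer-and-reopen pattern described above, assuming a hypothetical digit parser as a stand-in for the optimized XML parser. Its first pass streams bytes without retaining them and only re-opens the input on failure, to echo the raw record. `BufferedArchiveEntry`, `parseDigits`, and `readEntry` are illustrative names, not Spark code.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

object BufferedArchiveEntry {
  // Opens the input again and returns its full text (the "echo" read).
  private def readText(open: () => InputStream): String = {
    val in = open()
    try new String(in.readAllBytes(), StandardCharsets.UTF_8)
    finally in.close()
  }

  // Hypothetical streaming parser: consumes digits one byte at a time without
  // keeping them, and re-opens the input only to echo a corrupt record.
  def parseDigits(open: () => InputStream): Either[String, Long] = {
    val in = open()
    val result =
      try {
        var acc = 0L
        var b = in.read()
        var valid = b != -1 // an empty record counts as corrupt
        while (valid && b != -1) {
          if (b >= '0' && b <= '9') { acc = acc * 10 + (b - '0'); b = in.read() }
          else valid = false
        }
        if (valid) Some(acc) else None
      } finally in.close()
    // `toRight` takes its argument by name, so the second open happens only on failure.
    result.toRight(readText(open))
  }

  // A single-use entry stream cannot be opened twice, so buffer it first.
  def readEntry(entry: InputStream): Either[String, Long] = {
    val bytes = entry.readAllBytes() // the whole entry is held in memory here
    parseDigits(() => new ByteArrayInputStream(bytes))
  }
}
```

The `readAllBytes()` call in `readEntry` is the memory cost the comment points at: the re-open closure needs the full entry bytes to exist somewhere, whereas a file-backed read could simply re-open the path.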
This is inherent to combining single-use entry streams with the optimized
parser's re-read requirement, so it is likely an accepted trade-off, but it is
worth confirming that it's fine for a large single XML document packed in an
archive, and perhaps noting the limitation in the `readArchive` Scaladoc.
Non-blocking.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.