cloud-fan commented on code in PR #56480:
URL: https://github.com/apache/spark/pull/56480#discussion_r3415881663
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala:
##########
@@ -75,6 +124,80 @@ abstract class JsonDataSource extends Serializable {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType
+
+ /**
+ * Infers a JSON schema when at least one input is a tar archive. Every
archive entry (streamed
+ * via `ArchiveReader`, never unpacked to disk) and every loose file is read
as JSON records --
+ * each line is a record, or the whole input is one document in multi-line
mode -- and all of them
+ * feed a single [[JsonInferSchema]] pass, exactly as a directory of the
same files would infer.
+ * Because [[JsonInferSchema]] already merges every record's type by field
name across all inputs,
+ * one pass is itself the union: a field empty in one input but typed in
another widens to the
+ * real type, and a `NullType` field survives to the single final
canonicalization rather than
+ * being collapsed per-input. A corrupt/missing input is skipped as a unit
(a whole archive or a
+ * whole file) when `ignoreCorruptFiles`/`ignoreMissingFiles` are set.
+ */
+ private def inferWithArchives(
+ sparkSession: SparkSession,
+ inputPaths: Seq[FileStatus],
+ parsedOptions: JSONOptions): StructType = {
+ val baseRdd = JsonDataSource.createBaseRdd(sparkSession, inputPaths,
parsedOptions)
+ val multiLine = parsedOptions.multiLine
+ val lineSeparator = parsedOptions.lineSeparatorInRead
+ val encoding = parsedOptions.encoding
+ val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+ val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
+
+ // Applies `perEntry` to each input -- once per archive entry, once for a
loose file -- skipping
+ // a whole input on corrupt/missing input when the ignore flags are set.
The entry/file stream
Review Comment:
Repeated "input" reads awkwardly.
```suggestion
// a whole input when it is corrupt/missing and the ignore flags are set. The entry/file stream
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala:
##########
@@ -75,6 +124,80 @@ abstract class JsonDataSource extends Serializable {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType
+
+ /**
+ * Infers a JSON schema when at least one input is a tar archive. Every
archive entry (streamed
+ * via `ArchiveReader`, never unpacked to disk) and every loose file is read
as JSON records --
+ * each line is a record, or the whole input is one document in multi-line
mode -- and all of them
+ * feed a single [[JsonInferSchema]] pass, exactly as a directory of the
same files would infer.
+ * Because [[JsonInferSchema]] already merges every record's type by field
name across all inputs,
+ * one pass is itself the union: a field empty in one input but typed in
another widens to the
+ * real type, and a `NullType` field survives to the single final
canonicalization rather than
+ * being collapsed per-input. A corrupt/missing input is skipped as a unit
(a whole archive or a
+ * whole file) when `ignoreCorruptFiles`/`ignoreMissingFiles` are set.
+ */
+ private def inferWithArchives(
+ sparkSession: SparkSession,
+ inputPaths: Seq[FileStatus],
+ parsedOptions: JSONOptions): StructType = {
+ val baseRdd = JsonDataSource.createBaseRdd(sparkSession, inputPaths,
parsedOptions)
+ val multiLine = parsedOptions.multiLine
+ val lineSeparator = parsedOptions.lineSeparatorInRead
+ val encoding = parsedOptions.encoding
+ val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+ val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
+
+ // Applies `perEntry` to each input -- once per archive entry, once for a
loose file -- skipping
+ // a whole input on corrupt/missing input when the ignore flags are set.
The entry/file stream
+ // is consumed lazily by `perEntry`, never buffered whole; mirrors CSV's
`inferWithArchives`.
+ def perInput[T: ClassTag](perEntry: InputStream => Iterator[T]): RDD[T] =
baseRdd.flatMap {
+ stream =>
+ val path = new Path(stream.getPath())
+ try {
+ if (ArchiveReader.isArchivePath(path)) {
+ ArchiveReader(path).readEntries(stream.getConfiguration) { (_, in)
=> perEntry(in) }
+ } else {
+ perEntry(
+
CodecStreams.createInputStreamWithCloseResource(stream.getConfiguration, path))
+ }
+ } catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(log"Skipped missing input: ${MDC(PATH,
stream.getPath())}", e)
+ Iterator.empty
+ case e: FileNotFoundException => throw e
+ case e @ (_: RuntimeException | _: IOException) if
ignoreCorruptFiles =>
+ logWarning(log"Skipped the corrupted input: ${MDC(PATH,
stream.getPath())}", e)
+ Iterator.empty
+ case NonFatal(e) =>
+ throw QueryExecutionErrors.cannotReadFilesError(
+ e, SparkPath.fromPathString(stream.getPath()).urlEncoded)
+ }
+ }
+
+ SQLExecution.withSQLConfPropagated(sparkSession) {
+ val inferSchema = new JsonInferSchema(parsedOptions)
+ if (multiLine) {
+ // Each input/entry is one JSON document: hand its stream straight to
the parser
+ // (`CreateJacksonParser.inputStream`, matching
MultiLineJsonDataSource and its charset
+ // auto-detect) so the document is parsed incrementally rather than
buffered.
+ val docs = perInput(in => Iterator.single(in))
Review Comment:
These `RDD[InputStream]` elements are live `CloseShieldInputStream` views
over the single shared `TarArchiveInputStream` cursor — each valid only until
`readEntries` advances to the next entry (`getNextEntry` skips the prior
entry's unread bytes). This is correct today only because
`JsonInferSchema.infer` fully consumes each stream (`flatMap {
tryWithResource(parse) }.reduceOption`) before pulling the next, and
`inferWithArchives` is the only consumer of these streams.
The line-delimited sibling (`RDD[Array[Byte]]`) and the CSV analogue
(`RDD[Array[String]]`) both materialize their records, so they don't carry this
constraint. If `infer`'s per-partition consumption ever changes to buffer /
look ahead / parallelize, this path would silently read from an advanced cursor
and infer a wrong schema (no exception; the one-entry fixtures here can't
observe it). Non-blocking, and the streaming is a worthwhile memory
optimization — but worth a line in the `perInput` comment documenting that the
consumer must fully consume each stream before the iterator advances.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]