cloud-fan commented on code in PR #56480:
URL: https://github.com/apache/spark/pull/56480#discussion_r3415881663


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala:
##########
@@ -75,6 +124,80 @@ abstract class JsonDataSource extends Serializable {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType
+
+  /**
+   * Infers a JSON schema when at least one input is a tar archive. Every 
archive entry (streamed
+   * via `ArchiveReader`, never unpacked to disk) and every loose file is read 
as JSON records --
+   * each line is a record, or the whole input is one document in multi-line 
mode -- and all of them
+   * feed a single [[JsonInferSchema]] pass, exactly as a directory of the 
same files would infer.
+   * Because [[JsonInferSchema]] already merges every record's type by field 
name across all inputs,
+   * one pass is itself the union: a field empty in one input but typed in 
another widens to the
+   * real type, and a `NullType` field survives to the single final 
canonicalization rather than
+   * being collapsed per-input. A corrupt/missing input is skipped as a unit 
(a whole archive or a
+   * whole file) when `ignoreCorruptFiles`/`ignoreMissingFiles` are set.
+   */
+  private def inferWithArchives(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType = {
+    val baseRdd = JsonDataSource.createBaseRdd(sparkSession, inputPaths, 
parsedOptions)
+    val multiLine = parsedOptions.multiLine
+    val lineSeparator = parsedOptions.lineSeparatorInRead
+    val encoding = parsedOptions.encoding
+    val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+    val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
+
+    // Applies `perEntry` to each input -- once per archive entry, once for a 
loose file -- skipping
+    // a whole input on corrupt/missing input when the ignore flags are set. 
The entry/file stream

Review Comment:
   Repeated "input" reads awkwardly.
   ```suggestion
       // a whole input when it is corrupt/missing and the ignore flags are 
set. The entry/file stream
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala:
##########
@@ -75,6 +124,80 @@ abstract class JsonDataSource extends Serializable {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType
+
+  /**
+   * Infers a JSON schema when at least one input is a tar archive. Every 
archive entry (streamed
+   * via `ArchiveReader`, never unpacked to disk) and every loose file is read 
as JSON records --
+   * each line is a record, or the whole input is one document in multi-line 
mode -- and all of them
+   * feed a single [[JsonInferSchema]] pass, exactly as a directory of the 
same files would infer.
+   * Because [[JsonInferSchema]] already merges every record's type by field 
name across all inputs,
+   * one pass is itself the union: a field empty in one input but typed in 
another widens to the
+   * real type, and a `NullType` field survives to the single final 
canonicalization rather than
+   * being collapsed per-input. A corrupt/missing input is skipped as a unit 
(a whole archive or a
+   * whole file) when `ignoreCorruptFiles`/`ignoreMissingFiles` are set.
+   */
+  private def inferWithArchives(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType = {
+    val baseRdd = JsonDataSource.createBaseRdd(sparkSession, inputPaths, 
parsedOptions)
+    val multiLine = parsedOptions.multiLine
+    val lineSeparator = parsedOptions.lineSeparatorInRead
+    val encoding = parsedOptions.encoding
+    val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+    val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
+
+    // Applies `perEntry` to each input -- once per archive entry, once for a 
loose file -- skipping
+    // a whole input on corrupt/missing input when the ignore flags are set. 
The entry/file stream
+    // is consumed lazily by `perEntry`, never buffered whole; mirrors CSV's 
`inferWithArchives`.
+    def perInput[T: ClassTag](perEntry: InputStream => Iterator[T]): RDD[T] = 
baseRdd.flatMap {
+      stream =>
+        val path = new Path(stream.getPath())
+        try {
+          if (ArchiveReader.isArchivePath(path)) {
+            ArchiveReader(path).readEntries(stream.getConfiguration) { (_, in) 
=> perEntry(in) }
+          } else {
+            perEntry(
+              
CodecStreams.createInputStreamWithCloseResource(stream.getConfiguration, path))
+          }
+        } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(log"Skipped missing input: ${MDC(PATH, 
stream.getPath())}", e)
+            Iterator.empty
+          case e: FileNotFoundException => throw e
+          case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
+            logWarning(log"Skipped the corrupted input: ${MDC(PATH, 
stream.getPath())}", e)
+            Iterator.empty
+          case NonFatal(e) =>
+            throw QueryExecutionErrors.cannotReadFilesError(
+              e, SparkPath.fromPathString(stream.getPath()).urlEncoded)
+        }
+    }
+
+    SQLExecution.withSQLConfPropagated(sparkSession) {
+      val inferSchema = new JsonInferSchema(parsedOptions)
+      if (multiLine) {
+        // Each input/entry is one JSON document: hand its stream straight to 
the parser
+        // (`CreateJacksonParser.inputStream`, matching 
MultiLineJsonDataSource and its charset
+        // auto-detect) so the document is parsed incrementally rather than 
buffered.
+        val docs = perInput(in => Iterator.single(in))

Review Comment:
   These `RDD[InputStream]` elements are live `CloseShieldInputStream` views 
over the single shared `TarArchiveInputStream` cursor — each valid only until 
`readEntries` advances to the next entry (`getNextEntry` skips the prior 
entry's unread bytes). This is correct today only because 
`JsonInferSchema.infer` fully consumes each stream (`flatMap { 
tryWithResource(parse) }.reduceOption`) before pulling the next, and 
`inferWithArchives` is its only consumer.
   
   The line-delimited sibling (`RDD[Array[Byte]]`) and the CSV analogue 
(`RDD[Array[String]]`) both materialize their records, so they don't carry this 
constraint. If `infer`'s per-partition consumption ever changes to buffer / 
look ahead / parallelize, this path would silently read from an advanced cursor 
and infer a wrong schema (no exception; the one-entry fixtures here can't 
observe it). Non-blocking, and the streaming is a worthwhile memory 
optimization — but worth a line in the `perInput` comment documenting that the 
consumer must fully consume each stream before the iterator advances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to