cloud-fan commented on code in PR #56480:
URL: https://github.com/apache/spark/pull/56480#discussion_r3406577526


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala:
##########
@@ -75,6 +123,72 @@ abstract class JsonDataSource extends Serializable {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType
+
+  /**
+   * Infers a JSON schema when at least one input is a tar archive. Every archive entry (streamed
+   * via `ArchiveReader`, never unpacked to disk) and every loose file is read as JSON records --
+   * each line is a record, or the whole input is one document in multi-line mode -- and all of them
+   * feed a single [[JsonInferSchema]] pass, exactly as a directory of the same files would infer.
+   * Because [[JsonInferSchema]] already merges every record's type by field name across all inputs,
+   * one pass is itself the union: a field empty in one input but typed in another widens to the
+   * real type, and a `NullType` field survives to the single final canonicalization rather than
+   * being collapsed per-input. A corrupt/missing input is skipped as a unit (a whole archive or a
+   * whole file) when `ignoreCorruptFiles`/`ignoreMissingFiles` are set.
+   */
+  private def inferWithArchives(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType = {
+    val streams = JsonDataSource.createBaseRdd(sparkSession, inputPaths, parsedOptions)
+    val multiLine = parsedOptions.multiLine
+    val lineSeparator = parsedOptions.lineSeparatorInRead
+    val encoding = parsedOptions.encoding
+    val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+    val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
+    // Each input is streamed lazily, carrying each record as its raw bytes in a fresh array (the
+    // line reader reuses one buffer, so a line is copied off it). The bytes are parsed downstream
+    // exactly as the scan parses a file: a byte-array parser auto-detects the charset when no
+    // `encoding` is set (so UTF-16/UTF-32 is read correctly, not just UTF-8), and a stream decoder
+    // applies the explicit `encoding` when one is given. An archive is read entry by entry; a loose
+    // file is read directly -- both yield the same record units, so all inputs feed one pass.
+    val records: RDD[Array[Byte]] = streams.flatMap { stream =>
+      val path = new Path(stream.getPath())
+      def recordsOf(in: InputStream): Iterator[Array[Byte]] =
+        if (multiLine) {
+          Iterator.single(in.readAllBytes())
+        } else {
+          ArchiveReader.lineIterator(in, lineSeparator).map(_.copyBytes())
+        }
+      try {
+        if (ArchiveReader.isArchivePath(path)) {
+          ArchiveReader(path).readEntries(stream.getConfiguration)((_, in) => recordsOf(in))
+        } else {
+          recordsOf(CodecStreams.createInputStreamWithCloseResource(stream.getConfiguration, path))
+        }
+      } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(log"Skipped missing input: ${MDC(PATH, stream.getPath())}", e)
+          Iterator.empty
+        case e: FileNotFoundException => throw e
+        case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+          logWarning(log"Skipped the corrupted input: ${MDC(PATH, stream.getPath())}", e)
+          Iterator.empty
+        case NonFatal(e) =>
+          throw QueryExecutionErrors.cannotReadFilesError(
+            e, SparkPath.fromPathString(stream.getPath()).urlEncoded)
+      }
+    }
+    // Honor `samplingRatio` like the loose-file infer paths, so an archive samples its records like
+    // a directory read rather than always reading every one.
+    val sampled = JsonUtils.sample(records, parsedOptions)
+    val recordParser = encoding
+      .map(enc => CreateJacksonParser.bytes(enc, _: JsonFactory, _: Array[Byte]))

Review Comment:
   One charset case still diverges from the scan: multiLine with an `encoding` outside `CharsetProvider`'s allow-list (e.g. `windows-1252`, which `JSONOptionsInRead` allows in multiLine mode). `bytes(enc, ...)` decodes via `CharsetProvider.newDecoder`, which rejects such charsets unless `spark.sql.legacy.javaCharsets` is set, and with `isReadFile=true` the exception propagates out of `JsonInferSchema.infer`. As a result, archive inference fails with `INVALID_PARAMETER_VALUE.CHARSET`, while the multiLine scan (`CreateJacksonParser.inputStream`, a raw `InputStreamReader`) and a directory read's inference accept the same files. Line-delimited mode is unaffected: its scan path uses the same `getStreamDecoder`.
   
   I'd build the multiLine parser as `CreateJacksonParser.inputStream(enc, factory, new ByteArrayInputStream(record))` so each mode matches its own scan path exactly. Non-blocking: the failure is loud and the charset combination is rare.
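   
   To make the divergence concrete, here is a minimal JDK-only sketch, not the Spark code. `CharsetDivergenceSketch`, `allowList`, `decodeStrict` and `decodeLenient` are hypothetical names, and the allow-list contents are an assumption standing in for whatever `CharsetProvider` actually permits. The strict path mimics a decoder that checks an allow-list first; the lenient path is a plain `InputStreamReader` over the record bytes, which is roughly what the multiLine scan ends up doing.

   ```scala
   import java.io.{ByteArrayInputStream, InputStreamReader}
   import java.nio.ByteBuffer
   import java.nio.charset.Charset
   import java.util.Locale

   object CharsetDivergenceSketch {
     // Assumption: illustrative allow-list only; the real one lives in Spark's CharsetProvider.
     val allowList: Set[String] =
       Set("us-ascii", "iso-8859-1", "utf-8", "utf-16be", "utf-16le", "utf-16", "utf-32")

     // Hypothetical strict path: reject any charset outside the allow-list, then decode.
     def decodeStrict(enc: String, record: Array[Byte]): String = {
       if (!allowList.contains(enc.toLowerCase(Locale.ROOT))) {
         throw new IllegalArgumentException(s"charset not allowed: $enc")
       }
       Charset.forName(enc).newDecoder().decode(ByteBuffer.wrap(record)).toString
     }

     // Lenient path: a raw InputStreamReader accepts any charset the JVM supports.
     def decodeLenient(enc: String, record: Array[Byte]): String = {
       val reader = new InputStreamReader(new ByteArrayInputStream(record), enc)
       try {
         val out = new java.lang.StringBuilder
         val buf = new Array[Char](1024)
         var n = reader.read(buf)
         while (n != -1) {
           out.append(buf, 0, n)
           n = reader.read(buf)
         }
         out.toString
       } finally {
         reader.close()
       }
     }

     def main(args: Array[String]): Unit = {
       val doc = "{\"name\":\"caf\u00e9\"}"
       val record = doc.getBytes("windows-1252")
       // The same bytes and encoding succeed on one path and fail on the other.
       println(decodeLenient("windows-1252", record) == doc)
       println(scala.util.Try(decodeStrict("windows-1252", record)).isFailure)
     }
   }
   ```

   With both paths going through the same kind of reader in multiLine mode, the same `encoding` would be accepted or rejected consistently by inference and scan.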



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala:
##########
@@ -42,9 +41,11 @@ object JsonUtils {
   }
 
   /**
-   * Sample JSON RDD as configured by `samplingRatio`.
+   * Sample a JSON record RDD as configured by `samplingRatio`. Generic over the record type so the
+   * multiLine path (`RDD[PortableDataStream]`) and the archive inference path (`RDD[UTF8String]`)

Review Comment:
   Went stale with the switch to raw bytes:
   ```suggestion
      * multiLine path (`RDD[PortableDataStream]`) and the archive inference path (`RDD[Array[Byte]]`)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

