cloud-fan commented on code in PR #56480:
URL: https://github.com/apache/spark/pull/56480#discussion_r3406471065


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala:
##########
@@ -75,6 +123,79 @@ abstract class JsonDataSource extends Serializable {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType
+
+  /**
+   * Infers a JSON schema when at least one input is a tar archive. Every 
archive entry (streamed
+   * via `ArchiveReader`, never unpacked to disk) and every loose file is read 
as JSON records --
+   * each line is a record, or the whole input is one document in multi-line 
mode -- and all of them
+   * feed a single [[JsonInferSchema]] pass, exactly as a directory of the 
same files would infer.
+   * Because [[JsonInferSchema]] already merges every record's type by field 
name across all inputs,
+   * one pass is itself the union: a field empty in one input but typed in 
another widens to the
+   * real type, and a `NullType` field survives to the single final 
canonicalization rather than
+   * being collapsed per-input. A corrupt/missing input is skipped as a unit 
(a whole archive or a
+   * whole file) when `ignoreCorruptFiles`/`ignoreMissingFiles` are set.
+   */
+  private def inferWithArchives(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType = {
+    val streams = JsonDataSource.createBaseRdd(sparkSession, inputPaths, 
parsedOptions)
+    val multiLine = parsedOptions.multiLine
+    val lineSeparator = parsedOptions.lineSeparatorInRead
+    val encoding = parsedOptions.encoding
+    val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+    val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
+    // Each input is streamed lazily; records are copied into fresh 
`UTF8String`s (the line reader
+    // reuses one buffer) so they stay valid after the stream advances. An 
archive is read entry by
+    // entry; a loose file is read directly -- both yield the same record 
units, so all inputs feed
+    // one inference pass.
+    val records: RDD[UTF8String] = streams.flatMap { stream =>
+      val path = new Path(stream.getPath())
+      // Decode each record with the configured `encoding` (re-encoding to 
UTF-8) so archive
+      // inference matches the scan and a directory read. With no `encoding`, 
the raw bytes are
+      // kept and parsed as UTF-8 by `CreateJacksonParser.utf8String`.
+      def toRecord(bytes: Array[Byte], length: Int): UTF8String = encoding 
match {
+        case Some(enc) => UTF8String.fromString(new String(bytes, 0, length, 
enc))
+        case None => UTF8String.fromBytes(bytes, 0, length)

Review Comment:
   When `encoding` is unset, this parses records strictly as UTF-8: 
`CreateJacksonParser.utf8String` wraps an `InputStreamReader` with 
`StandardCharsets.UTF_8`. But the archive scan path 
(`MultiLineJsonDataSource.readStream` -> `CreateJacksonParser.inputStream`) and 
a directory read's inference are byte-based, so Jackson auto-detects 
UTF-16/UTF-32 there. A multiLine UTF-16 document with no `encoding` option 
therefore infers a corrupt-record-only schema whenever an archive is among the 
inputs, but the real schema from a directory read of the same files — and loose 
files alongside the archive are affected too, since every input routes through 
this pass. The new encoding test only covers the explicit-`encoding` case.
   
   I'd carry records as `Array[Byte]` and choose the parser the way 
`CreateJacksonParser.text`/`internalRow` do: `factory.createParser(bytes, 0, 
len)` when `encoding` is empty (byte-based, auto-detects, matching the scan), 
`getStreamDecoder` when set. That also subsumes the buffer-copy issue flagged 
at line 148.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala:
##########
@@ -75,6 +123,79 @@ abstract class JsonDataSource extends Serializable {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType
+
+  /**
+   * Infers a JSON schema when at least one input is a tar archive. Every 
archive entry (streamed
+   * via `ArchiveReader`, never unpacked to disk) and every loose file is read 
as JSON records --
+   * each line is a record, or the whole input is one document in multi-line 
mode -- and all of them
+   * feed a single [[JsonInferSchema]] pass, exactly as a directory of the 
same files would infer.
+   * Because [[JsonInferSchema]] already merges every record's type by field 
name across all inputs,
+   * one pass is itself the union: a field empty in one input but typed in 
another widens to the
+   * real type, and a `NullType` field survives to the single final 
canonicalization rather than
+   * being collapsed per-input. A corrupt/missing input is skipped as a unit 
(a whole archive or a
+   * whole file) when `ignoreCorruptFiles`/`ignoreMissingFiles` are set.
+   */
+  private def inferWithArchives(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType = {
+    val streams = JsonDataSource.createBaseRdd(sparkSession, inputPaths, 
parsedOptions)
+    val multiLine = parsedOptions.multiLine
+    val lineSeparator = parsedOptions.lineSeparatorInRead
+    val encoding = parsedOptions.encoding
+    val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+    val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
+    // Each input is streamed lazily; records are copied into fresh 
`UTF8String`s (the line reader

Review Comment:
   This comment overstates what the no-`encoding` branch does: 
`UTF8String.fromBytes(bytes, 0, length)` wraps the array without copying, and 
`line.getBytes` is `lineIterator`'s single reused `Text` buffer. It works today 
only because sampling and inference consume each record before the next 
`readLine` overwrites the buffer — any future buffering step downstream (a 
cache, batched sampling) would silently corrupt records. Copying for real 
(`line.copyBytes()`) is cheap next to the parse cost and makes the comment true.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/JSONArchiveReadBase.scala:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{NullType, StringType}
+import org.apache.spark.util.Utils
+
+/**
+ * Binds [[ArchiveReadSuiteBase]]'s file-format hooks to JSON. JSON opts into 
the shared
+ * schema-inference and complex-type tests (see 
`supportsSchemaInference`/`supportsComplexTypes`),
+ * and adds the JSON-specific tests with no format-agnostic analogue: NullType 
canonicalization,
+ * field-union/null-in-loose merging, and multi-line documents. Reusable 
across archive formats: a
+ * `JSON<Archive>Read` suite mixes this in alongside the archive-format trait.
+ */
+trait JSONArchiveReadBase extends ArchiveReadSuiteBase {
+
+  override protected def format: String = "json"
+
+  override protected def fileExtension: String = "json"
+
+  // JSON records are self-describing, so no per-file read options are 
required.
+  override protected def readOptions: Map[String, String] = Map.empty
+
+  override protected def readSchema: String = "id INT, name STRING"
+
+  // JSON infers its schema from record content and represents nested structs, 
so it opts into the
+  // shared inference and complex-type tests. Inference needs no trigger 
option, so the inherited
+  // `inferenceOptions` keeps its empty default.
+  override protected def supportsSchemaInference: Boolean = true
+
+  override protected def supportsComplexTypes: Boolean = true
+
+  override protected def encodeFile(
+      df: DataFrame,
+      writeOptions: Map[String, String]): Array[Byte] = {
+    val dir = Utils.createTempDir(namePrefix = "archive-test-encode")
+    try {
+      df.coalesce(1).write.format("json")
+        .options(writeOptions)
+        .mode("overwrite").save(dir.getCanonicalPath)
+      val parts = dir.listFiles().filter { f =>
+        f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") &&
+          !f.getName.endsWith(".crc")
+      }
+      assert(parts.length == 1,
+        s"expected exactly one data file, got: ${parts.map(_.getName).toList}")
+      Files.readAllBytes(parts.head.toPath)
+    } finally Utils.deleteRecursively(dir)
+  }
+
+  /** Raw JSON bytes, for tests that need precise control over the record 
layout. */
+  protected def jsonBytes(s: String): Array[Byte] = 
s.getBytes(StandardCharsets.UTF_8)
+
+  // ----- JSON-specific schema inference 
--------------------------------------
+  // The format-agnostic parity/widening/corrupt-skip and complex-type tests 
run from
+  // ArchiveReadSuiteBase (gated by the `supports*` hooks above); the tests 
below assert
+  // JSON-specific inference behavior -- NullType canonicalization and 
field-union merging -- that
+  // has no format-agnostic analogue. They use the shared `inferredSchema` 
helper from the base.
+
+  test("JSON: inference merges archive entries with loose files in the same 
directory") {
+    withTempDir { dir =>
+      val inArchive = 
jsonBytes("{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n")
+      // The loose file drops `name` (which the archive entry has) and adds 
`age` (which it lacks);
+      // JSON's single inference pass unions both by field name, exactly as a 
directory read does.
+      val loose = jsonBytes("{\"id\":3,\"age\":30}\n")
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"), 
Seq(entryName(0) -> inArchive))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, loose)
+      val schema = inferredSchema(Seq(dir.getCanonicalPath))
+      assert(schema.fieldNames.toSet == Set("id", "name", "age"),
+        s"expected the union of the entry and loose fields, got $schema")
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, inArchive)
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, loose)
+        val expected = inferredSchema(Seq(looseDir.getCanonicalPath))
+        assert(schema == expected,
+          s"mixed archive+loose inference diverged from directory; got 
$schema, want $expected")
+      }
+    }
+  }
+
+  test("JSON: a column null across all archive entries infers as string") {
+    // Field `v` is null in every record across both entries, so each 
per-record type is NullType.
+    // The single inference pass canonicalizes the surviving NullType to 
StringType at the end (a
+    // valid schema, no NullType), matching a directory read of the same files.
+    val entries = Seq(
+      entryName(0) -> 
jsonBytes("{\"k\":1,\"v\":null}\n{\"k\":2,\"v\":null}\n"),
+      entryName(1) -> jsonBytes("{\"k\":3,\"v\":null}\n"))
+    withArchiveFile() { archive =>
+      writeArchive(archive, entries)
+      val archiveSchema = inferredSchema(Seq(archive.getCanonicalPath))
+      assert(archiveSchema.forall(_.dataType != NullType),
+        s"expected no NullType columns after canonicalization, got 
$archiveSchema")
+      assert(archiveSchema.find(_.name == "v").exists(_.dataType == 
StringType),
+        s"expected the all-null column to canonicalize to string, got 
$archiveSchema")
+      withTempDir { dir =>
+        entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, 
b) }
+        val dirSchema = inferredSchema(Seq(dir.getCanonicalPath))
+        assert(archiveSchema == dirSchema,
+          s"all-null column inference diverged from a directory; " +
+            s"archive=$archiveSchema dir=$dirSchema")
+      }
+    }
+  }
+
+  test("JSON: multiline inference merges archive entries with loose files") {
+    // multiLine + mixed inputs: inference reads every archive entry and loose 
file as one whole
+    // document and infers a single schema, matching a directory read of the 
same files.
+    val opts = Map("multiLine" -> "true")
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> jsonBytes("{\n  \"id\": 1,\n  \"name\": 
\"Alice\"\n}")))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath,
+        jsonBytes("{\n  \"id\": 2,\n  \"age\": 30\n}"))
+      val schema = inferredSchema(Seq(dir.getCanonicalPath), opts)
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath,
+          jsonBytes("{\n  \"id\": 1,\n  \"name\": \"Alice\"\n}"))
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath,
+          jsonBytes("{\n  \"id\": 2,\n  \"age\": 30\n}"))
+        val dirSchema = inferredSchema(Seq(looseDir.getCanonicalPath), opts)
+        assert(schema == dirSchema,
+          s"multiline mixed inference diverged from a directory read; got 
$schema want $dirSchema")
+      }
+    }
+  }
+
+  test("JSON: a field typed in the archive but null in a loose file is not 
collapsed to string") {
+    // One inference pass over all inputs keeps the loose file's null `v` as 
NullType until the end,
+    // so it widens with the archive's int. Inferring the loose file alone and 
merging two finished
+    // schemas would have canonicalized `v` to string first and yielded string 
here -- diverging
+    // from a directory read of the same files.
+    val archived = jsonBytes("{\"k\":1,\"v\":10}\n{\"k\":2,\"v\":20}\n")
+    val loose = jsonBytes("{\"k\":3,\"v\":null}\n")
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"), 
Seq(entryName(0) -> archived))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, loose)
+      val schema = inferredSchema(Seq(dir.getCanonicalPath))
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, archived)
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, loose)
+        assert(schema == inferredSchema(Seq(looseDir.getCanonicalPath)),
+          s"null-in-loose field diverged from a directory read; got $schema")
+      }
+      assert(schema.find(_.name == "v").exists(_.dataType != StringType),
+        s"field null in the loose file should widen with the archive int, not 
collapse: $schema")
+    }
+  }
+
+  test("JSON: archive inference honors the encoding option like a directory") {
+    // A non-UTF-8 (UTF-16) archive: inference must decode each entry with 
`encoding`, exactly as
+    // the scan and a directory read do. Reading the bytes as UTF-8 would 
mis-parse them, diverging
+    // from a directory read of the same files. multiLine makes the whole 
document one record, so
+    // line-separator handling does not enter the test.
+    val opts = Map("encoding" -> "UTF-16", "multiLine" -> "true")
+    val bytes = "{\n  \"id\": 1,\n  \"name\": 
\"Alice\"\n}".getBytes(StandardCharsets.UTF_16)
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"), 
Seq(entryName(0) -> bytes))
+      val schema = inferredSchema(Seq(dir.getCanonicalPath), opts)
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, bytes)
+        val dirSchema = inferredSchema(Seq(looseDir.getCanonicalPath), opts)
+        assert(schema == dirSchema,
+          s"encoding inference diverged from a directory read; archive=$schema 
dir=$dirSchema")
+      }
+      assert(schema.fieldNames.toSet == Set("id", "name"),
+        s"expected id/name decoded from UTF-16, got $schema")
+    }
+  }
+
+  // ----- JSON-specific read tests 
--------------------------------------------
+
+  test("JSON: entries with differing fields union like a directory") {

Review Comment:
   Both `readStream` implementations wire new `FailureSafeParser`s, and the 
multiLine corrupt-record echo differs from `readFile`'s (pre-buffered bytes vs. 
file re-read), but no test puts a malformed JSON record inside an archive 
entry. A small `assertArchiveMatchesDir` case with one bad record would pin 
down `_corrupt_record`/permissive-mode parity with a directory read for both 
modes.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala:
##########
@@ -104,6 +106,47 @@ trait ArchiveReadSuiteBase extends QueryTest with 
SharedSparkSession {
       schema: String = readSchema): DataFrame =
     spark.read.format(format).options(readOptions ++ 
extraOptions).schema(schema).load(path)
 
+  // ----- schema-inference / complex-type capability hooks (default off) 
------
+  //
+  // A format opts into the shared inference / complex-type tests below by 
overriding the
+  // corresponding `supports*` hook to true; a format that supports neither 
(e.g. text) leaves both
+  // false and runs only the read tests.
+
+  /**
+   * Whether this format infers its read schema from the textual content of 
the files (CSV, JSON,
+   * XML), as opposed to carrying an embedded schema (Avro, Parquet) or none 
(text). Gates the
+   * shared schema-inference tests. A format that needs an option to trigger 
inference (e.g. CSV's
+   * `inferSchema`) also overrides [[inferenceOptions]].
+   */
+  protected def supportsSchemaInference: Boolean = false
+
+  /** Extra options that trigger/control inference for 
[[supportsSchemaInference]] formats. */
+  protected def inferenceOptions: Map[String, String] = Map.empty
+
+  /**
+   * Whether this format can represent nested/complex types 
(struct/array/map). Gates the shared
+   * complex-type round-trip test; CSV and text leave it false, 
JSON/Avro/Parquet/XML override true.

Review Comment:
   Only JSON overrides this — no Avro/Parquet/XML archive traits exist yet.
   ```suggestion
      * complex-type round-trip test; CSV and text leave it false, JSON 
overrides it to true.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to