cloud-fan commented on code in PR #56480:
URL: https://github.com/apache/spark/pull/56480#discussion_r3406719629


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala:
##########
@@ -104,6 +106,47 @@ trait ArchiveReadSuiteBase extends QueryTest with 
SharedSparkSession {
       schema: String = readSchema): DataFrame =
     spark.read.format(format).options(readOptions ++ 
extraOptions).schema(schema).load(path)
 
+  // ----- schema-inference / complex-type capability hooks (default off) 
------
+  //
+  // A format opts into the shared inference / complex-type tests below by 
overriding the
+  // corresponding `supports*` hook to true; a format that supports neither 
(e.g. text) leaves both
+  // false and runs only the read tests.
+
+  /**
+   * Whether this format infers its read schema from the textual content of 
the files (CSV, JSON,
+   * XML), as opposed to carrying an embedded schema (Avro, Parquet) or none 
(text). Gates the
+   * shared schema-inference tests. A format that needs an option to trigger 
inference (e.g. CSV's
+   * `inferSchema`) also overrides [[inferenceOptions]].
+   */
+  protected def supportsSchemaInference: Boolean = false
+
+  /** Extra options that trigger/control inference for 
[[supportsSchemaInference]] formats. */
+  protected def inferenceOptions: Map[String, String] = Map.empty
+
+  /**
+   * Whether this format can represent nested/complex types 
(struct/array/map). Gates the shared
+   * complex-type round-trip test; CSV and text leave it false, JSON overrides 
it to true.
+   */
+  protected def supportsComplexTypes: Boolean = false
+
+  /** Sample data with a nested struct column, used by the complex-type test. 
*/
+  protected def complexSampleDf: DataFrame =
+    Seq((1, "NYC", "10001"), (2, "SF", "94105")).toDF("id", "city", "zip")
+      .select(col("id"), struct(col("city"), col("zip")).as("addr"))
+
+  /** Read schema matching [[complexSampleDf]]. */
+  protected def complexReadSchema: String = "id INT, addr STRUCT<city: STRING, 
zip: STRING>"
+
+  /**
+   * Schema [[format]] infers from `paths` under [[readOptions]] ++ 
[[inferenceOptions]] (plus
+   * `extraOptions`). Loading several paths reads them as one fileset, exactly 
as a directory read.

Review Comment:
   "exactly as a directory read" reads as an incomplete comparison — the sibling 
comments spell out the verb ("as a directory read does"). The line is already at 
99 chars, so adding the verb requires the rewrap below.
   ```suggestion
      * `extraOptions`). Loading several paths reads them as one fileset, 
exactly as a directory
      * read does.
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/JSONArchiveReadBase.scala:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{NullType, StringType}
+import org.apache.spark.util.Utils
+
+/**
+ * Binds [[ArchiveReadSuiteBase]]'s file-format hooks to JSON. JSON opts into 
the shared
+ * schema-inference and complex-type tests (see 
`supportsSchemaInference`/`supportsComplexTypes`),
+ * and adds the JSON-specific tests with no format-agnostic analogue: NullType 
canonicalization,
+ * field-union/null-in-loose merging, and multi-line documents. Reusable 
across archive formats: a
+ * `JSON<Archive>Read` suite mixes this in alongside the archive-format trait.
+ */
+trait JSONArchiveReadBase extends ArchiveReadSuiteBase {
+
+  override protected def format: String = "json"
+
+  override protected def fileExtension: String = "json"
+
+  // JSON records are self-describing, so no per-file read options are 
required.
+  override protected def readOptions: Map[String, String] = Map.empty
+
+  override protected def readSchema: String = "id INT, name STRING"
+
+  // JSON infers its schema from record content and represents nested structs, 
so it opts into the
+  // shared inference and complex-type tests. Inference needs no trigger 
option, so the inherited
+  // `inferenceOptions` keeps its empty default.
+  override protected def supportsSchemaInference: Boolean = true
+
+  override protected def supportsComplexTypes: Boolean = true
+
+  override protected def encodeFile(
+      df: DataFrame,
+      writeOptions: Map[String, String]): Array[Byte] = {
+    val dir = Utils.createTempDir(namePrefix = "archive-test-encode")
+    try {
+      df.coalesce(1).write.format("json")
+        .options(writeOptions)
+        .mode("overwrite").save(dir.getCanonicalPath)
+      val parts = dir.listFiles().filter { f =>
+        f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") &&
+          !f.getName.endsWith(".crc")
+      }
+      assert(parts.length == 1,
+        s"expected exactly one data file, got: ${parts.map(_.getName).toList}")
+      Files.readAllBytes(parts.head.toPath)
+    } finally Utils.deleteRecursively(dir)
+  }
+
+  /** Raw JSON bytes, for tests that need precise control over the record 
layout. */
+  protected def jsonBytes(s: String): Array[Byte] = 
s.getBytes(StandardCharsets.UTF_8)
+
+  // ----- JSON-specific schema inference 
--------------------------------------
+  // The format-agnostic parity/widening/corrupt-skip and complex-type tests 
run from
+  // ArchiveReadSuiteBase (gated by the `supports*` hooks above); the tests 
below assert
+  // JSON-specific inference behavior -- NullType canonicalization and 
field-union merging -- that
+  // has no format-agnostic analogue. They use the shared `inferredSchema` 
helper from the base.
+
+  test("JSON: inference merges archive entries with loose files in the same 
directory") {
+    withTempDir { dir =>
+      val inArchive = 
jsonBytes("{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n")
+      // The loose file drops `name` (which the archive entry has) and adds 
`age` (which it lacks);
+      // JSON's single inference pass unions both by field name, exactly as a 
directory read does.
+      val loose = jsonBytes("{\"id\":3,\"age\":30}\n")
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"), 
Seq(entryName(0) -> inArchive))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, loose)
+      val schema = inferredSchema(Seq(dir.getCanonicalPath))
+      assert(schema.fieldNames.toSet == Set("id", "name", "age"),
+        s"expected the union of the entry and loose fields, got $schema")
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, inArchive)
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, loose)
+        val expected = inferredSchema(Seq(looseDir.getCanonicalPath))
+        assert(schema == expected,
+          s"mixed archive+loose inference diverged from directory; got 
$schema, want $expected")
+      }
+    }
+  }
+
+  test("JSON: a column null across all archive entries infers as string") {
+    // Field `v` is null in every record across both entries, so each 
per-record type is NullType.
+    // The single inference pass canonicalizes the surviving NullType to 
StringType at the end (a
+    // valid schema, no NullType), matching a directory read of the same files.
+    val entries = Seq(
+      entryName(0) -> 
jsonBytes("{\"k\":1,\"v\":null}\n{\"k\":2,\"v\":null}\n"),
+      entryName(1) -> jsonBytes("{\"k\":3,\"v\":null}\n"))
+    withArchiveFile() { archive =>
+      writeArchive(archive, entries)
+      val archiveSchema = inferredSchema(Seq(archive.getCanonicalPath))
+      assert(archiveSchema.forall(_.dataType != NullType),
+        s"expected no NullType columns after canonicalization, got 
$archiveSchema")
+      assert(archiveSchema.find(_.name == "v").exists(_.dataType == 
StringType),
+        s"expected the all-null column to canonicalize to string, got 
$archiveSchema")
+      withTempDir { dir =>
+        entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, 
b) }
+        val dirSchema = inferredSchema(Seq(dir.getCanonicalPath))
+        assert(archiveSchema == dirSchema,
+          s"all-null column inference diverged from a directory; " +
+            s"archive=$archiveSchema dir=$dirSchema")
+      }
+    }
+  }
+
+  test("JSON: multiline inference merges archive entries with loose files") {
+    // multiLine + mixed inputs: inference reads every archive entry and loose 
file as one whole
+    // document and infers a single schema, matching a directory read of the 
same files.
+    val opts = Map("multiLine" -> "true")
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> jsonBytes("{\n  \"id\": 1,\n  \"name\": 
\"Alice\"\n}")))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath,
+        jsonBytes("{\n  \"id\": 2,\n  \"age\": 30\n}"))
+      val schema = inferredSchema(Seq(dir.getCanonicalPath), opts)
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath,
+          jsonBytes("{\n  \"id\": 1,\n  \"name\": \"Alice\"\n}"))
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath,
+          jsonBytes("{\n  \"id\": 2,\n  \"age\": 30\n}"))
+        val dirSchema = inferredSchema(Seq(looseDir.getCanonicalPath), opts)
+        assert(schema == dirSchema,
+          s"multiline mixed inference diverged from a directory read; got 
$schema want $dirSchema")
+      }
+    }
+  }
+
+  test("JSON: a field typed in the archive but null in a loose file is not 
collapsed to string") {
+    // One inference pass over all inputs keeps the loose file's null `v` as 
NullType until the end,
+    // so it widens with the archive's int. Inferring the loose file alone and 
merging two finished
+    // schemas would have canonicalized `v` to string first and yielded string 
here -- diverging
+    // from a directory read of the same files.
+    val archived = jsonBytes("{\"k\":1,\"v\":10}\n{\"k\":2,\"v\":20}\n")
+    val loose = jsonBytes("{\"k\":3,\"v\":null}\n")
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"), 
Seq(entryName(0) -> archived))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, loose)
+      val schema = inferredSchema(Seq(dir.getCanonicalPath))
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, archived)
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, loose)
+        assert(schema == inferredSchema(Seq(looseDir.getCanonicalPath)),
+          s"null-in-loose field diverged from a directory read; got $schema")
+      }
+      assert(schema.find(_.name == "v").exists(_.dataType != StringType),
+        s"field null in the loose file should widen with the archive int, not 
collapse: $schema")
+    }
+  }
+
+  test("JSON: archive inference honors the encoding option like a directory") {
+    // A non-UTF-8 (UTF-16) archive: inference must decode each entry with 
`encoding`, exactly as
+    // the scan and a directory read do. Reading the bytes as UTF-8 would 
mis-parse them, diverging
+    // from a directory read of the same files. multiLine makes the whole 
document one record, so
+    // line-separator handling does not enter the test.
+    val opts = Map("encoding" -> "UTF-16", "multiLine" -> "true")
+    val bytes = "{\n  \"id\": 1,\n  \"name\": 
\"Alice\"\n}".getBytes(StandardCharsets.UTF_16)
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"), 
Seq(entryName(0) -> bytes))
+      val schema = inferredSchema(Seq(dir.getCanonicalPath), opts)
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, bytes)
+        val dirSchema = inferredSchema(Seq(looseDir.getCanonicalPath), opts)
+        assert(schema == dirSchema,
+          s"encoding inference diverged from a directory read; archive=$schema 
dir=$dirSchema")
+      }
+      assert(schema.fieldNames.toSet == Set("id", "name"),
+        s"expected id/name decoded from UTF-16, got $schema")
+    }
+  }
+
+  test("JSON: archive inference auto-detects a non-UTF-8 charset with no 
encoding option") {
+    // With no `encoding` set, inference parses each record from its raw 
bytes, so Jackson
+    // auto-detects the charset (here UTF-16, carrying a BOM) exactly as the 
scan and a directory
+    // read do. Forcing the bytes through UTF-8 would mis-parse them into a 
corrupt-record-only
+    // schema, diverging from a directory read of the same files.
+    val opts = Map("multiLine" -> "true")
+    val bytes = "{\n  \"id\": 1,\n  \"name\": 
\"Alice\"\n}".getBytes(StandardCharsets.UTF_16)
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"), 
Seq(entryName(0) -> bytes))
+      val schema = inferredSchema(Seq(dir.getCanonicalPath), opts)
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, bytes)
+        val dirSchema = inferredSchema(Seq(looseDir.getCanonicalPath), opts)
+        assert(schema == dirSchema,
+          s"auto-detected inference diverged from a directory read; 
archive=$schema dir=$dirSchema")
+      }
+      assert(schema.fieldNames.toSet == Set("id", "name"),
+        s"expected id/name auto-detected from UTF-16, got $schema")
+    }
+  }
+
+  // ----- JSON-specific read tests 
--------------------------------------------
+
+  test("JSON: entries with differing fields union like a directory") {
+    assertArchiveMatchesDir(
+      Seq(
+        "a.json" -> 
jsonBytes("{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n"),
+        // No "name" field: the schema's "name" column must read back as null 
for this entry.

Review Comment:
   The fixture is pure ASCII, which windows-1252 and UTF-8 encode identically — 
so if the explicit encoding were silently ignored (i.e. inference fell into the 
auto-detect branch), the test would still pass. A non-ASCII character such as 
`é` is the single byte `0xE9` in windows-1252, which is malformed as standalone 
UTF-8, so it makes that regression observable: the archive side would infer a 
corrupt-record-only schema and fail the parity assert.
   ```suggestion
       val bytes = "{\n  \"id\": 1,\n  \"name\": 
\"Jos\u00e9\"\n}".getBytes("windows-1252")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to