cloud-fan commented on code in PR #56254:
URL: https://github.com/apache/spark/pull/56254#discussion_r3392444897


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVArchiveReadBase.scala:
##########
@@ -63,16 +65,158 @@ trait CSVArchiveReadBase extends ArchiveReadSuiteBase {
   /** Raw CSV bytes, for tests that need precise control over the row layout. 
*/
   protected def csvBytes(s: String): Array[Byte] = 
s.getBytes(StandardCharsets.UTF_8)
 
-  test("CSV: reading an archive without a schema fails (inference not yet 
supported)") {
-    // Schema inference for archives is a follow-up; until then an explicit 
schema is required, and
-    // an inference attempt raises Spark's standard UNABLE_TO_INFER_SCHEMA 
error.
+  test("CSV: archive infers the same schema as a directory of the same files") 
{
+    val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, 
"Carol")))
+      .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+    withArchiveFile() { archive =>
+      writeArchive(archive, entries)
+      val archiveSchema = 
spark.read.options(readOptions).option("inferSchema", "true")
+        .format(format).load(archive.getCanonicalPath).schema
+      withTempDir { dir =>
+        entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, 
b) }
+        val dirSchema = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(dir.getCanonicalPath).schema
+        assert(archiveSchema == dirSchema,
+          s"inference parity broken; archive=$archiveSchema dir=$dirSchema")
+      }
+    }
+  }
+
+  test("CSV: all archive formats infer the same schema") {
+    val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, 
"Carol")))
+      .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+    val schemas = archiveExtensions.map { ext =>
+      withArchiveFile(ext) { archive =>
+        writeArchive(archive, entries)
+        spark.read.options(readOptions).option("inferSchema", "true")
+          .format(format).load(archive.getCanonicalPath).schema
+      }
+    }
+    assert(schemas.distinct.size == 1,
+      s"archive formats inferred different schemas: 
${archiveExtensions.zip(schemas)}")
+  }
+
+  /** CSV bytes for `rows`, prefixed with a `cols` header line when [[header]] 
is set. */
+  private def csvEntry(cols: String, rows: String*): Array[Byte] =
+    csvBytes((if (header) cols +: rows else rows).mkString("", "\n", "\n"))
+
+  test("CSV: inference skips a corrupt archive among good ones 
(ignoreCorruptFiles)") {
+    withTempDir { dir =>
+      val good = sampleDf((1, "Alice"), (2, "Bob"))
+      writeArchive(new File(dir, s"good.${archiveExtensions.head}"),
+        Seq(entryName(0) -> encodeFile(good)))
+      writeCorruptArchive(new File(dir, s"bad.$corruptArchiveExtension"))
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(dir.getCanonicalPath).schema
+        withTempDir { onlyGood =>
+          Files.write(new File(onlyGood, entryName(0)).toPath, 
encodeFile(good))
+          val expected = spark.read.options(readOptions).option("inferSchema", 
"true")
+            .format(format).load(onlyGood.getCanonicalPath).schema
+          assert(schema == expected,
+            s"corrupt archive not skipped during inference; got $schema, want 
$expected")
+        }
+      }
+    }
+  }
+
+  test("CSV: inference widens a column's type across archive entries") {
+    withArchiveFile() { archive =>
+      writeArchive(archive, Seq(
+        entryName(0) -> csvEntry("c", "1", "2"),
+        entryName(1) -> csvEntry("c", "x")))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(archive.getCanonicalPath).schema
+      assert(schema.length == 1 && schema.head.dataType == StringType,
+        s"expected the column widened to string across entries, got $schema")
+    }
+  }
+
+  test("CSV: inference merges archive entries with loose files in the same 
directory") {
+    withTempDir { dir =>
+      val inArchive = sampleDf((1, "Alice"), (2, "Bob"))
+      val loose = sampleDf((3, "Carol"))
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> encodeFile(inArchive)))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, 
encodeFile(loose))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(dir.getCanonicalPath).schema
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, 
encodeFile(inArchive))
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, 
encodeFile(loose))
+        val expected = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(looseDir.getCanonicalPath).schema
+        assert(schema == expected,
+          s"mixed archive+loose inference diverged from directory; got 
$schema, want $expected")
+      }
+    }
+  }
+
+  test("CSV: a column empty in the archive but typed in a loose file is not 
collapsed to string") {
+    // One inference pass over all inputs keeps the empty column NullType 
until the end, so it
+    // widens with the loose file's Int. Merging two already-finished schemas 
would have collapsed
+    // the archive side to String first and yielded String here.
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> csvEntry("a,b", "1,", "2,")))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, 
csvEntry("a,b", "3,4"))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(dir.getCanonicalPath).schema
+      assert(schema.length == 2 && schema(1).dataType != StringType,
+        s"empty-in-archive column should widen with the loose Int, not 
collapse to String: $schema")
+    }
+  }
+
+  test("CSV: archive inference fixes the column count from the first entry's 
header") {
+    // The first entry has two columns, the second three; one inference pass 
keys on the first
+    // header, so the extra column in the later entry is dropped -- the same 
first-header-width
+    // model a single-pass directory read uses.
+    withArchiveFile() { archive =>
+      writeArchive(archive, Seq(
+        entryName(0) -> csvEntry("a,b", "1,2"),
+        entryName(1) -> csvEntry("a,b,c", "3,4,5")))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(archive.getCanonicalPath).schema
+      assert(schema.length == 2,
+        s"expected 2 columns fixed by the first entry's header, got $schema")

Review Comment:
   Asserting the types too pins the per-entry header drop: if the `"a,b,c"` 
header line were parsed as data instead of dropped, both columns would widen to 
string and this assertion would still pass.
   ```suggestion
         assert(schema.length == 2 && schema.forall(_.dataType != StringType),
           s"expected 2 typed columns fixed by the first entry's header, got 
$schema")
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVArchiveReadBase.scala:
##########
@@ -63,16 +65,158 @@ trait CSVArchiveReadBase extends ArchiveReadSuiteBase {
   /** Raw CSV bytes, for tests that need precise control over the row layout. 
*/
   protected def csvBytes(s: String): Array[Byte] = 
s.getBytes(StandardCharsets.UTF_8)
 
-  test("CSV: reading an archive without a schema fails (inference not yet 
supported)") {
-    // Schema inference for archives is a follow-up; until then an explicit 
schema is required, and
-    // an inference attempt raises Spark's standard UNABLE_TO_INFER_SCHEMA 
error.
+  test("CSV: archive infers the same schema as a directory of the same files") 
{
+    val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, 
"Carol")))
+      .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+    withArchiveFile() { archive =>
+      writeArchive(archive, entries)
+      val archiveSchema = 
spark.read.options(readOptions).option("inferSchema", "true")
+        .format(format).load(archive.getCanonicalPath).schema
+      withTempDir { dir =>
+        entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, 
b) }
+        val dirSchema = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(dir.getCanonicalPath).schema
+        assert(archiveSchema == dirSchema,
+          s"inference parity broken; archive=$archiveSchema dir=$dirSchema")
+      }
+    }
+  }
+
+  test("CSV: all archive formats infer the same schema") {
+    val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, 
"Carol")))
+      .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+    val schemas = archiveExtensions.map { ext =>
+      withArchiveFile(ext) { archive =>
+        writeArchive(archive, entries)
+        spark.read.options(readOptions).option("inferSchema", "true")
+          .format(format).load(archive.getCanonicalPath).schema
+      }
+    }
+    assert(schemas.distinct.size == 1,
+      s"archive formats inferred different schemas: 
${archiveExtensions.zip(schemas)}")
+  }
+
+  /** CSV bytes for `rows`, prefixed with a `cols` header line when [[header]] 
is set. */
+  private def csvEntry(cols: String, rows: String*): Array[Byte] =
+    csvBytes((if (header) cols +: rows else rows).mkString("", "\n", "\n"))
+
+  test("CSV: inference skips a corrupt archive among good ones 
(ignoreCorruptFiles)") {
+    withTempDir { dir =>
+      val good = sampleDf((1, "Alice"), (2, "Bob"))
+      writeArchive(new File(dir, s"good.${archiveExtensions.head}"),
+        Seq(entryName(0) -> encodeFile(good)))
+      writeCorruptArchive(new File(dir, s"bad.$corruptArchiveExtension"))
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(dir.getCanonicalPath).schema
+        withTempDir { onlyGood =>
+          Files.write(new File(onlyGood, entryName(0)).toPath, 
encodeFile(good))
+          val expected = spark.read.options(readOptions).option("inferSchema", 
"true")
+            .format(format).load(onlyGood.getCanonicalPath).schema
+          assert(schema == expected,
+            s"corrupt archive not skipped during inference; got $schema, want 
$expected")
+        }
+      }
+    }
+  }
+
+  test("CSV: inference widens a column's type across archive entries") {
+    withArchiveFile() { archive =>
+      writeArchive(archive, Seq(
+        entryName(0) -> csvEntry("c", "1", "2"),
+        entryName(1) -> csvEntry("c", "x")))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(archive.getCanonicalPath).schema
+      assert(schema.length == 1 && schema.head.dataType == StringType,
+        s"expected the column widened to string across entries, got $schema")
+    }
+  }
+
+  test("CSV: inference merges archive entries with loose files in the same 
directory") {
+    withTempDir { dir =>
+      val inArchive = sampleDf((1, "Alice"), (2, "Bob"))
+      val loose = sampleDf((3, "Carol"))
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> encodeFile(inArchive)))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, 
encodeFile(loose))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(dir.getCanonicalPath).schema
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, 
encodeFile(inArchive))
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, 
encodeFile(loose))
+        val expected = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(looseDir.getCanonicalPath).schema
+        assert(schema == expected,
+          s"mixed archive+loose inference diverged from directory; got 
$schema, want $expected")
+      }
+    }
+  }
+
+  test("CSV: a column empty in the archive but typed in a loose file is not 
collapsed to string") {
+    // One inference pass over all inputs keeps the empty column NullType 
until the end, so it
+    // widens with the loose file's Int. Merging two already-finished schemas 
would have collapsed
+    // the archive side to String first and yielded String here.
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> csvEntry("a,b", "1,", "2,")))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, 
csvEntry("a,b", "3,4"))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(dir.getCanonicalPath).schema
+      assert(schema.length == 2 && schema(1).dataType != StringType,
+        s"empty-in-archive column should widen with the loose Int, not 
collapse to String: $schema")
+    }
+  }
+
+  test("CSV: archive inference fixes the column count from the first entry's 
header") {
+    // The first entry has two columns, the second three; one inference pass 
keys on the first
+    // header, so the extra column in the later entry is dropped -- the same 
first-header-width
+    // model a single-pass directory read uses.
+    withArchiveFile() { archive =>
+      writeArchive(archive, Seq(
+        entryName(0) -> csvEntry("a,b", "1,2"),
+        entryName(1) -> csvEntry("a,b,c", "3,4,5")))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(archive.getCanonicalPath).schema
+      assert(schema.length == 2,
+        s"expected 2 columns fixed by the first entry's header, got $schema")
+    }
+  }
+
+  test("CSV: inference uses the same record model as the scan (quoted embedded 
newline)") {

Review Comment:
   The multiLine branch of `tokenizeForInference` has no inference coverage — 
the only multiLine archive tests pass an explicit schema, so nothing pins that 
archive inference dispatches to the stream model. A multiLine variant of this 
test (same body with `.option("multiLine", "true")` on both reads) would pin it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -67,16 +67,22 @@ abstract class CSVDataSource extends Serializable {
   final def inferSchema(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      parsedOptions: CSVOptions): Option[StructType] = {
+      parsedOptions: CSVOptions,
+      supportsArchiveScan: Boolean): Option[StructType] = {
     parsedOptions.singleVariantColumn match {
       case Some(columnName) => Some(StructType(Array(StructField(columnName, 
VariantType))))
       case None =>
-        if (parsedOptions.archiveFormatEnabled &&
-            inputPaths.exists(f => ArchiveReader.isArchivePath(f.getPath))) {
-          // Schema inference is not yet supported for tar archives. Returning 
None makes Spark
-          // raise its standard "Unable to infer schema ... It must be 
specified manually" error
-          // (UNABLE_TO_INFER_SCHEMA), so reading an archive requires an 
explicit `.schema(...)`.
-          // Inferring a schema by streaming archive entries is planned as a 
follow-up.
+        val hasArchive = parsedOptions.archiveFormatEnabled &&
+          inputPaths.exists(f => ArchiveReader.isArchivePath(f.getPath))
+        if (hasArchive && supportsArchiveScan) {
+          // Archives (and any loose files alongside them) are inferred in a 
single CSVInferSchema
+          // pass over all inputs -- archive entries are streamed, never 
unpacked -- so the result
+          // matches a directory read of the same files.

Review Comment:
   Stale now that the Scaladocs below make the scan-parity claim — with 
per-entry header dropping, mismatched-header inputs intentionally diverge from 
plain-directory inference (`filterHeaderLine`).
   ```suggestion
             // matches what the scan returns for the same files.
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala:
##########
@@ -46,7 +46,11 @@ case class CSVTable(
       columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
 
-    CSVDataSource(parsedOptions).inferSchema(sparkSession, files, 
parsedOptions)
+    // The DSv2 reader does not route archives to `readArchive` (it calls 
`readFile` directly), so
+    // archive scans aren't supported here; pass supportsArchiveScan = false 
so an archive input
+    // keeps failing with UNABLE_TO_INFER_SCHEMA rather than being mis-read as 
raw CSV bytes.

Review Comment:
   The direction is garbled — the risk is the scan parsing raw archive bytes as 
CSV:
   ```suggestion
       // keeps failing with UNABLE_TO_INFER_SCHEMA rather than having its raw 
bytes parsed as CSV.
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVArchiveReadBase.scala:
##########
@@ -63,16 +65,158 @@ trait CSVArchiveReadBase extends ArchiveReadSuiteBase {
   /** Raw CSV bytes, for tests that need precise control over the row layout. 
*/
   protected def csvBytes(s: String): Array[Byte] = 
s.getBytes(StandardCharsets.UTF_8)
 
-  test("CSV: reading an archive without a schema fails (inference not yet 
supported)") {
-    // Schema inference for archives is a follow-up; until then an explicit 
schema is required, and
-    // an inference attempt raises Spark's standard UNABLE_TO_INFER_SCHEMA 
error.
+  test("CSV: archive infers the same schema as a directory of the same files") 
{
+    val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, 
"Carol")))
+      .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+    withArchiveFile() { archive =>
+      writeArchive(archive, entries)
+      val archiveSchema = 
spark.read.options(readOptions).option("inferSchema", "true")
+        .format(format).load(archive.getCanonicalPath).schema
+      withTempDir { dir =>
+        entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, 
b) }
+        val dirSchema = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(dir.getCanonicalPath).schema
+        assert(archiveSchema == dirSchema,
+          s"inference parity broken; archive=$archiveSchema dir=$dirSchema")
+      }
+    }
+  }
+
+  test("CSV: all archive formats infer the same schema") {
+    val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, 
"Carol")))
+      .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+    val schemas = archiveExtensions.map { ext =>
+      withArchiveFile(ext) { archive =>
+        writeArchive(archive, entries)
+        spark.read.options(readOptions).option("inferSchema", "true")
+          .format(format).load(archive.getCanonicalPath).schema
+      }
+    }
+    assert(schemas.distinct.size == 1,
+      s"archive formats inferred different schemas: 
${archiveExtensions.zip(schemas)}")
+  }
+
+  /** CSV bytes for `rows`, prefixed with a `cols` header line when [[header]] 
is set. */
+  private def csvEntry(cols: String, rows: String*): Array[Byte] =
+    csvBytes((if (header) cols +: rows else rows).mkString("", "\n", "\n"))
+
+  test("CSV: inference skips a corrupt archive among good ones 
(ignoreCorruptFiles)") {
+    withTempDir { dir =>
+      val good = sampleDf((1, "Alice"), (2, "Bob"))
+      writeArchive(new File(dir, s"good.${archiveExtensions.head}"),
+        Seq(entryName(0) -> encodeFile(good)))
+      writeCorruptArchive(new File(dir, s"bad.$corruptArchiveExtension"))
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(dir.getCanonicalPath).schema
+        withTempDir { onlyGood =>
+          Files.write(new File(onlyGood, entryName(0)).toPath, 
encodeFile(good))
+          val expected = spark.read.options(readOptions).option("inferSchema", 
"true")
+            .format(format).load(onlyGood.getCanonicalPath).schema
+          assert(schema == expected,
+            s"corrupt archive not skipped during inference; got $schema, want 
$expected")
+        }
+      }
+    }
+  }
+
+  test("CSV: inference widens a column's type across archive entries") {
+    withArchiveFile() { archive =>
+      writeArchive(archive, Seq(
+        entryName(0) -> csvEntry("c", "1", "2"),
+        entryName(1) -> csvEntry("c", "x")))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(archive.getCanonicalPath).schema
+      assert(schema.length == 1 && schema.head.dataType == StringType,
+        s"expected the column widened to string across entries, got $schema")
+    }
+  }
+
+  test("CSV: inference merges archive entries with loose files in the same 
directory") {
+    withTempDir { dir =>
+      val inArchive = sampleDf((1, "Alice"), (2, "Bob"))
+      val loose = sampleDf((3, "Carol"))
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> encodeFile(inArchive)))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, 
encodeFile(loose))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(dir.getCanonicalPath).schema
+      withTempDir { looseDir =>
+        Files.write(new File(looseDir, entryName(0)).toPath, 
encodeFile(inArchive))
+        Files.write(new File(looseDir, s"loose.$fileExtension").toPath, 
encodeFile(loose))
+        val expected = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(looseDir.getCanonicalPath).schema
+        assert(schema == expected,
+          s"mixed archive+loose inference diverged from directory; got 
$schema, want $expected")
+      }
+    }
+  }
+
+  test("CSV: a column empty in the archive but typed in a loose file is not 
collapsed to string") {
+    // One inference pass over all inputs keeps the empty column NullType 
until the end, so it
+    // widens with the loose file's Int. Merging two already-finished schemas 
would have collapsed
+    // the archive side to String first and yielded String here.
+    withTempDir { dir =>
+      writeArchive(new File(dir, s"data.${archiveExtensions.head}"),
+        Seq(entryName(0) -> csvEntry("a,b", "1,", "2,")))
+      Files.write(new File(dir, s"loose.$fileExtension").toPath, 
csvEntry("a,b", "3,4"))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(dir.getCanonicalPath).schema
+      assert(schema.length == 2 && schema(1).dataType != StringType,
+        s"empty-in-archive column should widen with the loose Int, not 
collapse to String: $schema")
+    }
+  }
+
+  test("CSV: archive inference fixes the column count from the first entry's 
header") {
+    // The first entry has two columns, the second three; one inference pass 
keys on the first
+    // header, so the extra column in the later entry is dropped -- the same 
first-header-width
+    // model a single-pass directory read uses.
+    withArchiveFile() { archive =>
+      writeArchive(archive, Seq(
+        entryName(0) -> csvEntry("a,b", "1,2"),
+        entryName(1) -> csvEntry("a,b,c", "3,4,5")))
+      val schema = spark.read.options(readOptions).option("inferSchema", 
"true")
+        .format(format).load(archive.getCanonicalPath).schema
+      assert(schema.length == 2,
+        s"expected 2 columns fixed by the first entry's header, got $schema")
+    }
+  }
+
+  test("CSV: inference uses the same record model as the scan (quoted embedded 
newline)") {
+    // In default (non-multiLine) mode the scan reads line by line, so a 
quoted field containing a
+    // newline is split across rows; inference must tokenize the archived 
entry the same way, so it
+    // infers the same schema as that entry read as a loose file (rather than 
parsing the entry as
+    // one continuous stream and disagreeing with the read).
+    val entry = csvEntry("a,b", "\"x\ny\",2")
+    withArchiveFile() { archive =>
+      writeArchive(archive, Seq(entryName(0) -> entry))
+      val archiveSchema = 
spark.read.options(readOptions).option("inferSchema", "true")
+        .format(format).load(archive.getCanonicalPath).schema
+      withTempDir { dir =>
+        Files.write(new File(dir, entryName(0)).toPath, entry)
+        val dirSchema = spark.read.options(readOptions).option("inferSchema", 
"true")
+          .format(format).load(dir.getCanonicalPath).schema
+        assert(archiveSchema == dirSchema,
+          s"archive inference diverged from the line-based read; 
archive=$archiveSchema dir=$dirSchema")
+      }
+    }
+  }
+
+  test("CSV: the DSv2 path refuses to infer a schema for an archive 
(UNABLE_TO_INFER_SCHEMA)") {
+    // Archive scanning is wired into the V1 file source only, so the DSv2 
reader cannot read archives.
+    // On the V2 path inference must keep returning None for an archive input 
-- raising
+    // UNABLE_TO_INFER_SCHEMA -- rather than inferring a schema the V2 scan 
would then mis-read as raw
+    // archive bytes. Forcing csv off the V1 source list routes the read 
through CSVTable.

Review Comment:
   Rewrap: "a schema the V2 scan would then mis-read as raw archive bytes" 
reads as if the scan mis-reads the schema, and lines 206/208 exceed the 
100-char scalastyle limit. Lines 200 and 218 are over the limit too and worth 
rewrapping in the same pass.
   ```suggestion
       // Archive scanning is wired into the V1 file source only, so the DSv2 
reader cannot read
       // archives. On the V2 path inference must keep returning None for an 
archive input -- raising
       // UNABLE_TO_INFER_SCHEMA -- rather than inferring a schema and letting 
the V2 scan parse the
       // raw archive bytes as CSV. Forcing csv off the V1 source list routes 
the read through
       // CSVTable.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to