cloud-fan commented on code in PR #56254:
URL: https://github.com/apache/spark/pull/56254#discussion_r3382857858
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -71,8 +71,24 @@ abstract class CSVDataSource extends Serializable {
parsedOptions.singleVariantColumn match {
case Some(columnName) => Some(StructType(Array(StructField(columnName,
VariantType))))
case None =>
- if (inputPaths.nonEmpty) {
- Some(infer(sparkSession, inputPaths, parsedOptions))
+ // Tar archives are inferred by streaming their entries (never unpacked to disk); any
+ // non-archive files are inferred normally and the two schemas are merged.
+ val (archives, nonArchives) =
+ if (parsedOptions.archiveFormatEnabled) {
+ inputPaths.partition(f => ArchiveReader.isArchivePath(f.getPath))
Review Comment:
`inferSchema` is shared with the DSv2 path (`CSVTable.inferSchema` calls it
too), but the archive *scan* wiring is v1-only: `CSVFileFormat` routes archives
to `readArchive`, while `CSVPartitionReaderFactory.buildReader` calls
`readFile` directly and nothing under `datasources/v2/` checks `isArchivePath`.
Before this PR a DSv2 archive read without a schema failed safely with
`UNABLE_TO_INFER_SCHEMA`; with this change inference succeeds and the v2 scan
then parses raw tar bytes as CSV. It's only reachable when `csv` is removed
from `spark.sql.sources.useV1SourceList` (and the explicit-schema variant
already exists in #56193), but it's silent wrong output when hit.
I'd wire the same gate + `readArchive` into the v2 read path (arguably in
#56193); if v2 support is deliberately deferred, the v2 entry should keep the
old `None`-for-archives inference behavior so DSv2 keeps failing loudly.
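A minimal sketch of the fallback option, assuming DSv2 archive support stays deferred: the v2 inference entry returns `None` whenever any input is an archive. Returning `None` for mixed archive/non-archive inputs is a conservative assumption, `isArchivePath` is a hypothetical suffix check standing in for `ArchiveReader.isArchivePath`, and schemas are plain DDL strings rather than `StructType`:

```scala
object V2ArchiveGuardSketch {
  // Hypothetical suffix check standing in for ArchiveReader.isArchivePath.
  def isArchivePath(path: String): Boolean = {
    val p = path.toLowerCase(java.util.Locale.ROOT)
    p.endsWith(".tar") || p.endsWith(".tar.gz") || p.endsWith(".tgz")
  }

  // Hypothetical v2 inference entry: refuse to infer when any input is an archive, so a
  // schema-less v2 read keeps failing with UNABLE_TO_INFER_SCHEMA instead of silently
  // parsing tar bytes as CSV. For non-archive inputs it matches the pre-PR
  // `if (inputPaths.nonEmpty) Some(infer(...))` shape.
  def inferV2(paths: Seq[String], inferLoose: Seq[String] => String): Option[String] =
    if (paths.isEmpty || paths.exists(isArchivePath)) None
    else Some(inferLoose(paths))
}
```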
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -83,6 +99,109 @@ abstract class CSVDataSource extends Serializable {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType
+
+ /**
+ * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry through the CSV parser without
+ * unpacking it to disk. The whole archive is a single split (see `CSVFileFormat.isSplitable`); a
+ * fresh header checker and parser are built per entry so each entry is parsed exactly like a
+ * standalone CSV file -- its header, if any, validated and dropped independently. The
+ * mode-specific implementation turns one entry into rows via `parseStream` / `parseIterator`.
+ *
+ * @param getParser builds a fresh [[UnivocityParser]].
+ * @param getHeaderChecker builds a fresh [[CSVHeaderChecker]] for `(isStartOfFile, source)`.
+ */
+ def readArchive(
+ conf: Configuration,
+ file: PartitionedFile,
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+ requiredSchema: StructType): Iterator[InternalRow]
+
+ /**
+ * Shared driver used by the [[readArchive]] implementations: streams each non-skipped entry's
+ * `(parser, headerChecker, stream)` -- a fresh parser/header checker per entry -- through
+ * `parseEntry`. The header checker `source` (`CSV archive entry: <archive>!/<entryName>`) names
+ * the entry in error messages.
+ */
+ protected def streamArchiveEntries(
+ conf: Configuration,
+ file: PartitionedFile,
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker)(
+ parseEntry: (UnivocityParser, CSVHeaderChecker, InputStream) => Iterator[InternalRow])
+ : Iterator[InternalRow] = {
+ ArchiveReader(file.toPath).readEntries(conf) { (entryName, in) =>
+ val headerChecker =
+ getHeaderChecker(true, s"CSV archive entry: ${file.urlEncodedPath}!/$entryName")
+ val parser = getParser()
+ headerChecker.setHeaderForSingleVariantColumn =
+ CSVDataSource.setHeaderForSingleVariantColumn(conf, file, parser)
+ parseEntry(parser, headerChecker, in)
+ }
+ }
+
+ /**
+ * Infers a CSV schema from tar archives (`.tar`/`.tar.gz`/`.tgz`) by streaming their entries --
+ * the archive is never unpacked to disk. Each entry is tokenized like a standalone CSV file (its
+ * header row dropped when `header` is set), and all entries' rows feed a single
+ * [[CSVInferSchema]] pass keyed on the first entry's header, so the result matches reading the
+ * entries as separate files. Mirrors [[MultiLineCSVDataSource.infer]] but iterates tar entries.
+ */
+ private def inferArchives(
+ sparkSession: SparkSession,
+ archives: Seq[FileStatus],
+ parsedOptions: CSVOptions): StructType = {
+ val archiveRdd = MultiLineCSVDataSource.createBaseRdd(sparkSession, archives, parsedOptions)
+ def tokens(dropHeader: Boolean): RDD[Array[String]] = archiveRdd.flatMap { stream =>
+ val path = new Path(stream.getPath())
+ try {
+ ArchiveReader(path).readEntries(stream.getConfiguration) { (_, in) =>
+ UnivocityParser.tokenizeStream(
+ in, dropHeader, new CsvParser(parsedOptions.asParserSettings), parsedOptions.charset)
+ }
+ } catch {
+ // A tar is opened as a single unit, so corrupt/missing handling is archive-granular --
+ // the whole archive is skipped -- mirroring `MultiLineCSVDataSource.infer` for loose files.
+ case e: FileNotFoundException if parsedOptions.ignoreMissingFiles =>
+ logWarning(log"Skipped missing archive: ${MDC(PATH, stream.getPath())}", e)
+ Iterator.empty
+ case e: FileNotFoundException => throw e
+ case e @ (_: RuntimeException | _: IOException) if parsedOptions.ignoreCorruptFiles =>
+ logWarning(log"Skipped the corrupted archive: ${MDC(PATH, stream.getPath())}", e)
+ Iterator.empty
+ case NonFatal(e) =>
+ throw QueryExecutionErrors.cannotReadFilesError(
+ e, SparkPath.fromPathString(stream.getPath()).urlEncoded)
+ }
+ }
+ tokens(dropHeader = false).take(1).headOption match {
+ case Some(firstRow) =>
+ val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+ val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+ val sampled = CSVUtils.sample(tokens(dropHeader = parsedOptions.headerFlag), parsedOptions)
+ SQLExecution.withSQLConfPropagated(sparkSession) {
+ new CSVInferSchema(parsedOptions).infer(sampled, header)
+ }
+ case None =>
+ StructType(Nil)
+ }
+ }
+
+ /**
+ * Positionally merges two already-inferred CSV schemas (the non-archive vs archive results),
+ * widening per-column types via [[CSVInferSchema.mergeRowTypes]] and keeping the longer header --
+ * the same first-header, positional model used for a multi-file CSV read.
Review Comment:
This merge diverges from the model it claims to follow. A multi-file directory read
makes ONE `CSVInferSchema` pass: `NullType` survives until the final
`toStructFields`, and both width and names are fixed by the *first* file's
header (`startType` is sized to `header.length`, and `inferRowType` fills only
the first `min(header.length, row.length)` columns). Merging two finished
schemas instead means: (a) a column that is
all-null on one side has already collapsed to `StringType`, and
`compatibleType(String, Int) = String` where a directory read of the same files
infers `Int`; (b) width becomes the max of the two sides rather than the first
header's width; (c) on equal width the non-archive names always win regardless
of input order. The mixed-parity test doesn't catch any of these because both
sides have identical width/names/types.
I'd merge at the pre-`toStructFields` type level: expose the aggregated
`Array[DataType]` + header from each side (a small `CSVInferSchema` variant
that returns the row types before `toStructFields`) and run `toStructFields`
once after `mergeRowTypes`, and add tests with an all-null column on one side and
differing widths. If you'd rather keep the post-hoc merge, the divergences
should be documented and this Scaladoc claim corrected ("keeping the longer
header" is not the first-header model).
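A toy model of point (a), assuming two equal-width sides so only the `NullType` collapse matters. `DT`, `compatible`, `mergeRowTypes` and `finish` are simplified stand-ins for Spark's `DataType`, `CSVInferSchema.compatibleType`, `CSVInferSchema.mergeRowTypes` and the `NullType`-to-`StringType` step in `toStructFields`, not the real implementations:

```scala
object MergeModelSketch {
  sealed trait DT
  case object NullT extends DT
  case object IntT extends DT
  case object StrT extends DT

  // Simplified widening: NullT is the identity, equal types stay, anything else widens to StrT.
  def compatible(a: DT, b: DT): DT = (a, b) match {
    case (NullT, t) => t
    case (t, NullT) => t
    case (x, y) if x == y => x
    case _ => StrT
  }

  // Per-column merge; missing columns on the shorter side are padded with NullT.
  def mergeRowTypes(a: Seq[DT], b: Seq[DT]): Seq[DT] =
    a.zipAll(b, NullT, NullT).map { case (x, y) => compatible(x, y) }

  // Finalization: an all-null column becomes StrT, like NullType in toStructFields.
  def finish(types: Seq[DT]): Seq[DT] = types.map {
    case NullT => StrT
    case t => t
  }

  // Suggested approach: merge raw row types first, finalize once.
  def singlePass(a: Seq[DT], b: Seq[DT]): Seq[DT] = finish(mergeRowTypes(a, b))

  // Current approach: finalize each side, then merge the finished schemas.
  def postHoc(a: Seq[DT], b: Seq[DT]): Seq[DT] = mergeRowTypes(finish(a), finish(b))
}
```

With an all-null second column on one side, `singlePass(Seq(IntT, NullT), Seq(IntT, IntT))` keeps `IntT` for that column, while `postHoc` on the same inputs widens it to `StrT`.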
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -278,7 +439,7 @@ object MultiLineCSVDataSource extends CSVDataSource with Logging {
}
}
- private def createBaseRdd(
+ private[csv] def createBaseRdd(
Review Comment:
Rather than widening visibility so the abstract base class can reach into a
concrete subobject's helper, consider moving `createBaseRdd` to the
`CSVDataSource` companion object, which already hosts shared helpers like
`setHeaderForSingleVariantColumn`. That keeps the base-class -> subclass-object
dependency out and the helper genuinely shared.
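A rough sketch of the suggested layout. The names (`ExampleDataSource`, `ExampleMultiLineDataSource`) are hypothetical, and a string-returning placeholder stands in for the real RDD-building `createBaseRdd`:

```scala
abstract class ExampleDataSource extends Serializable {
  def infer(paths: Seq[String]): Seq[String]

  // Archive inference in the base class calls the companion helper,
  // so the base class never depends on a concrete subclass object.
  protected def inferArchives(archives: Seq[String]): Seq[String] =
    ExampleDataSource.createBaseRdd(archives)
}

object ExampleDataSource {
  // Placeholder for createBaseRdd; in Spark it would stay package-private (private[csv])
  // and build an RDD of file streams instead of returning strings.
  def createBaseRdd(paths: Seq[String]): Seq[String] = paths.map(p => s"stream:$p")
}

object ExampleMultiLineDataSource extends ExampleDataSource {
  // The concrete object reuses the same shared helper for loose files.
  override def infer(paths: Seq[String]): Seq[String] = ExampleDataSource.createBaseRdd(paths)

  // Exposes the protected base-class method to callers outside the hierarchy.
  def inferArchivesPublic(archives: Seq[String]): Seq[String] = inferArchives(archives)
}
```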
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVArchiveReadBase.scala:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.Utils
+
+/**
+ * Binds [[ArchiveReadSuiteBase]]'s file-format hooks to CSV. The header-mode-specific tests live in
+ * the [[CSVHeaderArchiveReadBase]] and [[CSVHeaderlessArchiveReadBase]] sub-traits, so the shared
+ * archive tests from [[ArchiveReadSuiteBase]] run for both modes.
+trait CSVArchiveReadBase extends ArchiveReadSuiteBase {
+
+ /** Whether the archived CSV files are written and read with a header row. */
+ protected def header: Boolean
+
+ override protected def format: String = "csv"
+
+ override protected def fileExtension: String = "csv"
+
+ override protected def readOptions: Map[String, String] = Map("header" -> header.toString)
+
+ override protected def readSchema: String = "id INT, name STRING"
+
+ override protected def encodeFile(
+ df: DataFrame,
+ writeOptions: Map[String, String]): Array[Byte] = {
+ val dir = Utils.createTempDir(namePrefix = "archive-test-encode")
+ try {
+ df.coalesce(1).write.format("csv")
+ .options(Map("header" -> header.toString) ++ writeOptions)
+ .mode("overwrite").save(dir.getCanonicalPath)
+ val parts = dir.listFiles().filter { f =>
+ f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") &&
+ !f.getName.endsWith(".crc")
+ }
+ assert(parts.length == 1,
+ s"expected exactly one data file, got: ${parts.map(_.getName).toList}")
+ Files.readAllBytes(parts.head.toPath)
+ } finally Utils.deleteRecursively(dir)
+ }
+
+ /** Raw CSV bytes, for tests that need precise control over the row layout. */
+ protected def csvBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
+
+ test("CSV: archive infers the same schema as a directory of the same files") {
+ val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, "Carol")))
+ .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+ withArchiveFile() { archive =>
+ writeArchive(archive, entries)
+ val archiveSchema = spark.read.options(readOptions).option("inferSchema", "true")
+ .format(format).load(archive.getCanonicalPath).schema
+ withTempDir { dir =>
+ entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, b) }
+ val dirSchema = spark.read.options(readOptions).option("inferSchema", "true")
+ .format(format).load(dir.getCanonicalPath).schema
+ assert(archiveSchema == dirSchema,
+ s"inference parity broken; archive=$archiveSchema dir=$dirSchema")
+ }
+ }
+ }
+
+ test("CSV: all archive formats infer the same schema") {
+ val entries = Seq(sampleDf((1, "Alice"), (2, "Bob")), sampleDf((3, "Carol")))
+ .zipWithIndex.map { case (p, i) => entryName(i) -> encodeFile(p) }
+ val schemas = archiveExtensions.map { ext =>
+ val dir = Utils.createTempDir(namePrefix = "archive-infer")
Review Comment:
This hand-rolls the temp-dir management that `withArchiveFile` exists to
centralize, only because `withArchiveFile` returns `Unit`. Generalizing it to
`def withArchiveFile[T](extension: String = archiveExtensions.head)(f: File => T): T`
lets this test reuse it:
```scala
val schemas = archiveExtensions.map { ext =>
withArchiveFile(ext) { archive =>
writeArchive(archive, entries)
spark.read.options(readOptions).option("inferSchema", "true")
.format(format).load(archive.getCanonicalPath).schema
}
}
```
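A sketch of the generalized helper itself, assuming `extension` is passed without a leading dot and the archive is named `archive.<extension>` inside a fresh temp dir. It uses plain `java.nio`/`java.io` instead of Spark's `Utils` so it runs standalone, and `archiveExtensions` is a stand-in for the suite's member:

```scala
import java.io.File
import java.nio.file.Files

object ArchiveFileHelperSketch {
  // Stand-in for the suite's archiveExtensions.
  val archiveExtensions: Seq[String] = Seq("tar", "tar.gz", "tgz")

  // Deletes a file, or a directory and everything under it.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Hands `f` an archive path inside a fresh temp dir, returns whatever `f` returns,
  // and removes the temp dir afterwards even if `f` throws.
  def withArchiveFile[T](extension: String = archiveExtensions.head)(f: File => T): T = {
    val dir = Files.createTempDirectory("archive-test").toFile
    try f(new File(dir, s"archive.$extension"))
    finally deleteRecursively(dir)
  }
}
```

Because `T` only appears in the second parameter list, it is inferred from the block, so the `archiveExtensions.map { ext => withArchiveFile(ext) { ... } }` form above yields a `Seq` of whatever the block returns.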
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -83,6 +99,109 @@ abstract class CSVDataSource extends Serializable {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType
+ val archiveRdd = MultiLineCSVDataSource.createBaseRdd(sparkSession, archives, parsedOptions)
+ def tokens(dropHeader: Boolean): RDD[Array[String]] = archiveRdd.flatMap { stream =>
+ val path = new Path(stream.getPath())
+ try {
+ ArchiveReader(path).readEntries(stream.getConfiguration) { (_, in) =>
+ UnivocityParser.tokenizeStream(
+ in, dropHeader, new CsvParser(parsedOptions.asParserSettings), parsedOptions.charset)
+ }
+ } catch {
+ // A tar is opened as a single unit, so corrupt/missing handling is archive-granular --
Review Comment:
Minor: the mirror is slightly looser than stated. Because `tokens()` wraps
*both* the header pass and the sampled pass in this catch, the second pass
honors `ignoreCorruptFiles`/`ignoreMissingFiles` for archives, whereas
`MultiLineCSVDataSource.infer`'s second pass has no try/catch at all. That is
strictly more forgiving, so it's fine to keep; it's just worth a word here so the comment doesn't
overstate the parity.
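A standalone sketch of the skip policy in that catch, with hypothetical names (`readOrSkip`, plain `Boolean` flags, `println` in place of `logWarning`, a plain `RuntimeException` in place of `cannotReadFilesError`). It also shows why the `FileNotFoundException` cases come first: `FileNotFoundException` extends `IOException`, so without the explicit rethrow, `ignoreCorruptFiles` alone would silently skip missing archives. Calling one such helper from both passes is what gives both passes the same forgiveness:

```scala
import java.io.{FileNotFoundException, IOException}
import scala.util.control.NonFatal

object SkipPolicySketch {
  // Applies the ignoreMissingFiles / ignoreCorruptFiles policy to one archive read.
  // Only exceptions thrown while `read` is evaluated are caught; errors raised later,
  // while the caller iterates the returned iterator, are not.
  def readOrSkip[T](path: String, ignoreMissing: Boolean, ignoreCorrupt: Boolean)(
      read: => Iterator[T]): Iterator[T] =
    try read
    catch {
      // Must precede the IOException case, since FileNotFoundException is an IOException.
      case _: FileNotFoundException if ignoreMissing =>
        println(s"Skipped missing archive: $path")
        Iterator.empty
      case e: FileNotFoundException => throw e
      case _: RuntimeException | _: IOException if ignoreCorrupt =>
        println(s"Skipped the corrupted archive: $path")
        Iterator.empty
      case NonFatal(e) => throw new RuntimeException(s"Cannot read $path", e)
    }
}
```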
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -48,7 +48,7 @@ import org.apache.spark.util.Utils
/**
* Common functions for parsing CSV files
*/
-abstract class CSVDataSource extends Serializable {
+abstract class CSVDataSource extends Serializable with Logging {
Review Comment:
Now that the abstract class mixes in `Logging`, the `with Logging` on
`object MultiLineCSVDataSource` (line 361) is redundant and can be dropped.