cloud-fan commented on code in PR #56193:
URL: https://github.com/apache/spark/pull/56193#discussion_r3360232824


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -44,6 +44,11 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
       options: Map[String, String],
       path: Path): Boolean = {
     val parsedOptions = getCsvOptions(sparkSession, options)
+    // A tar archive is decompressed/unpacked as a sequential stream, so it must be read as a
+    // single split rather than carved into byte ranges.
+    if (parsedOptions.archiveFormatEnabled && ArchiveReader.isArchivePath(path)) {

Review Comment:
   On reusability for later formats (per the offline discussion, CSV-only is 
fine for now): the reusable core is in good shape. 
`ArchiveReader`/`TarArchiveReader`/`lineIterator` live in 
`execution.datasources` and are format-agnostic, so a future 
`JsonDataSource.readArchive` can call `ArchiveReader(path).readEntries(conf) { 
(name, in) => ... }` directly, and `archiveFormatEnabled` already sits on the 
shared `FileSourceOptions`.
   
   The one piece that isn't general is this per-`FileFormat` gating — the 
`isSplitable` archive guard here plus the `buildReader` archive-vs-normal 
branch. `JsonFileFormat`/`TextFileFormat` have structurally identical 
`isSplitable`/`buildReader` today, so they'd copy these verbatim. Optionally 
lift the gating into a shared helper/trait on `TextBasedFileFormat` (e.g. 
`ArchiveReadableFileFormat`) so a new format supplies only its per-entry 
parser. Minor; the bigger reuse gap is schema inference (see the comment at the 
archive branch in `buildReader`).
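
A rough sketch of what such a shared helper could look like. Everything here is a stand-in: the trait name follows the suggestion above, the methods take plain file names instead of Spark's `Path`/`PartitionedFile`, and `ToyFormat` is hypothetical. It only illustrates the shape, with the archive-vs-normal decision made once and a format supplying the rest.

```scala
import java.util.Locale

object ArchiveGatingSketch {
  // Same extension check as ArchiveReader.isArchivePath in the diff, on a plain file name.
  def isArchivePath(name: String): Boolean = {
    val lower = name.toLowerCase(Locale.ROOT)
    lower.endsWith(".tar") || lower.endsWith(".tar.gz") || lower.endsWith(".tgz")
  }

  // Hypothetical mix-in: owns the archive-vs-normal decision once, for every text format.
  trait ArchiveReadableFileFormat {
    def archiveFormatEnabled: Boolean
    protected def isSplitableNormally(name: String): Boolean
    protected def readNormally(name: String): Iterator[String]
    protected def readArchive(name: String): Iterator[String]

    private def isArchiveRead(name: String): Boolean =
      archiveFormatEnabled && isArchivePath(name)

    // An archive is always a single split; other files defer to the format.
    final def isSplitable(name: String): Boolean =
      !isArchiveRead(name) && isSplitableNormally(name)

    final def read(name: String): Iterator[String] =
      if (isArchiveRead(name)) readArchive(name) else readNormally(name)
  }

  // A toy format that supplies only the format-specific pieces.
  class ToyFormat(val archiveFormatEnabled: Boolean) extends ArchiveReadableFileFormat {
    protected def isSplitableNormally(name: String): Boolean = true
    protected def readNormally(name: String): Iterator[String] = Iterator(s"file:$name")
    protected def readArchive(name: String): Iterator[String] = Iterator(s"archive:$name")
  }
}
```

With the flag off, `new ToyFormat(false).read("data.tar")` takes the normal path, which matches the gating in the diff.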



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -119,24 +124,26 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
         dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
       val actualRequiredSchema = StructType(
         requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
-      val parser = new UnivocityParser(
-        actualDataSchema,
-        actualRequiredSchema,
-        parsedOptions,
-        actualFilters)
       // Use column pruning when specified by Catalyst, except when one or more columns have
       // existence default value(s), since in that case we instruct the CSV parser to disable column
       // pruning and instead read each entire row in order to correctly assign the default value(s).
       val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
-      val isStartOfFile = file.start == 0
-      val headerChecker = new CSVHeaderChecker(
-        schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
-      CSVDataSource(parsedOptions).readFile(
-        conf,
-        file,
-        parser,
-        headerChecker,
-        requiredSchema)
+
+      def newParser(): UnivocityParser =
+        new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
+      def getHeaderChecker(isStartOfFile: Boolean, source: String): CSVHeaderChecker =
+        new CSVHeaderChecker(schema, parsedOptions, source, isStartOfFile)
+
+      // A tar archive (always a single split, see `isSplitable`) is streamed entry by entry when
+      // archive reads are enabled; otherwise the file is parsed directly.
+      if (parsedOptions.archiveFormatEnabled && ArchiveReader.isArchivePath(file.toPath)) {
+        CSVDataSource(parsedOptions).readArchive(

Review Comment:
   **Schema inference is not archive-aware — the key framework-completeness 
gap.** `inferSchema` → `CSVDataSource.inferSchema` → `infer` reads raw file 
bytes (`TextFileFormat` for non-multiline, `BinaryFileRDD` for multiline); 
neither goes through `ArchiveReader`. So with the config enabled and no 
user-supplied schema (`spark.read.csv("data.tar")`), inference reads tar 
headers/bytes as CSV and yields a garbage schema rather than erroring. Every 
test passes `.schema(...)` explicitly, so this path is untested, and the PR 
description doesn't state that a schema is required.
   
   This matters most for reuse: `JsonDataSource`/`TextFileFormat` infer through 
the *same* `createBaseDataset`/`BinaryFileRDD` raw-bytes path, so leaving it 
unsolved means every future format re-hits (or re-fixes) the same bug. Worth 
solving once in the reusable layer — a shared "infer from the first archive 
entry via `ArchiveReader`" path, or at minimum a shared clear "schema required 
for archive reads" error — plus a test and a description note.
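
For the "at minimum" option, a minimal sketch of a shared guard. The name `checkInferable`, its signature, and the message text are all hypothetical, and it works on plain file names; real Spark code would raise a proper error class rather than return an `Either`.

```scala
import java.util.Locale

object ArchiveSchemaGuardSketch {
  // Same extension check as ArchiveReader.isArchivePath in the diff, on a plain file name.
  def isArchivePath(name: String): Boolean = {
    val lower = name.toLowerCase(Locale.ROOT)
    lower.endsWith(".tar") || lower.endsWith(".tar.gz") || lower.endsWith(".tgz")
  }

  // Hypothetical guard meant to run at the top of any format's schema inference.
  // Left(message) means inference would read raw archive bytes and should be rejected.
  def checkInferable(
      archiveFormatEnabled: Boolean,
      inputNames: Seq[String]): Either[String, Unit] = {
    val archives = if (archiveFormatEnabled) inputNames.filter(isArchivePath) else Seq.empty
    if (archives.isEmpty) Right(())
    else Left(
      "Schema inference is not supported for archive reads; specify a schema. " +
        s"Archive inputs: ${archives.mkString(", ")}")
  }
}
```

Because the guard only depends on the option and the input names, CSV and any later format could share it unchanged.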



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -144,6 +184,39 @@ object TextInputCSVDataSource extends CSVDataSource {
     UnivocityParser.parseIterator(lines, parser, headerChecker, requiredSchema)
   }
 
+  override def readArchive(
+      conf: Configuration,
+      file: PartitionedFile,
+      getParser: () => UnivocityParser,
+      getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+      requiredSchema: StructType): Iterator[InternalRow] =
+    // Stream each tar entry through the line-based parser, treating the entry exactly like a
+    // standalone CSV file (a fresh parser/header checker is built per entry).
+    streamArchiveEntries(conf, file, getParser, getHeaderChecker) { (parser, headerChecker, in) =>
+      UnivocityParser.parseIterator(
+        entryLines(in, parser.options), parser, headerChecker, requiredSchema)
+    }
+
+  /**
+   * Decodes one archive entry's bytes into the same CSV line strings the non-archive [[readFile]]
+   * path feeds to the parser: [[ArchiveReader.lineIterator]] splits the entry into lines (honoring
+   * a custom line separator), each line is decoded with the configured charset, and the separator
+   * is re-appended so `UnivocityParser` does not raise EOF on the final line.
+   *
+   * @param in bytes of one already-decompressed archive entry; not closed here (the archive owns
+   *           the underlying stream).
+   * @param options CSV options supplying the read line separator and charset.
+   * @return an iterator over the entry's lines, each terminated with the line separator.
+   */
+  private def entryLines(in: InputStream, options: CSVOptions): Iterator[String] = {
+    val newline = options.lineSeparatorInRead.getOrElse(
+      Array(options.asParserSettings.getFormat.getNormalizedNewline.toByte))
+    ArchiveReader.lineIterator(in, options.lineSeparatorInRead).map { line =>
+      line.append(newline, 0, newline.length)

Review Comment:
   Nit: the comment says the trailing newline is appended "so `UnivocityParser` 
does not raise EOF on the final line," but the non-archive `readFile` path 
feeds lines to the *same* `parseIterator` (via `HadoopFileLinesReader`) 
**without** re-appending a terminator. So either the EOF claim is inaccurate 
(and this append is unnecessary), or there's a subtlety worth spelling out. The 
single-byte `getNormalizedNewline` append also diverges from `readFile` for 
multi-byte charsets such as UTF-16, where a newline takes two bytes. Could you 
double-check? If the append isn't needed, dropping it keeps the archive path 
identical to `readFile`.
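
To make the charset concern concrete, a small standalone illustration. It assumes the entry is UTF-16LE and that the fallback terminator is the single byte `'\n'.toByte`, as in the `entryLines` fallback branch above; it does not call any Spark or Hadoop code.

```scala
import java.nio.charset.StandardCharsets.UTF_16LE

object NewlineAppendSketch {
  // One CSV line as it would appear inside a UTF-16LE encoded entry.
  val lineBytes: Array[Byte] = "a,b".getBytes(UTF_16LE)

  // Single-byte terminator, shaped like the fallback in `entryLines`.
  val singleByteNewline: Array[Byte] = Array('\n'.toByte)
  // Terminator encoded in the entry's own charset (two bytes in UTF-16LE).
  val encodedNewline: Array[Byte] = "\n".getBytes(UTF_16LE)

  // Decoding after the single-byte append leaves a dangling odd byte, not a newline.
  val withSingleByte: String = new String(lineBytes ++ singleByteNewline, UTF_16LE)
  // Decoding after the charset-encoded append yields a properly terminated line.
  val withEncoded: String = new String(lineBytes ++ encodedNewline, UTF_16LE)
}
```

Here `withEncoded` ends in a real newline while `withSingleByte` does not, so if the append has to stay, the terminator probably needs to be encoded with the configured charset.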



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -83,6 +83,46 @@ abstract class CSVDataSource extends Serializable {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: CSVOptions): StructType
+
+  /**
+   * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry through the CSV parser without
+   * unpacking it to disk. The whole archive is a single split (see `CSVFileFormat.isSplitable`); a
+   * fresh header checker and parser are built per entry so each entry is parsed exactly like a
+   * standalone CSV file -- its header, if any, validated and dropped independently. The
+   * mode-specific implementation turns one entry into rows via `parseStream` / `parseIterator`.
+   *
+   * @param getParser builds a fresh [[UnivocityParser]].
+   * @param getHeaderChecker builds a fresh [[CSVHeaderChecker]] for `(isStartOfFile, source)`.
+   */
+  def readArchive(
+      conf: Configuration,
+      file: PartitionedFile,
+      getParser: () => UnivocityParser,
+      getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+      requiredSchema: StructType): Iterator[InternalRow]
+
+  /**
+   * Shared driver used by the [[readArchive]] implementations: streams each non-skipped entry's
+   * `(parser, headerChecker, stream)` -- a fresh parser/header checker per entry -- through
+   * `parseEntry`. The header checker `source` (`CSV archive entry: <archive>!/<entryName>`) names
+   * the entry in error messages.
+   */
+  protected def streamArchiveEntries(
+      conf: Configuration,
+      file: PartitionedFile,
+      getParser: () => UnivocityParser,
+      getHeaderChecker: (Boolean, String) => CSVHeaderChecker)(
+      parseEntry: (UnivocityParser, CSVHeaderChecker, InputStream) => Iterator[InternalRow])
+    : Iterator[InternalRow] = {
+    ArchiveReader(file.toPath).readEntries(conf) { (entryName, in) =>

Review Comment:
   `ignoreCorruptFiles` ends up archive-granular here. An archive is a single 
non-splittable `PartitionedFile`, so `FileScanRDD`'s per-file corrupt handling 
skips the **whole** archive on any throw (corrupt gzip/tar, an IO error 
mid-entry, or a FAILFAST malformed record), whereas a directory of loose files 
skips only the bad file and keeps the rest. The PR claims directory parity, but 
the corrupt test only covers a whole-corrupt archive — not a bad entry among 
good ones. Worth a test that asserts the actual behavior and a one-line note 
that corrupt handling is archive-granular.
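
A toy model of the granularity difference, as a sketch only. It is not `FileScanRDD`: it assumes that when a unit of work throws, the rows already read are kept and the remainder of that unit is dropped, and all names (`readUnit`, `corruptAfter`, and so on) are made up for the illustration.

```scala
import java.io.IOException
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object CorruptGranularitySketch {
  // An entry (or loose file) lazily produces rows.
  type Entry = () => Iterator[String]

  def healthy(rows: String*): Entry = () => rows.iterator

  // Yields the given rows, then fails the way a truncated or corrupt stream would.
  def corruptAfter(rows: String*): Entry = () =>
    rows.iterator ++ new Iterator[String] {
      def hasNext: Boolean = true
      def next(): String = throw new IOException("simulated corrupt entry")
    }

  // Assumed corrupt handling: keep rows read so far, drop the rest of the unit.
  def readUnit(unit: Iterator[String]): Seq[String] = {
    val out = ArrayBuffer.empty[String]
    try unit.foreach(row => out += row)
    catch { case NonFatal(_) => () }
    out.toSeq
  }

  val entries: Seq[Entry] = Seq(healthy("a1", "a2"), corruptAfter("b1"), healthy("c1"))

  // Directory of loose files: every file is its own unit, so "c1" survives.
  val directoryRows: Seq[String] = entries.flatMap(e => readUnit(e()))
  // Archive: all entries form one unit, so the failure also drops the later entry.
  val archiveRows: Seq[String] = readUnit(entries.iterator.flatMap(e => e()))
}
```

In this model the directory read still returns the entry after the bad one, while the archive read stops at the failure. A test with a bad entry placed between good ones would pin down which of the two the real path does.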



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{Closeable, InputStream}
+import java.util.Locale
+import java.util.zip.GZIPInputStream
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
+import org.apache.commons.io.ByteOrderMark
+import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.util.LineReader
+
+import org.apache.spark.TaskContext
+
+/**
+ * Streaming reader for a single archive file. The archive is opened once and decompressed/unpacked
+ * as a stream -- entries are never materialized to local disk. [[readEntries]] hands each entry's
+ * bytes to a caller-supplied parse function as a bounded [[InputStream]] and concatenates the
+ * per-entry results into a single iterator, advancing to the next entry only once the current one
+ * is fully consumed. At most one entry is in flight at a time, so memory stays bounded regardless
+ * of archive size.
+ *
+ * This is format-agnostic: a data source whose per-file reader can consume an `InputStream` wires
+ * up archive support by calling [[readEntries]] from its read/inference paths and supplying a
+ * `parseEntry` that turns one entry stream into rows (or tokens). Formats that need random access
+ * within a file (e.g. Parquet/ORC footers) cannot use this streaming path.
+ *
+ * A concrete subclass implements [[readEntries]] for a specific archive format. Obtain the reader
+ * for a path via `ArchiveReader(path)`, which selects the implementation by file extension; new
+ * archive formats are added by writing another subclass rather than modifying existing ones.
+ */
+abstract class ArchiveReader(path: Path) {
+
+  /**
+   * Streams the archive entry by entry, applying `parseEntry` to each non-skipped entry's
+   * `(name, stream)` and concatenating the results into a single iterator. The next entry is opened
+   * only once the current entry's iterator is exhausted, so nothing is buffered to disk and at most
+   * one entry's bytes are read at a time. The archive stream is closed when the returned iterator
+   * is exhausted, when [[Closeable.close]] is called on it, and (defensively) on task completion.
+   */
+  def readEntries[T](
+      conf: Configuration)(
+      parseEntry: (String, InputStream) => Iterator[T]): Iterator[T]
+}
+
+object ArchiveReader {
+
+  /**
+   * Whether `path` names an archive this reader can stream. Dispatched purely on the file
+   * extension -- `.tar`, `.tar.gz`, or `.tgz` -- since the bytes are not inspected here.
+   */
+  def isArchivePath(path: Path): Boolean = {
+    val name = path.getName.toLowerCase(Locale.ROOT)
+    name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz")
+  }
+
+  /**
+   * Returns the [[ArchiveReader]] implementation for `path`, selected by its file extension. Only
+   * paths for which [[isArchivePath]] is true are supported; new archive formats add a case here.
+   */
+  def apply(path: Path): ArchiveReader = new TarArchiveReader(path)
+
+  /**
+   * Splits one already-decompressed archive entry's bytes into lines. The reusable, format-agnostic
+   * line source for archive entries; the entry stream is not closed here (the reader owns the
+   * underlying stream).
+   *
+   * @param in bytes of one archive entry.
+   * @param lineSeparatorInRead the explicit read line separator, or `None` to detect line breaks.
+   * @return an iterator over the entry's lines as [[Text]], without the trailing separator.
+   */
+  def lineIterator(in: InputStream, lineSeparatorInRead: Option[Array[Byte]]): Iterator[Text] = {
+    // A leading byte-order mark is stripped (LineReader does not strip it on its own) so the lines
+    // match the non-archive read path.
+    val bomInputStream = BOMInputStream.builder()
+      .setInputStream(in)
+      .setByteOrderMarks(
+        ByteOrderMark.UTF_8,
+        ByteOrderMark.UTF_16LE,
+        ByteOrderMark.UTF_16BE,
+        ByteOrderMark.UTF_32LE,
+        ByteOrderMark.UTF_32BE)
+      .setInclude(false)
+      .get()
+    val reader = lineSeparatorInRead match {
+      case Some(sep) => new LineReader(bomInputStream, sep)
+      case _ => new LineReader(bomInputStream)
+    }
+    new Iterator[Text] {
+      private val text = new Text()
+      private var finished = false
+      private var hasValue = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !hasValue) {
+          finished = reader.readLine(text) == 0
+          hasValue = !finished
+        }
+        !finished
+      }
+
+      override def next(): Text = {
+        if (!hasNext) throw new NoSuchElementException
+        hasValue = false
+        text
+      }
+    }
+  }
+}
+
+/**
+ * [[ArchiveReader]] for tar archives: plain `.tar`, gzipped `.tar.gz`, and `.tgz`.
+ *
+ * Gzip handling: Hadoop's `CompressionCodecFactory` matches the trailing `.gz` extension and
+ * auto-decompresses `.tar.gz` via `CodecStreams`, so we just wrap that stream in
+ * `TarArchiveInputStream`. `.tgz` is not a registered Hadoop codec extension, so the gzip layer is
+ * unwrapped explicitly here.
+ */
+class TarArchiveReader(path: Path) extends ArchiveReader(path) {
+
+  // Paths Hadoop's codec factory won't auto-decompress: we apply the gzip layer here.
+  private def needsExplicitGunzip: Boolean =
+    path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz")
+
+  /**
+   * Whether an entry is not a real data file and must be skipped: a directory, or an OS sidecar
+   * dotfile (basename starting with `.`, e.g. macOS `._x` resource forks or `.DS_Store`).
+   */
+  private def shouldSkipEntry(entry: TarArchiveEntry): Boolean = {
+    if (entry.isDirectory) return true
+    val name = entry.getName
+    val basename = name.substring(name.lastIndexOf('/') + 1)
+    basename.startsWith(".")

Review Comment:
   `shouldSkipEntry` skips only `.`-prefixed entries, but Spark's loose-file 
listing filters both `.`- and `_`-prefixed names via 
`HadoopFSUtils.shouldFilterOutPathName` (`InMemoryFileIndex`). So an entry 
named `_SUCCESS` / `_committed_*` inside an archive is read as data, breaking 
the "parse like a directory of the same files" parity the suite asserts. Mirror 
the `_` filter (ideally reuse `shouldFilterOutPathName`). Good news: this lives 
in the shared `TarArchiveReader`, so the fix benefits every future format for 
free.
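
A minimal sketch of the mirrored filter, written against plain entry names so it runs standalone. `shouldSkipEntryName` is a hypothetical name, and the check is only the simple prefix rule; Spark's `shouldFilterOutPathName` has further special cases that this does not reproduce, which is one more reason to reuse it directly rather than re-implement it.

```scala
object EntrySkipSketch {
  // Hypothetical variant of shouldSkipEntry that also drops `_`-prefixed basenames.
  def shouldSkipEntryName(entryName: String, isDirectory: Boolean): Boolean = {
    // Basename is everything after the last '/', or the whole name if there is none.
    val basename = entryName.substring(entryName.lastIndexOf('/') + 1)
    isDirectory || basename.startsWith(".") || basename.startsWith("_")
  }
}
```

With this rule an entry such as `out/_SUCCESS` is skipped while `out/part-0.csv` is still read.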



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

