cloud-fan commented on code in PR #56193:
URL: https://github.com/apache/spark/pull/56193#discussion_r3360232824
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -44,6 +44,11 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
options: Map[String, String],
path: Path): Boolean = {
val parsedOptions = getCsvOptions(sparkSession, options)
+ // A tar archive is decompressed/unpacked as a sequential stream, so it must be read as a
+ // single split rather than carved into byte ranges.
+ if (parsedOptions.archiveFormatEnabled && ArchiveReader.isArchivePath(path)) {
Review Comment:
On reusability for later formats (per the offline discussion, CSV-only for now is fine): the reusable core is in good shape. `ArchiveReader`/`TarArchiveReader`/`lineIterator` live in `execution.datasources` and are format-agnostic, so a future `JsonDataSource.readArchive` can call `ArchiveReader(path).readEntries(conf) { (name, in) => ... }` directly, and `archiveFormatEnabled` already sits on the shared `FileSourceOptions`.
The one piece that isn't general is this per-`FileFormat` gating: the `isSplitable` archive guard here plus the archive-vs-normal branch in `buildReader`. `JsonFileFormat`/`TextFileFormat` have structurally identical `isSplitable`/`buildReader` today, so they would copy this code verbatim. Optionally, lift the gating into a shared helper/trait on `TextBasedFileFormat` (e.g. `ArchiveReadableFileFormat`) so that a new format supplies only its per-entry parser. This is minor; the bigger reuse gap is schema inference (see the comment at the archive branch in `buildReader`).
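A minimal sketch of what such a shared trait could look like, under heavy simplification: paths are plain strings, options are a `Map`, and the method names plus the `archive` option key used by the toy format are hypothetical stand-ins, not Spark's real `FileFormat`/`PartitionedFile` API. It only illustrates the shape: the archive check is written once, and each format supplies its plain-file reader and its archive (per-entry) reader.

```scala
import java.util.Locale

// Hypothetical, simplified model: plain strings stand in for Hadoop `Path`s and a
// Map stands in for parsed options. This is not Spark's real FileFormat API.
object ArchivePaths {
  // Same extension rule as `ArchiveReader.isArchivePath` in the PR.
  def isArchivePath(name: String): Boolean = {
    val n = name.toLowerCase(Locale.ROOT)
    n.endsWith(".tar") || n.endsWith(".tar.gz") || n.endsWith(".tgz")
  }
}

// Hypothetical shared trait: the archive gating lives here exactly once.
trait ArchiveReadableFileFormat[Row] {
  // Format-specific pieces each implementation supplies.
  protected def archiveFormatEnabled(options: Map[String, String]): Boolean
  protected def plainIsSplitable(options: Map[String, String], path: String): Boolean
  protected def readPlainFile(options: Map[String, String], path: String): Iterator[Row]
  protected def readArchive(options: Map[String, String], path: String): Iterator[Row]

  private def isArchiveRead(options: Map[String, String], path: String): Boolean =
    archiveFormatEnabled(options) && ArchivePaths.isArchivePath(path)

  // An archive is streamed sequentially, so it is never carved into byte ranges.
  final def isSplitable(options: Map[String, String], path: String): Boolean =
    !isArchiveRead(options, path) && plainIsSplitable(options, path)

  // The archive-vs-normal branch that each `buildReader` would otherwise duplicate.
  final def read(options: Map[String, String], path: String): Iterator[Row] =
    if (isArchiveRead(options, path)) readArchive(options, path)
    else readPlainFile(options, path)
}

// Toy format used only to exercise the trait; "archive" is a made-up option key.
object ToyLineFormat extends ArchiveReadableFileFormat[String] {
  protected def archiveFormatEnabled(options: Map[String, String]): Boolean =
    options.get("archive").contains("true")
  protected def plainIsSplitable(options: Map[String, String], path: String): Boolean = true
  protected def readPlainFile(options: Map[String, String], path: String): Iterator[String] =
    Iterator(s"plain:$path")
  protected def readArchive(options: Map[String, String], path: String): Iterator[String] =
    Iterator(s"archive:$path")
}
```

With this shape, a new format would implement only the four protected methods and inherit both the `isSplitable` guard and the read dispatch.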
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -119,24 +124,26 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val actualRequiredSchema = StructType(
requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val parser = new UnivocityParser(
- actualDataSchema,
- actualRequiredSchema,
- parsedOptions,
- actualFilters)
// Use column pruning when specified by Catalyst, except when one or more columns have
// existence default value(s), since in that case we instruct the CSV parser to disable column
// pruning and instead read each entire row in order to correctly assign the default value(s).
val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
- val isStartOfFile = file.start == 0
- val headerChecker = new CSVHeaderChecker(
- schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
- CSVDataSource(parsedOptions).readFile(
- conf,
- file,
- parser,
- headerChecker,
- requiredSchema)
+
+ def newParser(): UnivocityParser =
+ new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
+ def getHeaderChecker(isStartOfFile: Boolean, source: String): CSVHeaderChecker =
+ new CSVHeaderChecker(schema, parsedOptions, source, isStartOfFile)
+
+ // A tar archive (always a single split, see `isSplitable`) is streamed entry by entry when
+ // archive reads are enabled; otherwise the file is parsed directly.
+ if (parsedOptions.archiveFormatEnabled && ArchiveReader.isArchivePath(file.toPath)) {
+ CSVDataSource(parsedOptions).readArchive(
Review Comment:
**Schema inference is not archive-aware: this is the key framework-completeness gap.** `inferSchema` → `CSVDataSource.inferSchema` → `infer` reads raw file bytes (`TextFileFormat` for non-multiline, `BinaryFileRDD` for multiline); neither path goes through `ArchiveReader`. So with the config enabled and no user-supplied schema (`spark.read.csv("data.tar")`), inference parses tar headers and bytes as CSV and silently yields a garbage schema instead of raising an error. Every test passes `.schema(...)` explicitly, so this path is untested, and the PR description doesn't state that a schema is required.
This matters most for reuse: `JsonDataSource`/`TextFileFormat` infer through the *same* `createBaseDataset`/`BinaryFileRDD` raw-bytes path, so leaving it unsolved means every future format re-hits (or re-fixes) the same bug. It's worth solving once in the reusable layer, either with a shared "infer from the first archive entry via `ArchiveReader`" path or, at minimum, a shared clear "schema required for archive reads" error, plus a test and a note in the description.
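As a sketch of the minimum option, a shared fail-fast check could run before inference. Everything here is hypothetical: the object and method names, the plain-string paths, and the use of `IllegalArgumentException` (real Spark code would more likely raise an error through its error-class framework).

```scala
import java.util.Locale

// Hypothetical shared guard for the "schema required for archive reads" option.
object ArchiveSchemaGuard {
  // Same extension rule as `ArchiveReader.isArchivePath` in the PR.
  private def isArchivePath(name: String): Boolean = {
    val n = name.toLowerCase(Locale.ROOT)
    n.endsWith(".tar") || n.endsWith(".tar.gz") || n.endsWith(".tgz")
  }

  /**
   * Fails fast instead of letting raw tar bytes be inferred as data. A no-op when archive
   * reads are disabled, when the user supplied a schema, or when no input is an archive.
   */
  def requireSchemaForArchives(
      archiveFormatEnabled: Boolean,
      userSchemaProvided: Boolean,
      inputPaths: Seq[String]): Unit = {
    if (archiveFormatEnabled && !userSchemaProvided) {
      val archives = inputPaths.filter(isArchivePath)
      if (archives.nonEmpty) {
        throw new IllegalArgumentException(
          "Schema inference is not supported for archive inputs " +
            s"(${archives.mkString(", ")}); specify a schema with .schema(...).")
      }
    }
  }
}
```

The "infer from the first archive entry" alternative would instead route inference through `ArchiveReader.readEntries`; it is more work but keeps schema-less archive reads usable.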
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -144,6 +184,39 @@ object TextInputCSVDataSource extends CSVDataSource {
UnivocityParser.parseIterator(lines, parser, headerChecker, requiredSchema)
}
+ override def readArchive(
+ conf: Configuration,
+ file: PartitionedFile,
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+ requiredSchema: StructType): Iterator[InternalRow] =
+ // Stream each tar entry through the line-based parser, treating the entry exactly like a
+ // standalone CSV file (a fresh parser/header checker is built per entry).
+ streamArchiveEntries(conf, file, getParser, getHeaderChecker) { (parser, headerChecker, in) =>
+ UnivocityParser.parseIterator(
+ entryLines(in, parser.options), parser, headerChecker, requiredSchema)
+ }
+
+ /**
+ * Decodes one archive entry's bytes into the same CSV line strings the non-archive [[readFile]]
+ * path feeds to the parser: [[ArchiveReader.lineIterator]] splits the entry into lines (honoring
+ * a custom line separator), each line is decoded with the configured charset, and the separator
+ * is re-appended so `UnivocityParser` does not raise EOF on the final line.
+ *
+ * @param in bytes of one already-decompressed archive entry; not closed here (the archive owns
+ * the underlying stream).
+ * @param options CSV options supplying the read line separator and charset.
+ * @return an iterator over the entry's lines, each terminated with the line separator.
+ */
+ private def entryLines(in: InputStream, options: CSVOptions): Iterator[String] = {
+ val newline = options.lineSeparatorInRead.getOrElse(
+ Array(options.asParserSettings.getFormat.getNormalizedNewline.toByte))
+ ArchiveReader.lineIterator(in, options.lineSeparatorInRead).map { line =>
+ line.append(newline, 0, newline.length)
Review Comment:
Nit: the comment says the trailing newline is appended "so `UnivocityParser` does not raise EOF on the final line," but the non-archive `readFile` path feeds lines to the *same* `parseIterator` (via `HadoopFileLinesReader`) **without** re-appending a terminator. So either the EOF claim is inaccurate (and this append is unnecessary), or there is a subtlety worth spelling out. The single-byte `getNormalizedNewline` append also diverges from `readFile` for multi-byte charsets other than UTF-8 (e.g. UTF-16), where a one-byte terminator is not a validly encoded newline. Could you double-check? If the append isn't needed, dropping it keeps the archive path identical to `readFile`.
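The multi-byte charset concern can be illustrated with plain JDK calls, independently of the PR's code: appending a single `'\n'` byte to a UTF-16LE-encoded line and then decoding does not yield a trailing newline, whereas decoding first and appending the separator as text does. `NewlineAppendDemo` and its methods are illustrative names only, and the sketch assumes the append happens on encoded bytes before decoding, as the `Text.append` call suggests.

```scala
import java.nio.charset.{Charset, StandardCharsets}

// Illustrative only; not the PR's code.
object NewlineAppendDemo {
  // Pattern under question: append a one-byte terminator to the encoded line, then decode.
  def appendByteThenDecode(line: String, charset: Charset): String = {
    val bytes = line.getBytes(charset) :+ '\n'.toByte
    new String(bytes, charset)
  }

  // Charset-aware alternative: decode the line first, then append the separator as a String.
  def decodeThenAppend(line: String, charset: Charset): String =
    new String(line.getBytes(charset), charset) + "\n"

  // Convenience for quick manual checks.
  val utf16le: Charset = StandardCharsets.UTF_16LE
}
```

For UTF-8 (and single-byte charsets) both variants agree, which is why the divergence only surfaces with other multi-byte encodings.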
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -83,6 +83,46 @@ abstract class CSVDataSource extends Serializable {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType
+
+ /**
+ * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry through the CSV parser without
+ * unpacking it to disk. The whole archive is a single split (see `CSVFileFormat.isSplitable`); a
+ * fresh header checker and parser are built per entry so each entry is parsed exactly like a
+ * standalone CSV file -- its header, if any, validated and dropped independently. The
+ * mode-specific implementation turns one entry into rows via `parseStream` / `parseIterator`.
+ *
+ * @param getParser builds a fresh [[UnivocityParser]].
+ * @param getHeaderChecker builds a fresh [[CSVHeaderChecker]] for `(isStartOfFile, source)`.
+ */
+ def readArchive(
+ conf: Configuration,
+ file: PartitionedFile,
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+ requiredSchema: StructType): Iterator[InternalRow]
+
+ /**
+ * Shared driver used by the [[readArchive]] implementations: streams each non-skipped entry's
+ * `(parser, headerChecker, stream)` -- a fresh parser/header checker per entry -- through
+ * `parseEntry`. The header checker `source` (`CSV archive entry: <archive>!/<entryName>`) names
+ * the entry in error messages.
+ */
+ protected def streamArchiveEntries(
+ conf: Configuration,
+ file: PartitionedFile,
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker)(
+ parseEntry: (UnivocityParser, CSVHeaderChecker, InputStream) => Iterator[InternalRow])
+ : Iterator[InternalRow] = {
+ ArchiveReader(file.toPath).readEntries(conf) { (entryName, in) =>
Review Comment:
`ignoreCorruptFiles` ends up archive-granular here. An archive is a single non-splittable `PartitionedFile`, so `FileScanRDD`'s per-file corrupt-file handling skips the **whole** archive on any throw (a corrupt gzip/tar stream, an IO error mid-entry, or a FAILFAST malformed record), whereas a directory of loose files skips only the bad file and keeps the rest. The PR claims directory parity, but the corrupt-file test only covers an archive that is corrupt as a whole, not a bad entry among good ones. It's worth adding a test that asserts the actual behavior, plus a one-line note that corrupt-file handling is archive-granular.
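A toy model of the granularity difference, assuming (as `FileScanRDD` does when `ignoreCorruptFiles` is on) that reading a file stops at the first exception while rows already produced are kept. The model is hypothetical and simplified: each entry either yields all its rows or fails before yielding any. Under that assumption, a bad entry costs a directory only that one file, but costs an archive that entry and every entry after it, i.e. the whole archive when the first entry is bad.

```scala
// Hypothetical toy model; not Spark code.
object CorruptFileGranularity {
  // An entry either yields its rows or fails (before yielding any, to keep the model simple).
  type Entry = Either[String, Seq[Int]]

  // Reads one unit (a loose file or a whole archive) like a per-file corrupt-file guard:
  // keep rows produced so far, stop at the first failure, and skip the rest of the unit.
  private def readUnitIgnoringCorrupt(entries: Seq[Entry]): Seq[Int] =
    entries.takeWhile(_.isRight).collect { case Right(rows) => rows }.flatten

  // Directory of loose files: every file is its own unit.
  def readDirectory(files: Seq[Entry]): Seq[Int] =
    files.flatMap(file => readUnitIgnoringCorrupt(Seq(file)))

  // Archive: one non-splittable unit that contains all entries.
  def readArchive(entries: Seq[Entry]): Seq[Int] =
    readUnitIgnoringCorrupt(entries)
}
```

A test in the PR could follow the same shape: good entry, corrupt entry, good entry, asserting which rows survive.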
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{Closeable, InputStream}
+import java.util.Locale
+import java.util.zip.GZIPInputStream
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
+import org.apache.commons.io.ByteOrderMark
+import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.util.LineReader
+
+import org.apache.spark.TaskContext
+
+/**
+ * Streaming reader for a single archive file. The archive is opened once and decompressed/unpacked
+ * as a stream -- entries are never materialized to local disk. [[readEntries]] hands each entry's
+ * bytes to a caller-supplied parse function as a bounded [[InputStream]] and concatenates the
+ * per-entry results into a single iterator, advancing to the next entry only once the current one
+ * is fully consumed. At most one entry is in flight at a time, so memory stays bounded regardless
+ * of archive size.
+ *
+ * This is format-agnostic: a data source whose per-file reader can consume an `InputStream` wires
+ * up archive support by calling [[readEntries]] from its read/inference paths and supplying a
+ * `parseEntry` that turns one entry stream into rows (or tokens). Formats that need random access
+ * within a file (e.g. Parquet/ORC footers) cannot use this streaming path.
+ *
+ * A concrete subclass implements [[readEntries]] for a specific archive format. Obtain the reader
+ * for a path via `ArchiveReader(path)`, which selects the implementation by file extension; new
+ * archive formats are added by writing another subclass rather than modifying existing ones.
+ */
+abstract class ArchiveReader(path: Path) {
+
+ /**
+ * Streams the archive entry by entry, applying `parseEntry` to each non-skipped entry's
+ * `(name, stream)` and concatenating the results into a single iterator. The next entry is opened
+ * only once the current entry's iterator is exhausted, so nothing is buffered to disk and at most
+ * one entry's bytes are read at a time. The archive stream is closed when the returned iterator
+ * is exhausted, when [[Closeable.close]] is called on it, and (defensively) on task completion.
+ */
+ def readEntries[T](
+ conf: Configuration)(
+ parseEntry: (String, InputStream) => Iterator[T]): Iterator[T]
+}
+
+object ArchiveReader {
+
+ /**
+ * Whether `path` names an archive this reader can stream. Dispatched purely on the file
+ * extension -- `.tar`, `.tar.gz`, or `.tgz` -- since the bytes are not inspected here.
+ */
+ def isArchivePath(path: Path): Boolean = {
+ val name = path.getName.toLowerCase(Locale.ROOT)
+ name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz")
+ }
+
+ /**
+ * Returns the [[ArchiveReader]] implementation for `path`, selected by its file extension. Only
+ * paths for which [[isArchivePath]] is true are supported; new archive formats add a case here.
+ */
+ def apply(path: Path): ArchiveReader = new TarArchiveReader(path)
+
+ /**
+ * Splits one already-decompressed archive entry's bytes into lines. The reusable, format-agnostic
+ * line source for archive entries; the entry stream is not closed here (the reader owns the
+ * underlying stream).
+ *
+ * @param in bytes of one archive entry.
+ * @param lineSeparatorInRead the explicit read line separator, or `None` to detect line breaks.
+ * @return an iterator over the entry's lines as [[Text]], without the trailing separator.
+ */
+ def lineIterator(in: InputStream, lineSeparatorInRead: Option[Array[Byte]]): Iterator[Text] = {
+ // A leading byte-order mark is stripped (LineReader does not strip it on its own) so the lines
+ // match the non-archive read path.
+ val bomInputStream = BOMInputStream.builder()
+ .setInputStream(in)
+ .setByteOrderMarks(
+ ByteOrderMark.UTF_8,
+ ByteOrderMark.UTF_16LE,
+ ByteOrderMark.UTF_16BE,
+ ByteOrderMark.UTF_32LE,
+ ByteOrderMark.UTF_32BE)
+ .setInclude(false)
+ .get()
+ val reader = lineSeparatorInRead match {
+ case Some(sep) => new LineReader(bomInputStream, sep)
+ case _ => new LineReader(bomInputStream)
+ }
+ new Iterator[Text] {
+ private val text = new Text()
+ private var finished = false
+ private var hasValue = false
+
+ override def hasNext: Boolean = {
+ if (!finished && !hasValue) {
+ finished = reader.readLine(text) == 0
+ hasValue = !finished
+ }
+ !finished
+ }
+
+ override def next(): Text = {
+ if (!hasNext) throw new NoSuchElementException
+ hasValue = false
+ text
+ }
+ }
+ }
+}
+
+/**
+ * [[ArchiveReader]] for tar archives: plain `.tar`, gzipped `.tar.gz`, and `.tgz`.
+ *
+ * Gzip handling: Hadoop's `CompressionCodecFactory` matches the trailing `.gz` extension and
+ * auto-decompresses `.tar.gz` via `CodecStreams`, so we just wrap that stream in
+ * `TarArchiveInputStream`. `.tgz` is not a registered Hadoop codec extension, so the gzip layer is
+ * unwrapped explicitly here.
+ */
+class TarArchiveReader(path: Path) extends ArchiveReader(path) {
+
+ // Paths Hadoop's codec factory won't auto-decompress: we apply the gzip layer here.
+ private def needsExplicitGunzip: Boolean =
+ path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz")
+
+ /**
+ * Whether an entry is not a real data file and must be skipped: a directory, or an OS sidecar
+ * dotfile (basename starting with `.`, e.g. macOS `._x` resource forks or `.DS_Store`).
+ */
+ private def shouldSkipEntry(entry: TarArchiveEntry): Boolean = {
+ if (entry.isDirectory) return true
+ val name = entry.getName
+ val basename = name.substring(name.lastIndexOf('/') + 1)
+ basename.startsWith(".")
Review Comment:
`shouldSkipEntry` skips only `.`-prefixed entries, but Spark's loose-file listing filters both `.`- and `_`-prefixed names via `HadoopFSUtils.shouldFilterOutPathName` (used by `InMemoryFileIndex`). So an entry named `_SUCCESS` or `_committed_*` inside an archive is read as data, breaking the "parse like a directory of the same files" parity that the suite asserts. Mirror the `_` filter (ideally by reusing `shouldFilterOutPathName`). The good news: this lives in the shared `TarArchiveReader`, so the fix benefits every future format for free.
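A minimal sketch of the mirrored filter, applied to the entry's basename. The `Entry` case class is a hypothetical stand-in for `TarArchiveEntry`, and the plain `.`/`_` prefix rule is a simplification: Spark's `shouldFilterOutPathName` has extra cases (for example, exceptions for certain `_`-prefixed metadata names), which is one more reason to call it directly rather than re-implement it.

```scala
// Hypothetical, simplified stand-in for the PR's `TarArchiveReader.shouldSkipEntry`.
object ArchiveEntryFilter {
  // Stand-in for a tar entry: its path inside the archive and whether it is a directory.
  final case class Entry(name: String, isDirectory: Boolean)

  private def basename(name: String): String = {
    val trimmed = name.stripSuffix("/")
    trimmed.substring(trimmed.lastIndexOf('/') + 1)
  }

  // Skip directories plus hidden (`.`) and metadata-style (`_`) basenames, matching the
  // prefixes that Spark's loose-file listing drops.
  def shouldSkipEntry(entry: Entry): Boolean =
    entry.isDirectory || {
      val base = basename(entry.name)
      base.startsWith(".") || base.startsWith("_")
    }
}
```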