cloud-fan commented on code in PR #56193:
URL: https://github.com/apache/spark/pull/56193#discussion_r3375345452


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{ByteArrayOutputStream, Closeable, File, FileOutputStream, 
InputStream, OutputStream}
+import java.nio.charset.StandardCharsets
+import java.util.Properties
+import java.util.zip.GZIPOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, 
TarArchiveOutputStream}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{SparkFunSuite, TaskContext, TaskContextImpl}
+
+/**
+ * Unit tests for the streaming [[ArchiveReader]] core: `isArchivePath` 
dispatch and `readEntries`
+ * (entry ordering, gzip handling, dir/dotfile skipping, lazy advance, the 
non-closing entry
+ * stream, and cleanup). Nothing here touches local disk -- entries are 
consumed as streams.
+ */
+class ArchiveReaderSuite extends SparkFunSuite {
+
+  private case class Entry(name: String, data: Array[Byte], isDir: Boolean = 
false)
+
+  private def writeTar(file: File, entries: Seq[Entry]): Unit =
+    writeTarTo(new FileOutputStream(file), entries)
+
+  /** Write a gzipped tar, used to verify the `.tar.gz` / `.tgz` archive 
paths. */
+  private def writeTarGz(file: File, entries: Seq[Entry]): Unit =
+    writeTarTo(new GZIPOutputStream(new FileOutputStream(file)), entries)
+
+  private def writeTarTo(rawOut: OutputStream, entries: Seq[Entry]): Unit = {
+    val out = new TarArchiveOutputStream(rawOut)
+    try {
+      entries.foreach { e =>
+        // TarArchiveEntry treats a trailing slash in the name as a directory 
marker.
+        val rawName = if (e.isDir && !e.name.endsWith("/")) e.name + "/" else 
e.name
+        val tarEntry = new TarArchiveEntry(rawName)
+        if (!e.isDir) tarEntry.setSize(e.data.length.toLong)
+        out.putArchiveEntry(tarEntry)
+        if (!e.isDir) out.write(e.data)
+        out.closeArchiveEntry()
+      }
+      out.finish()
+    } finally out.close()
+  }
+
+  private def textEntry(name: String, body: String): Entry =
+    Entry(name, body.getBytes(StandardCharsets.UTF_8))
+
+  private def readAll(in: InputStream): Array[Byte] = {
+    val out = new ByteArrayOutputStream()
+    val buf = new Array[Byte](4096)
+    var n = in.read(buf)
+    while (n >= 0) {
+      out.write(buf, 0, n)
+      n = in.read(buf)
+    }
+    out.toByteArray
+  }
+
+  /** Drains every entry into `(name, decodedText)` pairs through 
`ArchiveReader.readEntries`. */
+  private def collect(file: File): Seq[(String, String)] =
+    ArchiveReader(new Path(file.toURI)).readEntries(new Configuration()) { 
(name, in) =>
+      Iterator.single((name, new String(readAll(in), StandardCharsets.UTF_8)))
+    }.toList
+
+  // ----- isArchivePath ------------------------------------------------------
+
+  test("isArchivePath: positive cases") {
+    Seq(
+      "foo.tar", "FOO.TAR", "/a/b/c/x.tar", "weird.TaR",
+      "foo.tar.gz", "FOO.TAR.GZ", "mixed.Tar.Gz", "/a/b/c/x.tar.gz",
+      "foo.tgz", "FOO.TGZ", "/a/b/c/x.tgz"
+    ).foreach { p =>
+      assert(ArchiveReader.isArchivePath(new Path(p)), s"expected archive 
match for $p")
+    }
+  }
+
+  test("isArchivePath: negative cases") {
+    Seq("foo.csv", "foo.gz", "foo", "dir/", "foo.tarball", "data.zip",
+        "foo.tar.bz2", "foo.targz").foreach { p =>
+      assert(!ArchiveReader.isArchivePath(new Path(p)), s"expected non-match 
for $p")
+    }
+  }
+
+  // ----- readEntries --------------------------------------------------------
+
+  test("readEntries: empty tar yields empty iterator") {
+    withTempDir { dir =>
+      val tar = new File(dir, "empty.tar")
+      writeTar(tar, Seq.empty)
+      assert(collect(tar).isEmpty)
+    }
+  }
+
+  test("readEntries: single entry exposes its name and bytes") {
+    withTempDir { dir =>
+      val tar = new File(dir, "single.tar")
+      writeTar(tar, Seq(textEntry("only.csv", "hello\n")))
+      assert(collect(tar) == Seq("only.csv" -> "hello\n"))
+    }
+  }
+
+  test("readEntries: multiple entries chained in tar order") {
+    withTempDir { dir =>
+      val tar = new File(dir, "multi.tar")
+      writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), 
textEntry("c.csv", "c")))
+      assert(collect(tar) == Seq("a.csv" -> "a", "b.csv" -> "b", "c.csv" -> 
"c"))
+    }
+  }
+
+  test("readEntries: gzipped tar (.tar.gz) via Hadoop codec factory") {
+    withTempDir { dir =>
+      val tarGz = new File(dir, "data.tar.gz")
+      writeTarGz(tarGz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b")))
+      assert(collect(tarGz) == Seq("a.csv" -> "a", "b.csv" -> "b"))
+    }
+  }
+
+  test("readEntries: gzipped tar (.tgz) via explicit GZIPInputStream wrap") {
+    withTempDir { dir =>
+      val tgz = new File(dir, "data.tgz")
+      writeTarGz(tgz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b")))
+      assert(collect(tgz) == Seq("a.csv" -> "a", "b.csv" -> "b"))
+    }
+  }
+
+  test("readEntries: directory entries are skipped") {
+    withTempDir { dir =>
+      val tar = new File(dir, "dirs.tar")
+      writeTar(tar, Seq(
+        Entry("subdir", Array.emptyByteArray, isDir = true),
+        textEntry("subdir/data.csv", "x")))
+      assert(collect(tar) == Seq("subdir/data.csv" -> "x"))
+    }
+  }
+
+  test("readEntries: dotfile entries (e.g. macOS ._foo) are skipped") {
+    withTempDir { dir =>
+      val tar = new File(dir, "dots.tar")
+      writeTar(tar, Seq(
+        textEntry("._real.csv", "junk"),          // macOS AppleDouble sidecar
+        textEntry(".hidden", "ignored"),          // bare dotfile
+        textEntry("real.csv", "kept"),

Review Comment:
   The skip fix from the last round (`basename.startsWith(".")` -> 
`HadoopFSUtils.shouldFilterOutPathName(basename)`) added skipping of 
`_`-prefixed markers (`_SUCCESS`, `_committed_*`), but this test only exercises 
`.`-prefixed names, and no other archive test uses a `_`-prefixed entry. A 
regression back to `startsWith(".")` would pass the whole suite. Adding a 
`_SUCCESS` entry locks the new behavior in (the assertion stays `Seq("real.csv" 
-> "kept")` since `_SUCCESS` is skipped):
   ```suggestion
           textEntry("_SUCCESS", "marker"),          // _-prefixed marker (now also skipped)
           textEntry("real.csv", "kept"),
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{Closeable, InputStream}
+import java.util.Locale
+import java.util.zip.GZIPInputStream
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, 
TarArchiveInputStream}
+import org.apache.commons.io.ByteOrderMark
+import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.util.LineReader
+
+import org.apache.spark.TaskContext
+import org.apache.spark.util.HadoopFSUtils
+
+/**
+ * Streaming reader for a single archive file. The archive is opened once and 
decompressed/unpacked
+ * as a stream -- entries are never materialized to local disk. 
[[readEntries]] hands each entry's
+ * bytes to a caller-supplied parse function as a bounded [[InputStream]] and 
concatenates the
+ * per-entry results into a single iterator, advancing to the next entry only 
once the current one
+ * is fully consumed. At most one entry is in flight at a time, so memory 
stays bounded regardless
+ * of archive size.
+ *
+ * This is format-agnostic: a data source whose per-file reader can consume an 
`InputStream` wires
+ * up archive support by calling [[readEntries]] from its read/inference paths 
and supplying a
+ * `parseEntry` that turns one entry stream into rows (or tokens). Formats 
that need random access
+ * within a file (e.g. Parquet/ORC footers) cannot use this streaming path.
+ *
+ * A concrete subclass implements [[readEntries]] for a specific archive 
format. Obtain the reader
+ * for a path via `ArchiveReader(path)`, which selects the implementation by 
file extension; new
+ * archive formats are added by writing another subclass rather than modifying 
existing ones.
+ */
+abstract class ArchiveReader(path: Path) {
+
+  /**
+   * Streams the archive entry by entry, applying `parseEntry` to each 
non-skipped entry's
+   * `(name, stream)` and concatenating the results into a single iterator. 
The next entry is opened
+   * only once the current entry's iterator is exhausted, so nothing is 
buffered to disk and at most
+   * one entry's bytes are read at a time. The archive stream is closed when 
the returned iterator
+   * is exhausted, when [[Closeable.close]] is called on it, and (defensively) 
on task completion.
+   */
+  def readEntries[T](
+      conf: Configuration)(
+      parseEntry: (String, InputStream) => Iterator[T]): Iterator[T]
+}
+
+object ArchiveReader {
+
+  /**
+   * Whether `path` names an archive this reader can stream. Dispatched purely 
on the file
+   * extension -- `.tar`, `.tar.gz`, or `.tgz` -- since the bytes are not 
inspected here.
+   */
+  def isArchivePath(path: Path): Boolean = {
+    val name = path.getName.toLowerCase(Locale.ROOT)
+    name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz")
+  }
+
+  /**
+   * Returns the [[ArchiveReader]] implementation for `path`, selected by its 
file extension. Only
+   * paths for which [[isArchivePath]] is true are supported; new archive 
formats add a case here.
+   */
+  def apply(path: Path): ArchiveReader = new TarArchiveReader(path)
+
+  /**
+   * Splits one already-decompressed archive entry's bytes into lines. The 
reusable, format-agnostic
+   * line source for archive entries; the entry stream is not closed here (the 
reader owns the
+   * underlying stream).
+   *
+   * @param in bytes of one archive entry.
+   * @param lineSeparatorInRead the explicit read line separator, or `None` to 
detect line breaks.
+   * @return an iterator over the entry's lines as [[Text]], without the 
trailing separator.
+   */
+  def lineIterator(in: InputStream, lineSeparatorInRead: Option[Array[Byte]]): 
Iterator[Text] = {
+    // A leading byte-order mark is stripped (LineReader does not strip it on 
its own) so the lines
+    // match the non-archive read path.
+    val bomInputStream = BOMInputStream.builder()
+      .setInputStream(in)
+      .setByteOrderMarks(
+        ByteOrderMark.UTF_8,
+        ByteOrderMark.UTF_16LE,
+        ByteOrderMark.UTF_16BE,
+        ByteOrderMark.UTF_32LE,
+        ByteOrderMark.UTF_32BE)
+      .setInclude(false)
+      .get()
+    val reader = lineSeparatorInRead match {
+      case Some(sep) => new LineReader(bomInputStream, sep)
+      case _ => new LineReader(bomInputStream)
+    }
+    new Iterator[Text] {
+      private val text = new Text()
+      private var finished = false
+      private var hasValue = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !hasValue) {
+          finished = reader.readLine(text) == 0
+          hasValue = !finished
+        }
+        !finished
+      }
+
+      override def next(): Text = {
+        if (!hasNext) throw new NoSuchElementException
+        hasValue = false
+        text
+      }
+    }
+  }
+}
+
+/**
+ * [[ArchiveReader]] for tar archives: plain `.tar`, gzipped `.tar.gz`, and 
`.tgz`.
+ *
+ * Gzip handling: Hadoop's `CompressionCodecFactory` matches the trailing 
`.gz` extension and
+ * auto-decompresses `.tar.gz` via `CodecStreams`, so we just wrap that stream 
in
+ * `TarArchiveInputStream`. `.tgz` is not a registered Hadoop codec extension, 
so the gzip layer is
+ * unwrapped explicitly here.
+ */
+class TarArchiveReader(path: Path) extends ArchiveReader(path) {
+
+  // Paths Hadoop's codec factory won't auto-decompress: we apply the gzip 
layer here.
+  private def needsExplicitGunzip: Boolean =
+    path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz")
+
+  /**
+   * Whether an entry is not a real data file and must be skipped: a 
directory, or a name Spark's
+   * own file listing would filter out. Reusing 
[[HadoopFSUtils.shouldFilterOutPathName]] (the
+   * `InMemoryFileIndex` filter) keeps archive reads in parity with reading 
the same entries as
+   * loose files: `.`-prefixed sidecars (macOS `._x`, `.DS_Store`) and 
`_`-prefixed markers
+   * (`_SUCCESS`, `_committed_*`) are skipped, while data files are kept.
+   */
+  private def shouldSkipEntry(entry: TarArchiveEntry): Boolean = {
+    if (entry.isDirectory) return true
+    val name = entry.getName
+    val basename = name.substring(name.lastIndexOf('/') + 1)
+    HadoopFSUtils.shouldFilterOutPathName(basename)

Review Comment:
   Minor: the doc above says reusing `shouldFilterOutPathName` "keeps archive 
reads in parity with reading the same entries as loose files," but the filter 
is applied to the basename only, whereas `InMemoryFileIndex` also skips entire 
`_`/`.`-prefixed *directories* (it doesn't recurse into them). So an entry like 
`_temporary/part-0.csv` (basename `part-0.csv`) is read as data here but would 
be skipped as a loose file — for example, when a leftover `_temporary` directory
from a failed write is tarred up together with the data. This is a narrow edge
case; either apply the filter to every path component, or soften the comment to
say that it filters entry *basenames* the way loose-file listing filters names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to