cloud-fan commented on code in PR #56193: URL: https://github.com/apache/spark/pull/56193#discussion_r3375345452
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{ByteArrayOutputStream, Closeable, File, FileOutputStream, InputStream, OutputStream} +import java.nio.charset.StandardCharsets +import java.util.Properties +import java.util.zip.GZIPOutputStream + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{SparkFunSuite, TaskContext, TaskContextImpl} + +/** + * Unit tests for the streaming [[ArchiveReader]] core: `isArchivePath` dispatch and `readEntries` + * (entry ordering, gzip handling, dir/dotfile skipping, lazy advance, the non-closing entry + * stream, and cleanup). Nothing here touches local disk -- entries are consumed as streams. 
+ */ +class ArchiveReaderSuite extends SparkFunSuite { + + private case class Entry(name: String, data: Array[Byte], isDir: Boolean = false) + + private def writeTar(file: File, entries: Seq[Entry]): Unit = + writeTarTo(new FileOutputStream(file), entries) + + /** Write a gzipped tar, used to verify the `.tar.gz` / `.tgz` archive paths. */ + private def writeTarGz(file: File, entries: Seq[Entry]): Unit = + writeTarTo(new GZIPOutputStream(new FileOutputStream(file)), entries) + + private def writeTarTo(rawOut: OutputStream, entries: Seq[Entry]): Unit = { + val out = new TarArchiveOutputStream(rawOut) + try { + entries.foreach { e => + // TarArchiveEntry treats a trailing slash in the name as a directory marker. + val rawName = if (e.isDir && !e.name.endsWith("/")) e.name + "/" else e.name + val tarEntry = new TarArchiveEntry(rawName) + if (!e.isDir) tarEntry.setSize(e.data.length.toLong) + out.putArchiveEntry(tarEntry) + if (!e.isDir) out.write(e.data) + out.closeArchiveEntry() + } + out.finish() + } finally out.close() + } + + private def textEntry(name: String, body: String): Entry = + Entry(name, body.getBytes(StandardCharsets.UTF_8)) + + private def readAll(in: InputStream): Array[Byte] = { + val out = new ByteArrayOutputStream() + val buf = new Array[Byte](4096) + var n = in.read(buf) + while (n >= 0) { + out.write(buf, 0, n) + n = in.read(buf) + } + out.toByteArray + } + + /** Drains every entry into `(name, decodedText)` pairs through `ArchiveReader.readEntries`. 
*/ + private def collect(file: File): Seq[(String, String)] = + ArchiveReader(new Path(file.toURI)).readEntries(new Configuration()) { (name, in) => + Iterator.single((name, new String(readAll(in), StandardCharsets.UTF_8))) + }.toList + + // ----- isArchivePath ------------------------------------------------------ + + test("isArchivePath: positive cases") { + Seq( + "foo.tar", "FOO.TAR", "/a/b/c/x.tar", "weird.TaR", + "foo.tar.gz", "FOO.TAR.GZ", "mixed.Tar.Gz", "/a/b/c/x.tar.gz", + "foo.tgz", "FOO.TGZ", "/a/b/c/x.tgz" + ).foreach { p => + assert(ArchiveReader.isArchivePath(new Path(p)), s"expected archive match for $p") + } + } + + test("isArchivePath: negative cases") { + Seq("foo.csv", "foo.gz", "foo", "dir/", "foo.tarball", "data.zip", + "foo.tar.bz2", "foo.targz").foreach { p => + assert(!ArchiveReader.isArchivePath(new Path(p)), s"expected non-match for $p") + } + } + + // ----- readEntries -------------------------------------------------------- + + test("readEntries: empty tar yields empty iterator") { + withTempDir { dir => + val tar = new File(dir, "empty.tar") + writeTar(tar, Seq.empty) + assert(collect(tar).isEmpty) + } + } + + test("readEntries: single entry exposes its name and bytes") { + withTempDir { dir => + val tar = new File(dir, "single.tar") + writeTar(tar, Seq(textEntry("only.csv", "hello\n"))) + assert(collect(tar) == Seq("only.csv" -> "hello\n")) + } + } + + test("readEntries: multiple entries chained in tar order") { + withTempDir { dir => + val tar = new File(dir, "multi.tar") + writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), textEntry("c.csv", "c"))) + assert(collect(tar) == Seq("a.csv" -> "a", "b.csv" -> "b", "c.csv" -> "c")) + } + } + + test("readEntries: gzipped tar (.tar.gz) via Hadoop codec factory") { + withTempDir { dir => + val tarGz = new File(dir, "data.tar.gz") + writeTarGz(tarGz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + assert(collect(tarGz) == Seq("a.csv" -> "a", "b.csv" -> "b")) + } + } + 
+ test("readEntries: gzipped tar (.tgz) via explicit GZIPInputStream wrap") { + withTempDir { dir => + val tgz = new File(dir, "data.tgz") + writeTarGz(tgz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + assert(collect(tgz) == Seq("a.csv" -> "a", "b.csv" -> "b")) + } + } + + test("readEntries: directory entries are skipped") { + withTempDir { dir => + val tar = new File(dir, "dirs.tar") + writeTar(tar, Seq( + Entry("subdir", Array.emptyByteArray, isDir = true), + textEntry("subdir/data.csv", "x"))) + assert(collect(tar) == Seq("subdir/data.csv" -> "x")) + } + } + + test("readEntries: dotfile entries (e.g. macOS ._foo) are skipped") { + withTempDir { dir => + val tar = new File(dir, "dots.tar") + writeTar(tar, Seq( + textEntry("._real.csv", "junk"), // macOS AppleDouble sidecar + textEntry(".hidden", "ignored"), // bare dotfile + textEntry("real.csv", "kept"), Review Comment: The skip fix from the last round (`basename.startsWith(".")` -> `HadoopFSUtils.shouldFilterOutPathName(basename)`) added skipping of `_`-prefixed markers (`_SUCCESS`, `_committed_*`), but this test only exercises `.`-prefixed names, and no other archive test uses a `_`-prefixed entry. A regression back to `startsWith(".")` would pass the whole suite. Adding a `_SUCCESS` entry locks the new behavior in (the assertion stays `Seq("real.csv" -> "kept")` since `_SUCCESS` is skipped): ```suggestion textEntry("_SUCCESS", "marker"), // _-prefixed marker (now also skipped) textEntry("real.csv", "kept"), ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{Closeable, InputStream} +import java.util.Locale +import java.util.zip.GZIPInputStream + +import scala.util.control.NonFatal + +import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream} +import org.apache.commons.io.ByteOrderMark +import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.Text +import org.apache.hadoop.util.LineReader + +import org.apache.spark.TaskContext +import org.apache.spark.util.HadoopFSUtils + +/** + * Streaming reader for a single archive file. The archive is opened once and decompressed/unpacked + * as a stream -- entries are never materialized to local disk. [[readEntries]] hands each entry's + * bytes to a caller-supplied parse function as a bounded [[InputStream]] and concatenates the + * per-entry results into a single iterator, advancing to the next entry only once the current one + * is fully consumed. At most one entry is in flight at a time, so memory stays bounded regardless + * of archive size. 
+ * + * This is format-agnostic: a data source whose per-file reader can consume an `InputStream` wires + * up archive support by calling [[readEntries]] from its read/inference paths and supplying a + * `parseEntry` that turns one entry stream into rows (or tokens). Formats that need random access + * within a file (e.g. Parquet/ORC footers) cannot use this streaming path. + * + * A concrete subclass implements [[readEntries]] for a specific archive format. Obtain the reader + * for a path via `ArchiveReader(path)`, which selects the implementation by file extension; new + * archive formats are added by writing another subclass rather than modifying existing ones. + */ +abstract class ArchiveReader(path: Path) { + + /** + * Streams the archive entry by entry, applying `parseEntry` to each non-skipped entry's + * `(name, stream)` and concatenating the results into a single iterator. The next entry is opened + * only once the current entry's iterator is exhausted, so nothing is buffered to disk and at most + * one entry's bytes are read at a time. The archive stream is closed when the returned iterator + * is exhausted, when [[Closeable.close]] is called on it, and (defensively) on task completion. + */ + def readEntries[T]( + conf: Configuration)( + parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] +} + +object ArchiveReader { + + /** + * Whether `path` names an archive this reader can stream. Dispatched purely on the file + * extension -- `.tar`, `.tar.gz`, or `.tgz` -- since the bytes are not inspected here. + */ + def isArchivePath(path: Path): Boolean = { + val name = path.getName.toLowerCase(Locale.ROOT) + name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz") + } + + /** + * Returns the [[ArchiveReader]] implementation for `path`, selected by its file extension. Only + * paths for which [[isArchivePath]] is true are supported; new archive formats add a case here. 
+ */ + def apply(path: Path): ArchiveReader = new TarArchiveReader(path) + + /** + * Splits one already-decompressed archive entry's bytes into lines. The reusable, format-agnostic + * line source for archive entries; the entry stream is not closed here (the reader owns the + * underlying stream). + * + * @param in bytes of one archive entry. + * @param lineSeparatorInRead the explicit read line separator, or `None` to detect line breaks. + * @return an iterator over the entry's lines as [[Text]], without the trailing separator. + */ + def lineIterator(in: InputStream, lineSeparatorInRead: Option[Array[Byte]]): Iterator[Text] = { + // A leading byte-order mark is stripped (LineReader does not strip it on its own) so the lines + // match the non-archive read path. + val bomInputStream = BOMInputStream.builder() + .setInputStream(in) + .setByteOrderMarks( + ByteOrderMark.UTF_8, + ByteOrderMark.UTF_16LE, + ByteOrderMark.UTF_16BE, + ByteOrderMark.UTF_32LE, + ByteOrderMark.UTF_32BE) + .setInclude(false) + .get() + val reader = lineSeparatorInRead match { + case Some(sep) => new LineReader(bomInputStream, sep) + case _ => new LineReader(bomInputStream) + } + new Iterator[Text] { + private val text = new Text() + private var finished = false + private var hasValue = false + + override def hasNext: Boolean = { + if (!finished && !hasValue) { + finished = reader.readLine(text) == 0 + hasValue = !finished + } + !finished + } + + override def next(): Text = { + if (!hasNext) throw new NoSuchElementException + hasValue = false + text + } + } + } +} + +/** + * [[ArchiveReader]] for tar archives: plain `.tar`, gzipped `.tar.gz`, and `.tgz`. + * + * Gzip handling: Hadoop's `CompressionCodecFactory` matches the trailing `.gz` extension and + * auto-decompresses `.tar.gz` via `CodecStreams`, so we just wrap that stream in + * `TarArchiveInputStream`. `.tgz` is not a registered Hadoop codec extension, so the gzip layer is + * unwrapped explicitly here. 
+ */ +class TarArchiveReader(path: Path) extends ArchiveReader(path) { + + // Paths Hadoop's codec factory won't auto-decompress: we apply the gzip layer here. + private def needsExplicitGunzip: Boolean = + path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz") + + /** + * Whether an entry is not a real data file and must be skipped: a directory, or a name Spark's + * own file listing would filter out. Reusing [[HadoopFSUtils.shouldFilterOutPathName]] (the + * `InMemoryFileIndex` filter) keeps archive reads in parity with reading the same entries as + * loose files: `.`-prefixed sidecars (macOS `._x`, `.DS_Store`) and `_`-prefixed markers + * (`_SUCCESS`, `_committed_*`) are skipped, while data files are kept. + */ + private def shouldSkipEntry(entry: TarArchiveEntry): Boolean = { + if (entry.isDirectory) return true + val name = entry.getName + val basename = name.substring(name.lastIndexOf('/') + 1) + HadoopFSUtils.shouldFilterOutPathName(basename) Review Comment: Minor: the doc above says reusing `shouldFilterOutPathName` "keeps archive reads in parity with reading the same entries as loose files," but the filter is applied to the basename only, whereas `InMemoryFileIndex` also skips entire `_`/`.`-prefixed *directories* (it doesn't recurse into them). So an entry like `_temporary/part-0.csv` (basename `part-0.csv`) is read as data here but would be skipped as a loose file — e.g. a leftover `_temporary` directory from a failed write that was tarred up along with the data. This is a narrow edge case; either apply the filter to every path component, or soften the comment to say it filters entry *basenames* the same way loose-file listing filters file names. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
