cloud-fan commented on code in PR #56709:
URL: https://github.com/apache/spark/pull/56709#discussion_r3468808514
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala:
##########
@@ -372,22 +389,26 @@ trait ArchiveReadSuiteBase extends QueryTest with SharedSparkSession {
}
test("inference unions differing fields across archive entries and loose files") {
- // The archive entry has fields (id, name); the loose file has (id, extra). A field-name-keyed
- // format unions them by name -- exactly as a directory read of the same files does.
+ // An archive entry (id, name) and a loose file (id, extra) in one dir infer a unioned schema
+ // matching the same files all loose. mergeSchema makes the union explicit for Avro; compare
+ // by (name, dataType, nullable) set since union order is not stable.
+ val opts = Map("mergeSchema" -> "true")
Review Comment:
This test ("inference unions differing fields across archive entries and
loose files") is gated by `if (supportsSchemaMerge)`, and Avro sets
`supportsSchemaMerge = false` (and has no `mergeSchema` option), so **Avro
never runs it** — only JSON and XML do.
This change adds `mergeSchema -> true`, weakens the directory-parity
assertion from exact `schema == inferredSchema(looseDir)` to an
order-insensitive `(name, dataType, nullable)`-set comparison, and justifies
both with the comment "mergeSchema makes the union explicit for Avro; ... union
order is not stable". Since Avro doesn't run this test, the net effect lands
only on JSON/XML. There, `mergeSchema -> true` is a no-op (they union by
default), so the only real change is a weaker field-order parity check,
motivated by an Avro concern that doesn't apply: pre-PR the test used exact
equality and passed.
Suggest reverting this shared test to its prior exact-equality form and
dropping the Avro comment. If Avro field-union coverage is actually wanted, add
a dedicated Avro test rather than modifying the shared one.
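To illustrate what the weakened assertion stops checking, here is a minimal
standalone sketch. It does not use Spark: `Field` and `SchemaParity` are
hypothetical stand-ins for a schema field and the two comparison styles, and
the field values are made up for illustration.

```scala
object SchemaParity {
  // Hypothetical stand-in for a schema field; this is NOT Spark's StructField.
  final case class Field(name: String, dataType: String, nullable: Boolean)

  // Exact parity: same fields in the same order (Seq equality is order-sensitive).
  def exactMatch(a: Seq[Field], b: Seq[Field]): Boolean = a == b

  // Weakened parity: same fields, order ignored.
  def sameFieldSet(a: Seq[Field], b: Seq[Field]): Boolean = a.toSet == b.toSet

  // Illustrative schemas: identical fields, but the last two are swapped.
  val looseDirSchema: Seq[Field] = Seq(
    Field("id", "long", nullable = true),
    Field("name", "string", nullable = true),
    Field("extra", "string", nullable = true))
  val reorderedSchema: Seq[Field] =
    Seq(looseDirSchema(0), looseDirSchema(2), looseDirSchema(1))
}
```

With these values, `sameFieldSet` accepts the reordered schema while
`exactMatch` rejects it, so a field-order regression in JSON/XML would pass
under the set-based check but fail under the original exact-equality one.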
##########
sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala:
##########
@@ -157,6 +170,76 @@ private[sql] class AvroFileFormat extends FileFormat
} else {
Iterator.empty
}
+ }
+ }
+ }
+
+ /**
+ * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry, deserializing each entry like a
+ * standalone Avro file via a forward-only [[DataFileStream]] (so the archive is never unpacked to
+ * disk and memory stays bounded). The whole archive is a single split (see `isSplitable`). A
+ * fresh datum reader and deserializer are built per entry, since each entry carries its own
+ * writer schema in its header.
+ *
+ * Kept separate from the per-file reader (rather than dispatched inside it) because only this V1
+ * read path supports archives; the V2 data source is intentionally left untouched.
+ */
+ private def readArchive(
+ file: PartitionedFile,
+ conf: Configuration,
+ parsedOptions: AvroOptions,
+ requiredSchema: StructType,
+ filters: Seq[Filter]): Iterator[InternalRow] = {
+ val userProvidedSchema = parsedOptions.schema
+ ArchiveReader(file.toPath).readEntries(conf) { (_, in) =>
+ val datumReader = userProvidedSchema match {
+ case Some(schema) => new GenericDatumReader[GenericRecord](schema)
+ case None => new GenericDatumReader[GenericRecord]()
+ }
+ val stream = new DataFileStream[GenericRecord](in, datumReader)
+ val avroSchema = userProvidedSchema.getOrElse(stream.getSchema)
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseSpec(
+ stream.getMetaString, parsedOptions.datetimeRebaseModeInRead)
+ val avroFilters = if (SQLConf.get.avroFilterPushDown) {
Review Comment:
Minor: `avroFilters` (and the `SQLConf.get.avroFilterPushDown` read) are
recomputed for every tar entry inside the `parseEntry` closure, but they depend
only on `(filters, requiredSchema)`, which are constant across entries — they
can be hoisted above `readEntries`.
(`datumReader`/`avroSchema`/`datetimeRebaseMode` are correctly per-entry, since
each entry carries its own header schema.)
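
The shape of the hoist, as a rough standalone sketch. Nothing here is the
PR's code or a Spark/Avro API: `Entry`, `Filters`, `buildFilters`, and
`readAll` are hypothetical stand-ins, and `filterBuilds` exists only to make
the number of filter constructions observable.

```scala
object HoistSketch {
  // Hypothetical stand-ins; these are not Spark or Avro types.
  final case class Entry(name: String, headerSchema: String)
  final case class Filters(pushed: Seq[String])

  // Counts how often the filter setup runs, to show it is built once.
  var filterBuilds: Int = 0

  def buildFilters(pushDownEnabled: Boolean, filters: Seq[String]): Filters = {
    filterBuilds += 1
    if (pushDownEnabled) Filters(filters) else Filters(Seq.empty)
  }

  def readAll(
      entries: Seq[Entry],
      pushDownEnabled: Boolean,
      filters: Seq[String]): Seq[String] = {
    // Entry-invariant: depends only on arguments that are constant across
    // entries, so it is computed once, outside the per-entry closure.
    val entryFilters = buildFilters(pushDownEnabled, filters)
    entries.map { e =>
      // Per-entry state stays inside: each entry has its own header schema.
      val schema = e.headerSchema
      s"${e.name}:$schema:${entryFilters.pushed.size}"
    }
  }
}
```

Only values derived from the entry itself remain in the closure; everything
that depends solely on the method's arguments is built once up front.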
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]