cloud-fan commented on code in PR #56709:
URL: https://github.com/apache/spark/pull/56709#discussion_r3468808514


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala:
##########
@@ -372,22 +389,26 @@ trait ArchiveReadSuiteBase extends QueryTest with 
SharedSparkSession {
     }
 
     test("inference unions differing fields across archive entries and loose 
files") {
-      // The archive entry has fields (id, name); the loose file has (id, 
extra). A field-name-keyed
-      // format unions them by name -- exactly as a directory read of the same 
files does.
+      // An archive entry (id, name) and a loose file (id, extra) in one dir 
infer a unioned schema
+      // matching the same files all loose. mergeSchema makes the union 
explicit for Avro; compare
+      // by (name, dataType, nullable) set since union order is not stable.
+      val opts = Map("mergeSchema" -> "true")

Review Comment:
   This test ("inference unions differing fields across archive entries and 
loose files") is gated by `if (supportsSchemaMerge)`, and Avro sets 
`supportsSchemaMerge = false` (and has no `mergeSchema` option), so **Avro 
never runs it** — only JSON and XML do.
   
   This change adds `mergeSchema -> true`, weakens the directory-parity 
assertion from exact `schema == inferredSchema(looseDir)` to an 
order-insensitive `(name, dataType, nullable)`-set comparison, and justifies it 
with the comment "mergeSchema makes the union explicit for Avro; ... union 
order is not stable". Since Avro doesn't run this test, the net effect lands 
only on JSON/XML: `mergeSchema -> true` is a no-op there (they union by 
default), and the real change is a weakening of their field-order parity check 
— for an Avro reason that doesn't apply (pre-PR the test used exact equality 
and passed).
   
   Suggest reverting this shared test to its prior exact-equality form and 
dropping the Avro comment. If Avro field-union coverage is actually wanted, add 
a dedicated Avro test rather than modifying the shared one.



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala:
##########
@@ -157,6 +170,76 @@ private[sql] class AvroFileFormat extends FileFormat
       } else {
         Iterator.empty
       }
+      }
+    }
+  }
+
+  /**
+   * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry, 
deserializing each entry like a
+   * standalone Avro file via a forward-only [[DataFileStream]] (so the 
archive is never unpacked to
+   * disk and memory stays bounded). The whole archive is a single split (see 
`isSplitable`). A
+   * fresh datum reader and deserializer are built per entry, since each entry 
carries its own
+   * writer schema in its header.
+   *
+   * Kept separate from the per-file reader (rather than dispatched inside it) 
because only this V1
+   * read path supports archives; the V2 data source is intentionally left 
untouched.
+   */
+  private def readArchive(
+      file: PartitionedFile,
+      conf: Configuration,
+      parsedOptions: AvroOptions,
+      requiredSchema: StructType,
+      filters: Seq[Filter]): Iterator[InternalRow] = {
+    val userProvidedSchema = parsedOptions.schema
+    ArchiveReader(file.toPath).readEntries(conf) { (_, in) =>
+      val datumReader = userProvidedSchema match {
+        case Some(schema) => new GenericDatumReader[GenericRecord](schema)
+        case None => new GenericDatumReader[GenericRecord]()
+      }
+      val stream = new DataFileStream[GenericRecord](in, datumReader)
+      val avroSchema = userProvidedSchema.getOrElse(stream.getSchema)
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseSpec(
+        stream.getMetaString, parsedOptions.datetimeRebaseModeInRead)
+      val avroFilters = if (SQLConf.get.avroFilterPushDown) {

Review Comment:
   Minor: `avroFilters` (and the `SQLConf.get.avroFilterPushDown` read) are 
recomputed for every tar entry inside the `parseEntry` closure, but they depend 
only on `(filters, requiredSchema)`, which are constant across entries — they 
can be hoisted above `readEntries`. 
(`datumReader`/`avroSchema`/`datetimeRebaseMode` are correctly per-entry, since 
each entry carries its own header schema.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to