dongjoon-hyun commented on code in PR #52578:
URL: https://github.com/apache/spark/pull/52578#discussion_r2434380187


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala:
##########
@@ -291,103 +297,146 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
       filters: Seq[Expression],
       relation: LogicalRelation,
       hadoopFsRelation: HadoopFsRelation): LogicalPlan = {
+    val variants = new VariantInRelation
+
     val schemaAttributes = relation.resolve(hadoopFsRelation.dataSchema,
       hadoopFsRelation.sparkSession.sessionState.analyzer.resolver)
-
-    // Collect variant fields from the relation output
-    val variants = collectAndRewriteVariants(schemaAttributes)
+    val defaultValues = 
ResolveDefaultColumns.existenceDefaultValues(StructType(
+      schemaAttributes.map(a => StructField(a.name, a.dataType, a.nullable, 
a.metadata))))
+    for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
+      variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
+    }
     if (variants.mapping.isEmpty) return originalPlan
 
-    // Collect requested fields from projections and filters
     projectList.foreach(variants.collectRequestedFields)
     filters.foreach(variants.collectRequestedFields)
     // `collectRequestedFields` may have removed all variant columns.
     if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
 
-    // Build attribute map with rewritten types
-    val attributeMap = buildAttributeMap(schemaAttributes, variants)
-
-    // Build new schema with variant types replaced by struct types
+    val attributeMap = schemaAttributes.map { a =>
+      if (variants.mapping.get(a.exprId).exists(_.nonEmpty)) {
+        val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
+        val newAttr = AttributeReference(a.name, newType, a.nullable, 
a.metadata)(
+          qualifier = a.qualifier)
+        (a.exprId, newAttr)
+      } else {
+        // `relation.resolve` actually returns `Seq[AttributeReference]`, 
although the return type
+        // is `Seq[Attribute]`.
+        (a.exprId, a.asInstanceOf[AttributeReference])
+      }
+    }.toMap
     val newFields = schemaAttributes.map { a =>
       val dataType = attributeMap(a.exprId).dataType
       StructField(a.name, dataType, a.nullable, a.metadata)
     }
-    // Update relation output attributes with new types
     val newOutput = relation.output.map(a => attributeMap.getOrElse(a.exprId, 
a))
 
-    // Update HadoopFsRelation's data schema so the file source reads the 
struct columns
     val newHadoopFsRelation = hadoopFsRelation.copy(dataSchema = 
StructType(newFields))(
       hadoopFsRelation.sparkSession)
     val newRelation = relation.copy(relation = newHadoopFsRelation, output = 
newOutput.toIndexedSeq)
 
-    // Build filter and project with rewritten expressions
     buildFilterAndProject(newRelation, projectList, filters, variants, 
attributeMap)
   }
 
-  private def rewriteV2RelationPlan(
+  // BEGIN-V2-SUPPORT: DataSource V2 rewrite method using SupportsPushDownVariants API

Review Comment:
   Ditto. This pair of tags simply surrounds this one function.
   ```
   // BEGIN-V2-SUPPORT: 
   // END-V2-SUPPORT
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to