dongjoon-hyun commented on code in PR #52578:
URL: https://github.com/apache/spark/pull/52578#discussion_r2434386289


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala:
##########
@@ -291,103 +297,146 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
       filters: Seq[Expression],
       relation: LogicalRelation,
       hadoopFsRelation: HadoopFsRelation): LogicalPlan = {
+    val variants = new VariantInRelation
+
     val schemaAttributes = relation.resolve(hadoopFsRelation.dataSchema,
       hadoopFsRelation.sparkSession.sessionState.analyzer.resolver)
-
-    // Collect variant fields from the relation output
-    val variants = collectAndRewriteVariants(schemaAttributes)
+    val defaultValues = 
ResolveDefaultColumns.existenceDefaultValues(StructType(
+      schemaAttributes.map(a => StructField(a.name, a.dataType, a.nullable, 
a.metadata))))
+    for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
+      variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
+    }
     if (variants.mapping.isEmpty) return originalPlan
 
-    // Collect requested fields from projections and filters
     projectList.foreach(variants.collectRequestedFields)
     filters.foreach(variants.collectRequestedFields)
     // `collectRequestedFields` may have removed all variant columns.
     if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
 
-    // Build attribute map with rewritten types
-    val attributeMap = buildAttributeMap(schemaAttributes, variants)
-
-    // Build new schema with variant types replaced by struct types
+    val attributeMap = schemaAttributes.map { a =>
+      if (variants.mapping.get(a.exprId).exists(_.nonEmpty)) {
+        val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
+        val newAttr = AttributeReference(a.name, newType, a.nullable, 
a.metadata)(
+          qualifier = a.qualifier)
+        (a.exprId, newAttr)
+      } else {
+        // `relation.resolve` actually returns `Seq[AttributeReference]`, 
although the return type
+        // is `Seq[Attribute]`.
+        (a.exprId, a.asInstanceOf[AttributeReference])
+      }
+    }.toMap
     val newFields = schemaAttributes.map { a =>
       val dataType = attributeMap(a.exprId).dataType
       StructField(a.name, dataType, a.nullable, a.metadata)
     }
-    // Update relation output attributes with new types
     val newOutput = relation.output.map(a => attributeMap.getOrElse(a.exprId, 
a))
 
-    // Update HadoopFsRelation's data schema so the file source reads the 
struct columns
     val newHadoopFsRelation = hadoopFsRelation.copy(dataSchema = 
StructType(newFields))(
       hadoopFsRelation.sparkSession)
     val newRelation = relation.copy(relation = newHadoopFsRelation, output = 
newOutput.toIndexedSeq)
 
-    // Build filter and project with rewritten expressions
     buildFilterAndProject(newRelation, projectList, filters, variants, 
attributeMap)
   }
 
-  private def rewriteV2RelationPlan(
+  // BEGIN-V2-SUPPORT: DataSource V2 rewrite method using 
SupportsPushDownVariants API
+  // Key differences from V1 implementation:
+  // 1. V2 uses DataSourceV2ScanRelation instead of LogicalRelation
+  // 2. Uses SupportsPushDownVariants API instead of directly manipulating scan
+  // 3. Schema is already resolved in scanRelation.output (no need for 
relation.resolve())
+  // 4. Scan rebuilding is handled by the scan implementation via the API
+  private def rewritePlanV2(
       originalPlan: LogicalPlan,
       projectList: Seq[NamedExpression],
       filters: Seq[Expression],
-      relation: DataSourceV2Relation): LogicalPlan = {
+      scanRelation: DataSourceV2ScanRelation,

Review Comment:
   Thank you for narrowing this down from `DataSourceV2Relation` to
`DataSourceV2ScanRelation`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to