dongjoon-hyun commented on code in PR #52578:
URL: https://github.com/apache/spark/pull/52578#discussion_r2434386289
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala:
##########
@@ -291,103 +297,146 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
       filters: Seq[Expression],
       relation: LogicalRelation,
       hadoopFsRelation: HadoopFsRelation): LogicalPlan = {
+    val variants = new VariantInRelation
+
     val schemaAttributes = relation.resolve(hadoopFsRelation.dataSchema,
       hadoopFsRelation.sparkSession.sessionState.analyzer.resolver)
-
-    // Collect variant fields from the relation output
-    val variants = collectAndRewriteVariants(schemaAttributes)
+    val defaultValues = ResolveDefaultColumns.existenceDefaultValues(StructType(
+      schemaAttributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))))
+    for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
+      variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
+    }
     if (variants.mapping.isEmpty) return originalPlan
-    // Collect requested fields from projections and filters
     projectList.foreach(variants.collectRequestedFields)
     filters.foreach(variants.collectRequestedFields)
     // `collectRequestedFields` may have removed all variant columns.
     if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
-    // Build attribute map with rewritten types
-    val attributeMap = buildAttributeMap(schemaAttributes, variants)
-
-    // Build new schema with variant types replaced by struct types
+    val attributeMap = schemaAttributes.map { a =>
+      if (variants.mapping.get(a.exprId).exists(_.nonEmpty)) {
+        val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
+        val newAttr = AttributeReference(a.name, newType, a.nullable, a.metadata)(
+          qualifier = a.qualifier)
+        (a.exprId, newAttr)
+      } else {
+        // `relation.resolve` actually returns `Seq[AttributeReference]`, although the return type
+        // is `Seq[Attribute]`.
+        (a.exprId, a.asInstanceOf[AttributeReference])
+      }
+    }.toMap
     val newFields = schemaAttributes.map { a =>
       val dataType = attributeMap(a.exprId).dataType
       StructField(a.name, dataType, a.nullable, a.metadata)
     }
-    // Update relation output attributes with new types
     val newOutput = relation.output.map(a => attributeMap.getOrElse(a.exprId, a))
-    // Update HadoopFsRelation's data schema so the file source reads the struct columns
     val newHadoopFsRelation = hadoopFsRelation.copy(dataSchema = StructType(newFields))(
       hadoopFsRelation.sparkSession)
     val newRelation = relation.copy(relation = newHadoopFsRelation, output = newOutput.toIndexedSeq)
-    // Build filter and project with rewritten expressions
     buildFilterAndProject(newRelation, projectList, filters, variants, attributeMap)
   }
-  private def rewriteV2RelationPlan(
+  // BEGIN-V2-SUPPORT: DataSource V2 rewrite method using SupportsPushDownVariants API
+  // Key differences from V1 implementation:
+  // 1. V2 uses DataSourceV2ScanRelation instead of LogicalRelation
+  // 2. Uses SupportsPushDownVariants API instead of directly manipulating scan
+  // 3. Schema is already resolved in scanRelation.output (no need for relation.resolve())
+  // 4. Scan rebuilding is handled by the scan implementation via the API
+  private def rewritePlanV2(
       originalPlan: LogicalPlan,
      projectList: Seq[NamedExpression],
      filters: Seq[Expression],
-      relation: DataSourceV2Relation): LogicalPlan = {
+      scanRelation: DataSourceV2ScanRelation,
Review Comment:
Thank you for narrowing down to `DataSourceV2ScanRelation` from `DataSourceV2Relation`.
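
To illustrate why the narrower type helps (a minimal sketch, not code from this PR): `DataSourceV2ScanRelation` only exists after `V2ScanRelationPushDown` has built a concrete `Scan`, so the rule can probe that scan directly for the new capability. The `SupportsPushDownVariants` trait below is an empty placeholder for the interface this PR introduces, and `probeForVariantPushdown` is a hypothetical helper; neither models the real method set.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

// Empty placeholder for the connector interface added by this PR; the real
// trait carries the variant-pushdown negotiation methods, elided here.
trait SupportsPushDownVariants extends Scan

// Hypothetical helper: walks the plan and checks each scan for the capability.
def probeForVariantPushdown(plan: LogicalPlan): LogicalPlan = plan transform {
  case sr: DataSourceV2ScanRelation =>
    // sr.scan is a concrete Scan because V2ScanRelationPushDown already ran,
    // so a simple type test is enough to detect variant-pushdown support.
    sr.scan match {
      case _: SupportsPushDownVariants => sr // rewritePlanV2 would fire here
      case _ => sr
    }
}
```

A bare `DataSourceV2Relation`, by contrast, carries no `Scan` at all, which is why the V1 path above has to rewrite `HadoopFsRelation`'s data schema itself.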