cloud-fan commented on code in PR #52866:
URL: https://github.com/apache/spark/pull/52866#discussion_r2505769618
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -990,6 +995,81 @@ object MergeIntoTable {
CaseInsensitiveMap(fieldMap)
}
}
+
+  /**
+   * Returns a pruned copy of the MERGE source schema that keeps only the columns and nested
+   * fields relevant for schema evolution. Columns/nested fields of the result that do not exist
+   * in the target schema are the ones schema evolution adds to the target.
+   *
+   * A source field is kept when:
+   *  - an UPDATE/INSERT assignment directly assigns it to its target counterpart (the field is
+   *    kept whole, including all nested fields);
+   *  - it is a struct whose path is a prefix of some assignment key, or the MERGE has a * action
+   *    (its nested fields are then pruned recursively by the same rules);
+   *  - the MERGE contains an UPDATE * or INSERT * action.
+   * Every other field is dropped.
+   */
+  def sourceSchemaForSchemaEvolution(merge: MergeIntoTable): StructType = {
+    // Only MATCHED and NOT MATCHED clauses are inspected.
+    val clauseActions = merge.matchedActions ++ merge.notMatchedActions
+
+    // Explicit (non-*) assignments from every UPDATE and INSERT clause.
+    val explicitAssignments: Seq[Assignment] = clauseActions.flatMap {
+      case update: UpdateAction => update.assignments
+      case insert: InsertAction => insert.assignments
+      case _ => Seq.empty
+    }
+
+    val hasStarAction = clauseActions.exists {
+      case _: UpdateStarAction | _: InsertStarAction => true
+      case _ => false
+    }
+
+    def prune(schema: StructType, parentPath: Seq[String]): StructType = {
+      val keptFields = schema.flatMap { field =>
+        val path = parentPath :+ field.name
+        if (explicitAssignments.exists(isEqual(_, path))) {
+          // Assigned directly to its target counterpart: keep it with all nested fields.
+          Some(field)
+        } else {
+          field.dataType match {
+            // Some assignment targets this struct or one of its nested fields, or a * action
+            // is present: keep the struct and prune its nested fields.
+            case nested: StructType if hasStarAction ||
+                explicitAssignments.exists(a => isPrefix(path, extractFieldPath(a.key))) =>
+              Some(field.copy(dataType = prune(nested, path)))
+            // Non-struct field not assigned by any non-* clause: only a * action keeps it.
+            case _ if hasStarAction => Some(field)
+            // Neither the field nor its nested fields are assigned anywhere: drop it.
+            case _ => None
+          }
+        }
+      }
+      StructType(keptFields)
+    }
+
+    prune(merge.sourceTable.schema, Seq.empty)
+  }
+
+  // Returns the name parts of the column / nested-field reference `expr`, e.g. `a.b.c` for a
+  // nested access. Returns an empty path when `expr` is not a (possibly nested) column reference.
+  private def extractFieldPath(expr: Expression): Seq[String] = expr match {
+    case UnresolvedAttribute(nameParts) => nameParts
+    case a: AttributeReference => Seq(a.name)
+    case g: GetStructField =>
+      val parentPath = extractFieldPath(g.child)
+      // A struct access on something that is not a column reference (e.g. a struct literal)
+      // is not a field path; don't let its last name part masquerade as a top-level column.
+      if (parentPath.isEmpty) {
+        Seq.empty
+      } else {
+        // `extractFieldName` resolves the real field name from the child's schema when the
+        // extractor carries no explicit name; a synthetic `col<ordinal>` would never match.
+        parentPath :+ g.extractFieldName
+      }
+    case _ => Seq.empty
+  }
+
+  // True when `path` starts with `prefix`, comparing each name part with the session's
+  // resolver (so the comparison follows the configured case sensitivity). An empty prefix
+  // matches any path; a prefix longer than `path` never matches.
+  private def isPrefix(prefix: Seq[String], path: Seq[String]): Boolean =
+    prefix.corresponds(path.take(prefix.length))(SQLConf.get.resolver)
+
+  // True when `path` ends with `suffix`, comparing name parts with the session's resolver.
+  // A suffix longer than `path` never matches.
+  private def isSuffix(suffix: Seq[String], path: Seq[String]): Boolean =
+    suffix.length <= path.length && isPrefix(suffix, path.takeRight(suffix.length))
+
+ // Helper method to check if an assignment key is equal to a source column
+ // and if the assignment value is the corresponding source column directly
+ private def isEqual(assignment: Assignment, path: Seq[String]): Boolean = {
+ val assignmenKeyExpr = extractFieldPath(assignment.key)
+ val assignmentValueExpr = extractFieldPath(assignment.value)
+ // Valid assignments are: col = s.col or col.nestedField =
s.col.nestedField
+ assignmenKeyExpr.length == path.length && isPrefix(assignmenKeyExpr, path)
&&
+ isSuffix(path, assignmentValueExpr)
Review Comment:
Is this only to skip the source table qualifier? It seems wrong to trigger
schema evolution for `col = wrong_table.col`, which should fail analysis when
schema evolution is not enabled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]