szehon-ho commented on code in PR #54704:
URL: https://github.com/apache/spark/pull/54704#discussion_r2913861154


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala:
##########
@@ -40,46 +43,58 @@ object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     // This rule should run only if all assignments are resolved, except those
     // that will be satisfied by schema evolution
-    case m@MergeIntoTable(_, _, _, _, _, _, _) if m.evaluateSchemaEvolution =>
-      val changes = m.changesForSchemaEvolution
-      if (changes.isEmpty) {
-        m
-      } else {
-        val finalAttrMapping = ArrayBuffer.empty[(Attribute, Attribute)]
-        val newTarget = m.targetTable.transform {
-          case r: DataSourceV2Relation =>
-            val referencedSourceSchema = MergeIntoTable.sourceSchemaForSchemaEvolution(m)
-            val newTarget = performSchemaEvolution(r, referencedSourceSchema, changes)
-            val oldTargetOutput = m.targetTable.output
-            val newTargetOutput = newTarget.output
-            val attributeMapping = oldTargetOutput.zip(newTargetOutput)
-            finalAttrMapping ++= attributeMapping
-            newTarget
-        }
-        val res = m.copy(targetTable = newTarget)
-        res.rewriteAttrs(AttributeMap(finalAttrMapping.toSeq))
+    case m: MergeIntoTable if m.pendingSchemaChanges.nonEmpty =>
+      EliminateSubqueryAliases(m.targetTable) match {
+        case ExtractV2CatalogAndIdentifier(catalog, ident) =>
+          val newTable = evolveSchema(m, catalog, ident)
+          val mergeWithNewTarget = replaceMergeTarget(m, newTable)
+
+          val remainingChanges = mergeWithNewTarget.pendingSchemaChanges
+          if (remainingChanges.nonEmpty) {
+            throw QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError(
+              ident, catalog.name(), remainingChanges.toArray)
+          }
+
+          mergeWithNewTarget
+        case _ =>
+          m
       }
   }
 
-  private def performSchemaEvolution(
-      relation: DataSourceV2Relation,
-      referencedSourceSchema: StructType,
-      changes: Array[TableChange]): DataSourceV2Relation = {
-    (relation.catalog, relation.identifier) match {
-      case (Some(c: TableCatalog), Some(i)) =>
-        c.alterTable(i, changes: _*)
-        val newTable = c.loadTable(i)
-        val newSchema = CatalogV2Util.v2ColumnsToStructType(newTable.columns())
-        // Check if there are any remaining changes not applied.
-        val remainingChanges = MergeIntoTable.schemaChanges(newSchema, referencedSourceSchema)
-        if (remainingChanges.nonEmpty) {
-          throw QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError(
-            remainingChanges, i.toQualifiedNameParts(c))
-        }
-        relation.copy(table = newTable, output = DataTypeUtils.toAttributes(newSchema))
-      case _ => logWarning(s"Schema Evolution enabled but data source $relation " +
-        s"does not support it, skipping.")
-        relation
+  private def evolveSchema(

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to