johanl-db commented on code in PR #53732:
URL: https://github.com/apache/spark/pull/53732#discussion_r2675410540


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1249,27 +1249,50 @@ class Analyzer(
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
 
+        val schemaEvolutionWriteOption: Map[String, String] =
+          if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty
+
         val staticPartitions = i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
         val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query), staticPartitions,
           isByName)
 
         if (!i.overwrite) {
           if (isByName) {
-            AppendData.byName(r, query)
+            AppendData.byName(

Review Comment:
   Having a dedicated flag would be cleaner: `mergeSchema` is fairly overloaded, e.g. it is also a Parquet read option that means inferring a superset schema from the multiple Parquet files being read.
   
   **But**: DataFrame operations have always used `mergeSchema` to enable schema evolution in Delta and Iceberg, so by reusing the `mergeSchema` option we automatically get schema evolution working there.
   If we introduce a new field instead, then until Delta/Iceberg pick it up, `WITH SCHEMA EVOLUTION` will essentially be ignored - not good.
   
   I would use `mergeSchema` for now; we can still introduce a dedicated field later if we want to.
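   For context, this is the existing convention the comment relies on: a minimal sketch of a DataFrame write that opts into schema evolution via `mergeSchema` (the table name, extra column, and an active `spark` session are illustrative assumptions, not from the PR):
   
   ```scala
   import org.apache.spark.sql.functions.lit
   
   // The incoming DataFrame carries a column the target table may not have yet;
   // mergeSchema=true asks the sink (Delta here) to evolve the table's schema
   // instead of failing the write. Names are illustrative.
   spark.range(10)
     .withColumn("extra_col", lit("v"))  // column not yet in the target table
     .write
     .format("delta")
     .option("mergeSchema", "true")      // the option the PR maps WITH SCHEMA EVOLUTION onto
     .mode("append")
     .saveAsTable("target_table")
   ```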



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##########
@@ -174,8 +175,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
 
     case i @ InsertIntoStatement(
-        l @ LogicalRelationWithTable(t: HadoopFsRelation, table), parts, _, query, overwrite, _, _)

Review Comment:
   What happens here if `withSchemaEvolution` = `true`?
   Will the query fail because we can't resolve the `InsertIntoStatement`?
   
   I'm trying to understand whether we throw a proper error message in cases where `WITH SCHEMA EVOLUTION` isn't supported.
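   For illustration, an explicit guard in this rule could surface a clear error instead of an unresolved-plan failure; a rough sketch (the guard and the message are hypothetical, not from the PR, and the exact error API would go through an error class in real code):
   
   ```scala
   // Hypothetical guard (not in the PR): reject WITH SCHEMA EVOLUTION for
   // v1 file-source tables up front rather than leaving the statement unresolved.
   case i @ InsertIntoStatement(
       LogicalRelationWithTable(_: HadoopFsRelation, _), _, _, _, _, _, _)
       if i.withSchemaEvolution =>
     // Schematic error; production code would use a proper error-class helper.
     throw new AnalysisException(
       "WITH SCHEMA EVOLUTION is not supported for this data source")
   ```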



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1325,6 +1325,51 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
     }
   }
 
+  for {
+    insertOp <- Seq("INTO", "OVERWRITE", "REPLACE WHERE")
+    withSchemaEvolution <- Seq(true, false)
+    isByName <- Seq(true, false)
+    dpoMode <- if (insertOp == "OVERWRITE") {
+      PartitionOverwriteMode.values.toSeq
+    } else {
+      Seq(PartitionOverwriteMode.STATIC)
+    }
+  } {

Review Comment:
   It would be more readable if you split this into 3 tests:
   - INSERT INTO
   - INSERT OVERWRITE / REPLACE WHERE (these two could even be split into separate tests)
   - Partition overwrite
   
   I like simpler tests because it's easier to see that they are correct and testing the right thing.
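   A rough sketch of that split (test names and assertion bodies are schematic, not a concrete proposal):
   
   ```scala
   test("INSERT INTO: WITH SCHEMA EVOLUTION sets the mergeSchema write option") {
     for (withSchemaEvolution <- Seq(true, false); isByName <- Seq(true, false)) {
       // build the INSERT INTO plan and assert on AppendData's write options
     }
   }
   
   test("INSERT OVERWRITE / REPLACE WHERE: WITH SCHEMA EVOLUTION sets mergeSchema") {
     // same assertions for the overwrite / replace-where plans
   }
   
   test("INSERT OVERWRITE: partition overwrite modes") {
     for (dpoMode <- PartitionOverwriteMode.values.toSeq) {
       // assert STATIC vs DYNAMIC produce the expected overwrite plan
     }
   }
   ```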



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

