johanl-db commented on code in PR #53732:
URL: https://github.com/apache/spark/pull/53732#discussion_r2675410540
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1249,27 +1249,50 @@ class Analyzer(
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
+        val schemaEvolutionWriteOption: Map[String, String] =
+          if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty
+
         val staticPartitions = i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
         val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query), staticPartitions, isByName)
         if (!i.overwrite) {
           if (isByName) {
-            AppendData.byName(r, query)
+            AppendData.byName(
Review Comment:
Having a dedicated flag would be cleaner: `mergeSchema` is fairly
overloaded, e.g. it is also a Parquet read option, where it means inferring a
superset schema from the multiple Parquet files being read.
**But**: DataFrame operations have always used `mergeSchema` to enable
schema evolution in Delta and Iceberg. By reusing the `mergeSchema` option, we
automatically get schema evolution working there.
If we introduce a new field, then until Delta/Iceberg pick it up, `WITH
SCHEMA EVOLUTION` will essentially be ignored - not good.
I would use `mergeSchema` for now; we can still introduce a dedicated field
later if we want to.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##########
@@ -174,8 +175,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
     case i @ InsertIntoStatement(
-        l @ LogicalRelationWithTable(t: HadoopFsRelation, table), parts, _, query, overwrite, _, _)
Review Comment:
What happens here if `withSchemaEvolution` = `true`?
Will the query fail because we can't resolve the `InsertIntoStatement`?
I'm trying to understand whether we throw a proper error message in cases
where `WITH SCHEMA EVOLUTION` isn't supported.
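To make the question concrete, a standalone sketch (simplified types, not
Spark internals) of the kind of fail-fast check being asked about: reject the
statement with a clear error instead of leaving the plan unresolved.
```scala
// Hypothetical, simplified sketch; the real check would pattern-match on
// InsertIntoStatement over a HadoopFsRelation inside the analyzer rule.
sealed trait Plan
case class InsertStatement(
    table: String,
    withSchemaEvolution: Boolean,
    fileBased: Boolean) extends Plan

def checkSchemaEvolutionSupport(plan: Plan): Unit = plan match {
  case i: InsertStatement if i.withSchemaEvolution && i.fileBased =>
    // Fail fast with an actionable message rather than an unresolved-plan error.
    throw new UnsupportedOperationException(
      s"INSERT ... WITH SCHEMA EVOLUTION is not supported for file-based table ${i.table}")
  case _ => () // supported: nothing to check
}
```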
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1325,6 +1325,51 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
     }
   }
+  for {
+    insertOp <- Seq("INTO", "OVERWRITE", "REPLACE WHERE")
+    withSchemaEvolution <- Seq(true, false)
+    isByName <- Seq(true, false)
+    dpoMode <- if (insertOp == "OVERWRITE") {
+      PartitionOverwriteMode.values.toSeq
+    } else {
+      Seq(PartitionOverwriteMode.STATIC)
+    }
+  } {
Review Comment:
It would be more readable if you split this into 3 tests:
- INSERT INTO
- INSERT OVERWRITE / REPLACE WHERE (these could even be two separate tests)
- Partition overwrite

I like simpler tests because it's easier to see that they are correct and
testing the right thing.
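A sketch of the suggested split (ScalaTest shape only; the class name is
hypothetical and the bodies are elided to comments, since the real assertions
would reuse this suite's resolve-and-check helpers):
```scala
import org.scalatest.funsuite.AnyFunSuite

class InsertSchemaEvolutionSuite extends AnyFunSuite {

  test("INSERT INTO with and without WITH SCHEMA EVOLUTION") {
    for (withSchemaEvolution <- Seq(true, false); isByName <- Seq(true, false)) {
      // resolve the INSERT INTO statement and assert mergeSchema is set
      // if and only if withSchemaEvolution is true
    }
  }

  test("INSERT OVERWRITE and REPLACE WHERE with and without WITH SCHEMA EVOLUTION") {
    for (insertOp <- Seq("OVERWRITE", "REPLACE WHERE");
         withSchemaEvolution <- Seq(true, false)) {
      // resolve the statement and assert the expected write options
    }
  }

  test("INSERT OVERWRITE across partition overwrite modes") {
    for (dpoMode <- Seq("STATIC", "DYNAMIC")) {
      // set spark.sql.sources.partitionOverwriteMode to dpoMode and assert
      // the shape of the resolved overwrite plan
    }
  }
}
```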
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]