szehon-ho commented on code in PR #52866:
URL: https://github.com/apache/spark/pull/52866#discussion_r2501115001
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala:
##########
@@ -3510,6 +3510,527 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
}
+ test("Merge schema evolution should not evolve if not directly referencing
new column: update") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceDF = Seq((2, 150, "dummy", "blah"),
+ (3, 250, "dummy", "blah")).toDF("pk", "salary", "dep", "extra")
+ sourceDF.createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET dep='software'
+ |""".stripMargin
+
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"),
+ Row(2, 200, "software")))
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+
+ test("Merge schema evolution should not evolve if not directly referencing
new column: insert") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceDF = Seq((2, 150, "dummy", "blah"),
+ (3, 250, "dummy", "blah")).toDF("pk", "salary", "dep", "extra")
+ sourceDF.createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, s.salary, 'newdep')
+ |""".stripMargin
+
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"),
+ Row(2, 200, "software"),
+ Row(3, 250, "newdep")))
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+
+ test("Merge schema evolution should not evolve if not directly referencing
new column:" +
+ "update and insert") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceDF = Seq((2, 150, "dummy", "blah"),
+ (3, 250, "dummy", "blah")).toDF("pk", "salary", "dep", "extra")
+ sourceDF.createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET dep='software'
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, s.salary, 'newdep')
+ |""".stripMargin
+
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"),
+ Row(2, 200, "software"),
+ Row(3, 250, "newdep")))
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+
+ test("Merge schema evolution should only evolve referenced column when
source " +
+ "has multiple new columns") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceDF = Seq((2, 150, "dummy", 50, "blah"),
+ (3, 250, "dummy", 75, "blah")).toDF("pk", "salary", "dep", "bonus",
"extra")
+ sourceDF.createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET salary = s.salary, bonus = s.bonus
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, salary, dep, bonus) VALUES (s.pk, s.salary,
'newdep', s.bonus)
+ |""".stripMargin
+
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr", null),
+ Row(2, 150, "software", 50),
+ Row(3, 250, "newdep", 75)))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
"UNRESOLVED_COLUMN.WITH_SUGGESTION")
+ }
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+
+ test("Merge schema evolution should only evolve referenced struct field when
source " +
+ "has multiple new struct fields") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |info STRUCT<salary: INT, status: STRING>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 1, "info": { "salary": 100, "status": "active" }, "dep":
"hr" }
+ |{ "pk": 2, "info": { "salary": 200, "status": "inactive" },
"dep": "software" }
+ |""".stripMargin)
+
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("salary", IntegerType),
+ StructField("status", StringType),
+ StructField("bonus", IntegerType), // new field 1
+ StructField("extra", StringType) // new field 2
+ ))),
+ StructField("dep", StringType)
+ ))
+ val data = Seq(
+ Row(2, Row(150, "dummy", 50, "blah"), "active"),
+ Row(3, Row(250, "dummy", 75, "blah"), "active")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET info.bonus = s.info.bonus
+ |""".stripMargin
+
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ // Only 'bonus' field should be added, not 'extra'
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(100, "active", null), "hr"),
+ Row(2, Row(200, "inactive", 50), "software")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get == "FIELD_NOT_FOUND")
+ }
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+
+ test("Merge schema evolution should not evolve when assigning existing
target column " +
+ "from source column that does not exist in target") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceDF = Seq((2, 150, "dummy", 50),
+ (3, 250, "dummy", 75)).toDF("pk", "salary", "dep", "bonus")
+ sourceDF.createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET salary = s.bonus
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, s.bonus, 'newdep')
+ |""".stripMargin
+
+ sql(mergeStmt)
+ // bonus column should NOT be added to target schema
+ // Only salary is updated with bonus value
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"),
+ Row(2, 50, "software"),
+ Row(3, 75, "newdep")))
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+
+ test("Merge schema evolution should evolve struct if directly referencing
new field " +
+ "in top level struct: insert") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |info STRUCT<salary: INT, status: STRING>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 1, "info": { "salary": 100, "status": "active" }, "dep":
"hr" }
+ |{ "pk": 2, "info": { "salary": 200, "status": "inactive" },
"dep": "software" }
+ |""".stripMargin)
+
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("salary", IntegerType),
+ StructField("status", StringType),
+ StructField("bonus", IntegerType) // new field not in target
+ ))),
+ StructField("dep", StringType)
+ ))
+ val data = Seq(
+ Row(2, Row(150, "dummy", 50), "active"),
+ Row(3, Row(250, "dummy", 75), "active")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, info, dep) VALUES (s.pk,
+ | named_struct('salary', s.info.salary, 'status', 'active'),
'marketing')
Review Comment:
Discussed offline: refine the logic to be more selective (direct assignment to
a source column).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]