szehon-ho commented on code in PR #56242:
URL: https://github.com/apache/spark/pull/56242#discussion_r3344433082
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala:
##########
@@ -1303,4 +1303,218 @@ trait MergeIntoSchemaEvolutionBasicTests extends
MergeIntoSchemaEvolutionSuiteBa
))
)
}
+
+ for (subFieldName <- Seq("job.title", "job title")) {
+ testNestedStructsEvolution(
+ s"SPARK-57197: source struct has extra nested field with special-char
name: $subFieldName")(
+ target = Seq(
+ """{ "pk": 1, "info": { "name": "Alice" }, "dep": "hr" }""",
+ """{ "pk": 2, "info": { "name": "Bob" }, "dep": "finance" }"""
+ ),
+ source = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer"
}, "dep": "hr" }""",
+ s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager"
}, "dep": "sales" }"""
+ ),
+ targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ sourceSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, StringType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ clauses = Seq(updateAll(), insertAll()),
+ result = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer"
}, "dep": "hr" }""",
+ s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": null },
"dep": "finance" }""",
+ s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager"
}, "dep": "sales" }"""
+ ),
+ resultSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, StringType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ expectErrorWithoutEvolutionContains = "Cannot write extra fields"
+ )
+ }
+
+ for (colName <- Seq("job.title", "job title")) {
+ testEvolution(
+ s"SPARK-57197: special-char column already in target gets updated with
type widening:" +
+ s" $colName")(
+ targetData = {
+ val schema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType),
+ StructField(colName, ShortType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+ Row(1, 100, "hr", 1.toShort),
+ Row(2, 200, "software", 2.toShort),
+ Row(3, 300, "hr", 3.toShort)
+ )), schema)
+ },
+ sourceData = {
+ val schema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType),
+ StructField(colName, IntegerType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+ Row(2, 150, "finance", 50000),
+ Row(4, 400, "finance", 60000)
+ )), schema)
+ },
+ clauses = Seq(updateAll(), insertAll()),
+ expected = Seq(
+ (1, 100, "hr", 1),
+ (2, 150, "finance", 50000),
+ (3, 300, "hr", 3),
+ (4, 400, "finance", 60000)
+ ).toDF("pk", "salary", "dep", colName),
+ expectedSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType),
+ StructField(colName, IntegerType)
+ )),
+ expectErrorWithoutEvolutionContains =
+ "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type
column or variable"
+ )
+ }
+
+ for (subFieldName <- Seq("job.title", "job title")) {
+ testNestedStructsEvolution(
+ s"SPARK-57197: nested special-char field already in target gets updated
with type widening:" +
+ s" $subFieldName")(
+ target = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep":
"hr" }""",
+ s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": 2 }, "dep":
"software" }""",
+ s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 },
"dep": "hr" }"""
+ ),
+ source = Seq(
+ s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 },
"dep": "finance" }""",
+ s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 },
"dep": "finance" }"""
+ ),
+ targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, ShortType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ sourceSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, IntegerType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ clauses = Seq(updateAll(), insertAll()),
+ result = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep":
"hr" }""",
+ s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 },
"dep": "finance" }""",
+ s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 },
"dep": "hr" }""",
+ s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 },
"dep": "finance" }"""
+ ),
+ resultSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, IntegerType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ expectErrorWithoutEvolutionContains =
+ "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type
column or variable"
+ )
+ }
+
+ for (colName <- Seq("job.title", "job title")) {
+ testEvolution(
+ s"SPARK-57197: target has special-char column missing from source:
$colName")(
Review Comment:
nit: can we remove the SPARK JIRA ID from these test names? It's not very helpful.
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala:
##########
@@ -1303,4 +1303,218 @@ trait MergeIntoSchemaEvolutionBasicTests extends
MergeIntoSchemaEvolutionSuiteBa
))
)
}
+
+ for (subFieldName <- Seq("job.title", "job title")) {
+ testNestedStructsEvolution(
+ s"SPARK-57197: source struct has extra nested field with special-char
name: $subFieldName")(
+ target = Seq(
+ """{ "pk": 1, "info": { "name": "Alice" }, "dep": "hr" }""",
+ """{ "pk": 2, "info": { "name": "Bob" }, "dep": "finance" }"""
+ ),
+ source = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer"
}, "dep": "hr" }""",
+ s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager"
}, "dep": "sales" }"""
+ ),
+ targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ sourceSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, StringType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ clauses = Seq(updateAll(), insertAll()),
+ result = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer"
}, "dep": "hr" }""",
+ s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": null },
"dep": "finance" }""",
+ s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager"
}, "dep": "sales" }"""
+ ),
+ resultSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, StringType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ expectErrorWithoutEvolutionContains = "Cannot write extra fields"
+ )
+ }
+
+ for (colName <- Seq("job.title", "job title")) {
+ testEvolution(
+ s"SPARK-57197: special-char column already in target gets updated with
type widening:" +
+ s" $colName")(
+ targetData = {
+ val schema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType),
+ StructField(colName, ShortType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+ Row(1, 100, "hr", 1.toShort),
+ Row(2, 200, "software", 2.toShort),
+ Row(3, 300, "hr", 3.toShort)
+ )), schema)
+ },
+ sourceData = {
+ val schema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType),
+ StructField(colName, IntegerType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+ Row(2, 150, "finance", 50000),
+ Row(4, 400, "finance", 60000)
+ )), schema)
+ },
+ clauses = Seq(updateAll(), insertAll()),
+ expected = Seq(
+ (1, 100, "hr", 1),
+ (2, 150, "finance", 50000),
+ (3, 300, "hr", 3),
+ (4, 400, "finance", 60000)
+ ).toDF("pk", "salary", "dep", colName),
+ expectedSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType),
+ StructField(colName, IntegerType)
+ )),
+ expectErrorWithoutEvolutionContains =
+ "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type
column or variable"
+ )
+ }
+
+ for (subFieldName <- Seq("job.title", "job title")) {
+ testNestedStructsEvolution(
+ s"SPARK-57197: nested special-char field already in target gets updated
with type widening:" +
+ s" $subFieldName")(
+ target = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep":
"hr" }""",
+ s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": 2 }, "dep":
"software" }""",
+ s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 },
"dep": "hr" }"""
+ ),
+ source = Seq(
+ s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 },
"dep": "finance" }""",
+ s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 },
"dep": "finance" }"""
+ ),
+ targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, ShortType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ sourceSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, IntegerType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ clauses = Seq(updateAll(), insertAll()),
+ result = Seq(
+ s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep":
"hr" }""",
+ s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 },
"dep": "finance" }""",
+ s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 },
"dep": "hr" }""",
+ s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 },
"dep": "finance" }"""
+ ),
+ resultSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("name", StringType),
+ StructField(subFieldName, IntegerType)
+ ))),
+ StructField("dep", StringType)
+ )),
+ expectErrorWithoutEvolutionContains =
+ "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type
column or variable"
+ )
+ }
+
+ for (colName <- Seq("job.title", "job title")) {
+ testEvolution(
+ s"SPARK-57197: target has special-char column missing from source:
$colName")(
+ targetData = Seq(
+ (1, 100, "hr", "engineer"),
+ (2, 200, "finance", "manager"),
+ (3, 300, "hr", "analyst")
+ ).toDF("pk", "salary", "dep", colName),
+ sourceData = Seq(
+ (2, 150, "sales"),
+ (4, 400, "engineering")
+ ).toDF("pk", "salary", "dep"),
+ clauses = Seq(updateAll(), insertAll()),
+ expected = Seq[(Int, Int, String, String)](
+ (1, 100, "hr", "engineer"),
+ (2, 150, "sales", "manager"),
+ (3, 300, "hr", "analyst"),
+ (4, 400, "engineering", null)
+ ).toDF("pk", "salary", "dep", colName),
+ expectedSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("salary", IntegerType, nullable = false),
+ StructField("dep", StringType),
+ StructField(colName, StringType)
+ )),
+ expectErrorWithoutEvolutionContains = "cannot be resolved"
+ )
+ }
+
+ for (subFieldName <- Seq("job.title", "job title")) {
+ testNestedStructsEvolution(
+ s"SPARK-57197: source struct missing nested field with special-char
name: $subFieldName")(
Review Comment:
can we harmonize the description with the previous test's (i.e., 'target has ...
missing from source')?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]