cloud-fan commented on code in PR #55427:
URL: https://github.com/apache/spark/pull/55427#discussion_r3247636588
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -367,7 +369,7 @@ object TableOutputResolver extends SQLConfHelper with Logging {
reordered
}
} else {
- Nil
+ cannotFindOutputData(tableName, colPath, expectedCols.map(_.name))
Review Comment:
The latest commit (`2c56adef`, "Use enforceFullOutput for MERGE
resolveUpdate as well") removed the `enforceFullOutput` parameter and made this
throw unconditional, along with the matching throws in `resolveStructType`
(line 504), `resolveArrayType` (line 545), and `resolveMapType` (line 608). The
previous commit `0be4cc8d951` passed `enforceFullOutput=true` for INSERT (via
`resolveOutputColumns`) and `false` for MERGE (via `resolveUpdate`). With
`false`, MERGE could return `Nil` here, the recursive `resolveStructType` could
return `None`, `resolveUpdate`'s `.getOrElse(value)` could fall back, and
`AssignmentUtils.alignUpdateAssignments` would then throw
`DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS` with the specific
`addError` message (e.g. "Cannot safely cast string to int").
Now the throw fires for both callers and the `addError`-collected
cast-failure detail is replaced by a generic
`INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA` for MERGE/UPDATE. Concrete tests
that assert the prior behavior:
- `AlignMergeAssignmentsSuite` lines 738-744 (ANSI mode, `UPDATE SET s.n_s =
named_struct('dn_i', 'string-value', 'dn_l', 1L)` on target `struct<dn_i:int,
dn_l:bigint>`) — `assertAnalysisException(…, "Cannot safely cast")`. Same
pattern at lines 920-925 (strict mode).
- `AlignUpdateAssignmentsSuite` lines 546-548 and 599-601, same pattern.
Repro path: `applyAssignments` → `resolveUpdate` (struct value, struct col)
→ `resolveStructType` → `reorderColumnsByName` with `NONE` mode. For field
`dn_i`: matched but types `(string, int)` ⇒ `checkField`'s `canWrite` fails,
`addError("Cannot safely cast string to int")`, returns `None`. The flatMap
drops it, `reordered.length=1 != expectedCols.length=2`, and HEAD's `else`
branch on line 372 throws — bypassing
`AssignmentUtils.alignUpdateAssignments`'s `errors.nonEmpty` check.
`assertAnalysisException` does substring matching, so it fails on "Cannot find
data" vs the expected "Cannot safely cast".
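To make the repro path easier to follow, here is a minimal standalone model of the control flow. It is a sketch only: `Field`, `CannotFindData`, `ReorderModel` and `ReorderDemo` are hypothetical stand-ins, type compatibility is reduced to string equality, and none of this is Spark's actual code or signatures.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration; not Spark's classes or signatures.
final case class Field(name: String, dataType: String)
final class CannotFindData(msg: String) extends RuntimeException(msg)

object ReorderModel {
  // Models checkField: on an unsafe write, record the detail and return None.
  private def checkField(
      expected: Field,
      actual: Field,
      addError: String => Unit): Option[Field] = {
    if (expected.dataType == actual.dataType) {
      Some(expected)
    } else {
      addError(s"Cannot safely cast ${actual.dataType} to ${expected.dataType}")
      None
    }
  }

  // Models the tail of reorderColumnsByName. enforceFullOutput = false is the
  // old MERGE behavior (return Nil); true is the unconditional throw.
  def reorder(
      expected: Seq[Field],
      input: Seq[Field],
      addError: String => Unit,
      enforceFullOutput: Boolean): Seq[Field] = {
    val reordered = expected.flatMap { e =>
      input.find(_.name == e.name).flatMap(a => checkField(e, a, addError))
    }
    if (reordered.length == expected.length) {
      reordered
    } else if (enforceFullOutput) {
      val missing = expected.map(_.name).diff(reordered.map(_.name)).mkString(", ")
      throw new CannotFindData(s"Cannot find data for $missing")
    } else {
      Nil
    }
  }
}

object ReorderDemo {
  private val target = Seq(Field("dn_i", "int"), Field("dn_l", "bigint"))
  private val source = Seq(Field("dn_i", "string"), Field("dn_l", "bigint"))

  // Models the caller: collected errors are reported only if reorder did not throw.
  def messageSeen(enforceFullOutput: Boolean): String = {
    val errors = ArrayBuffer.empty[String]
    try {
      val out = ReorderModel.reorder(
        target, source, msg => { errors += msg; () }, enforceFullOutput)
      if (errors.nonEmpty) errors.mkString("; ") else s"ok: ${out.length} fields"
    } catch {
      case e: CannotFindData => e.getMessage
    }
  }
}
```

In this model, `ReorderDemo.messageSeen(enforceFullOutput = false)` surfaces the collected cast error, while `enforceFullOutput = true` throws before the caller can inspect `errors`, which is the behavior change described above.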
The commit message says MergeInto schema evolution suites and
`MergeIntoDataFrameSuite` nested struct tests pass; those don't exercise the
type-mismatch path that goes through `reorderColumnsByName`'s `addError`
accumulation.
Side effect: `resolveStructType` / `resolveArrayType` / `resolveMapType`
still declare `Option[NamedExpression]` but `None` is now unreachable.
`resolveUpdate`'s `.getOrElse(value)` is dead code and no longer protects MERGE
error reporting.
Two reasonable resolutions:
- **Restore** `enforceFullOutput` as in `0be4cc8d951` — surgical, MERGE
behavior preserved.
- **Commit to** the new MERGE behavior — but then it belongs in the PR
description (this is scope beyond "INSERT WITH SCHEMA EVOLUTION"), the
AlignMerge/AlignUpdate assertions need to be updated, and ideally the thrown
`Cannot find data` error should still carry the `addError` cast-failure detail
so MERGE users don't lose actionable error information.
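For the second option, one possible shape for an error that keeps the cast-failure detail is sketched below. This is an assumption for discussion, not a proposal for the exact API: `CannotFindDataWithDetail` and its `render` helper are hypothetical names, and the message text is illustrative rather than Spark's real error template.

```scala
// Hypothetical error type for illustration; not part of Spark's API.
final class CannotFindDataWithDetail(val colName: String, val details: Seq[String])
  extends RuntimeException(CannotFindDataWithDetail.render(colName, details))

object CannotFindDataWithDetail {
  // Appends any messages collected through addError to the generic text.
  def render(colName: String, details: Seq[String]): String = {
    val base = s"Cannot find data for $colName"
    if (details.isEmpty) base else base + " (" + details.mkString("; ") + ")"
  }
}
```

With something along these lines, a substring assertion on "Cannot safely cast" could keep passing for MERGE/UPDATE even if the error condition itself changes.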
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -1298,4 +1318,712 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests =>
assert(spark.table(t1).schema("id").dataType === IntegerType)
}
}
+
+ // ---------------------------------------------------------------------------
+ // Tests for source with fewer columns/fields than target
+ // ---------------------------------------------------------------------------
+
+ test("Insert schema evolution: source missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val schema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType)))
+ val data = Seq(Row(0, 100, "sales"))
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), schema))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // By position: source col 1 maps to target col 1, source col 2 maps to target col 2,
+ // trailing target col 3 is filled with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 'unknown') USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in array by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in array by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing deeply nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: mixed null and non-null structs with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null)))
+ }
+ }
+
+ test("Insert schema evolution: null deeply nested struct with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+ }
+ }
+
+ test("Insert schema evolution: null struct in array with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in map value by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(
+ Row(0, Map("x" -> Row(1, true))),
+ Row(1, Map("y" -> Row(10, null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in map value by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(
+ Row(0, Map("x" -> Row(1, true))),
+ Row(1, Map("y" -> Row(10, null)))))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // Source has "active" (extra) but is missing "salary". Column count is the same (3)
+ // but names differ; by-name resolution should add "active" via schema evolution
+ // and fill "salary" with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+ byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, salary, dep, active FROM $t1"),
+ Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same
+ // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill
+ // "c3" with null.
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c4", DoubleType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+ Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Negative tests: missing columns/fields should fail WITHOUT schema evolution
+ // ---------------------------------------------------------------------------
+
+ test("Insert without evolution: source missing top-level column by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // Without explicit DEFAULT on `salary`, missing by-name data only errors when null-fill
+ // for missing defaults is disabled; otherwise FILL mode inserts null for `salary`.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep"))
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`salary`")
+ )
+ }
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column by position fails " +
+ "when null default disabled and column has no explicit DEFAULT") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ withInsertNestedTypeCoercion {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`dep`")
+ )
+ }
+ }
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by position fails " +
+ "when null default disabled") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ withInsertNestedTypeCoercion {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`.`c3`")
+ )
+ }
+ }
+ }
+ }
+
+ test("Insert without evolution: source missing top-level column by position fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsert(t1, Seq((1, 200)).toDF("id", "salary"))
+ },
+ condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "tableColumns" -> "`id`, `salary`, `dep`",
+ "dataColumns" -> "`id`, `salary`")
+ )
+ }
+ }
+
+ test("Insert without evolution: source missing nested struct field by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ val ex = intercept[AnalysisException] {
+ doInsertByName(t1, sourceData)
+ }
+ assert(ex.getMessage.contains("Cannot find data"))
Review Comment:
The other negative tests added in this patch (e.g. lines 1879-1892,
1905-1918) use `checkError(condition =
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map(...))`, which
pins `tableName` and `colName` and would catch a future change that drops the
column path. Suggest aligning here (and at lines 1989-1992 in the next test):
```suggestion
checkError(
exception = intercept[AnalysisException] {
doInsertByName(t1, sourceData)
},
condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
parameters = Map(
"tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
"colName" -> "`s`.`c3`")
)
```
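To illustrate why the structured check is stronger, here is a small standalone model. It is only a sketch: `ModelError` and `AssertionStrength` are hypothetical, the message template and the table identifier are made up for the example, and this is not Spark's `AnalysisException` or `checkError`.

```scala
// Hypothetical model of a structured error; not Spark's AnalysisException.
final case class ModelError(condition: String, parameters: Map[String, String]) {
  // Illustrative message template, assumed for this sketch.
  def message: String = {
    val table = parameters.getOrElse("tableName", "?")
    val col = parameters.getOrElse("colName", "?")
    s"Cannot write to $table: Cannot find data for the output column $col."
  }
}

object AssertionStrength {
  val condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"
  val expected: ModelError = ModelError(
    condition,
    Map("tableName" -> "`cat`.`ns`.`tbl`", "colName" -> "`s`.`c3`"))

  // A regression that reports only the top-level column and drops the nested path.
  val regressed: ModelError =
    expected.copy(parameters = expected.parameters.updated("colName", "`s`"))

  // Style of the current assertion: substring match on the rendered message.
  def substringCheck(e: ModelError): Boolean = e.message.contains("Cannot find data")

  // Style of the suggested assertion: pin the condition and every parameter.
  def structuredCheck(e: ModelError): Boolean =
    e.condition == condition && e.parameters == expected.parameters
}
```

In this model the substring check still accepts `regressed`, whereas the structured check rejects it, which is the kind of regression the suggestion is meant to catch.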
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]