MaxGekk commented on code in PR #39937:
URL: https://github.com/apache/spark/pull/39937#discussion_r1160535300


##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -776,38 +808,62 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
       SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
SQLConf.StoreAssignmentPolicy.ANSI.toString) {
       withTable("t") {
         sql("CREATE TABLE t(i int, t timestamp) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': timestamp to int"))
-        assert(msg.contains("Cannot safely cast 't': int to timestamp"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)")
+          },
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colPath" -> "`i`",
+            "from" -> "\"TIMESTAMP\"",
+            "to" -> "\"INT\"")
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(i int, d date) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (date('2010-09-02'), 1)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': date to int"))
-        assert(msg.contains("Cannot safely cast 'd': int to date"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (date('2010-09-02'), 1)")
+          },
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colPath" -> "`i`",
+            "from" -> "\"DATE\"",
+            "to" -> "\"INT\"")
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(b boolean, t timestamp) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), true)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'b': timestamp to boolean"))
-        assert(msg.contains("Cannot safely cast 't': boolean to timestamp"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 
true)")
+          },
+            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+            parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "colPath" -> "`b`",
+        "from" -> "\"TIMESTAMP\"",
+        "to" -> "\"BOOLEAN\"")

Review Comment:
   Please, fix indentation.



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -776,38 +808,62 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
       SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
SQLConf.StoreAssignmentPolicy.ANSI.toString) {
       withTable("t") {
         sql("CREATE TABLE t(i int, t timestamp) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': timestamp to int"))
-        assert(msg.contains("Cannot safely cast 't': int to timestamp"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)")
+          },
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colPath" -> "`i`",
+            "from" -> "\"TIMESTAMP\"",
+            "to" -> "\"INT\"")
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(i int, d date) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (date('2010-09-02'), 1)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': date to int"))
-        assert(msg.contains("Cannot safely cast 'd': int to date"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (date('2010-09-02'), 1)")
+          },
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colPath" -> "`i`",
+            "from" -> "\"DATE\"",
+            "to" -> "\"INT\"")
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(b boolean, t timestamp) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), true)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'b': timestamp to boolean"))
-        assert(msg.contains("Cannot safely cast 't': boolean to timestamp"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 
true)")
+          },
+            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+            parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "colPath" -> "`b`",
+        "from" -> "\"TIMESTAMP\"",
+        "to" -> "\"BOOLEAN\"")
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(b boolean, d date) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (date('2010-09-02'), true)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'b': date to boolean"))
-        assert(msg.contains("Cannot safely cast 'd': boolean to date"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (date('2010-09-02'), true)")
+          },
+            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+            parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "colPath" -> "`b`",
+        "from" -> "\"DATE\"",
+        "to" -> "\"BOOLEAN\"")

Review Comment:
   Fix indentation here, please.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala:
##########
@@ -32,56 +34,94 @@ class StrictDataTypeWriteCompatibilitySuite extends 
DataTypeWriteCompatibilityBa
   override def canCast: (DataType, DataType) => Boolean = Cast.canUpCast
 
   test("Check struct types: unsafe casts are not allowed") {
-    assertNumErrors(widerPoint2, point2, "t",
-      "Should fail because types require unsafe casts", 2) { errs =>
-
-      assert(errs(0).contains("'t.x'"), "Should include the nested field name 
context")
-      assert(errs(0).contains("Cannot safely cast"))
-
-      assert(errs(1).contains("'t.y'"), "Should include the nested field name 
context")
-      assert(errs(1).contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataType.canWrite("", widerPoint2, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colPath" -> "`t`.`x`",
+        "from" -> "\"DOUBLE\"",
+        "to" -> "\"FLOAT\"")
+    )
   }
 
   test("Check array types: unsafe casts are not allowed") {
     val arrayOfLong = ArrayType(LongType)
     val arrayOfInt = ArrayType(IntegerType)
 
-    assertSingleError(arrayOfLong, arrayOfInt, "arr",
-      "Should not allow array of longs to array of ints") { err =>
-      assert(err.contains("'arr.element'"),
-        "Should identify problem with named array's element type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataType.canWrite("", arrayOfLong, arrayOfInt, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, 
errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colPath" -> "`arr`.`element`",
+        "from" -> "\"BIGINT\"",
+        "to" -> "\"INT\"")
+    )
   }
 
   test("Check map value types: casting Long to Integer is not allowed") {
     val mapOfLong = MapType(StringType, LongType)
     val mapOfInt = MapType(StringType, IntegerType)
 
-    assertSingleError(mapOfLong, mapOfInt, "m",
-      "Should not allow map of longs to map of ints") { err =>
-      assert(err.contains("'m.value'"), "Should identify problem with named 
map's value type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataType.canWrite("", mapOfLong, mapOfInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colPath" -> "`m`.`value`",
+        "from" -> "\"BIGINT\"",
+        "to" -> "\"INT\"")
+    )
   }
 
   test("Check map key types: unsafe casts are not allowed") {
     val mapKeyLong = MapType(LongType, StringType)
     val mapKeyInt = MapType(IntegerType, StringType)
 
-    assertSingleError(mapKeyLong, mapKeyInt, "m",
-      "Should not allow map of long keys to map of int keys") { err =>
-      assert(err.contains("'m.key'"), "Should identify problem with named 
map's key type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataType.canWrite("", mapKeyLong, mapKeyInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colPath" -> "`m`.`key`",
+        "from" -> "\"BIGINT\"",
+        "to" -> "\"INT\"")
+    )
   }
 
   test("Check NullType is incompatible with all other types") {
     allNonNullTypes.foreach { t =>
-      assertSingleError(NullType, t, "nulls", s"Should not allow writing None 
to type $t") { err =>
-        assert(err.contains(s"incompatible with ${t.catalogString}"))
-      }
+      val errs = new mutable.ArrayBuffer[String]()
+      checkError(
+        exception = intercept[AnalysisException] (
+          DataType.canWrite("", NullType, t, true,
+            analysis.caseSensitiveResolution, "nulls", storeAssignmentPolicy,
+            errMsg => errs += errMsg)
+        ),
+        errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+        parameters = Map(
+          "tableName" -> "``",
+          "colPath" -> "`nulls`",
+          "from" -> "\"VOID\"",
+          "to" -> toSQLType(t.catalogString))

Review Comment:
   ```suggestion
             "to" -> toSQLType(t))
   ```



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala:
##########
@@ -32,56 +34,94 @@ class StrictDataTypeWriteCompatibilitySuite extends 
DataTypeWriteCompatibilityBa
   override def canCast: (DataType, DataType) => Boolean = Cast.canUpCast
 
   test("Check struct types: unsafe casts are not allowed") {
-    assertNumErrors(widerPoint2, point2, "t",
-      "Should fail because types require unsafe casts", 2) { errs =>
-
-      assert(errs(0).contains("'t.x'"), "Should include the nested field name 
context")
-      assert(errs(0).contains("Cannot safely cast"))
-
-      assert(errs(1).contains("'t.y'"), "Should include the nested field name 
context")
-      assert(errs(1).contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataType.canWrite("", widerPoint2, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",

Review Comment:
   Why is it empty? Can't you set it properly?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -137,35 +140,37 @@ object TableOutputResolver {
 
     if (inputCols.size > expectedCols.size) {
       val extraColsStr = inputCols.takeRight(inputCols.size - 
expectedCols.size)
-        .map(col => s"'${col.name}'")
+        .map(col => s"${toSQLId(col.name)}")
         .mkString(", ")
-      addError(s"Cannot write extra fields to struct '${colPath.quoted}': 
$extraColsStr")
-      return Nil
+      throw 
QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+        tableName, colPath.quoted, extraColsStr
+      )
     } else if (inputCols.size < expectedCols.size) {
       val missingColsStr = expectedCols.takeRight(expectedCols.size - 
inputCols.size)
-        .map(col => s"'${col.name}'")
+        .map(col => s"${toSQLId(col.name)}")

Review Comment:
   ```suggestion
           .map(col => toSQLId(col.name))
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -137,35 +140,37 @@ object TableOutputResolver {
 
     if (inputCols.size > expectedCols.size) {
       val extraColsStr = inputCols.takeRight(inputCols.size - 
expectedCols.size)
-        .map(col => s"'${col.name}'")
+        .map(col => s"${toSQLId(col.name)}")

Review Comment:
   ```suggestion
           .map(col => toSQLId(col.name))
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:
##########
@@ -485,6 +485,7 @@ object DataType {
    * @return true if data written with the write type can be read using the 
read type
    */
   def canWrite(
+      tableName: String,

Review Comment:
   No



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -83,11 +82,13 @@ object TableOutputResolver {
       val matched = inputCols.filter(col => conf.resolver(col.name, 
expectedCol.name))
       val newColPath = colPath :+ expectedCol.name
       if (matched.isEmpty) {
-        addError(s"Cannot find data for output column '${newColPath.quoted}'")
-        None
+        throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(

Review Comment:
   Let's remove `addError`, and raise the first exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to