dtenedor commented on code in PR #40996:
URL: https://github.com/apache/spark/pull/40996#discussion_r1183114221


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala:
##########
@@ -218,10 +219,29 @@ object ResolveDefaultColumns {
     } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
       Cast(analyzed, dataType)
     } else {
-      throw new AnalysisException(
-        s"Failed to execute $statementType command because the destination table column " +
-          s"$colName has a DEFAULT value with type $dataType, but the " +
-          s"statement provided a value of incompatible type ${analyzed.dataType}")
+      // If the provided default value is a literal of a wider type than the target column, but the
+      // literal value fits within the narrower type, just coerce it for convenience.
+      val result = if (analyzed.isInstanceOf[Literal] &&
+        !Seq(dataType, analyzed.dataType).exists(_ == BooleanType)) {

Review Comment:
   Good suggestion, done.



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1096,7 +1096,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withTable("t") {
       assert(intercept[AnalysisException] {
         sql("insert into t values(false, default)")
-      }.getMessage.contains(Errors.COLUMN_DEFAULT_NOT_FOUND))
+      }.getMessage.contains(Errors.TARGET_TABLE_NOT_FOUND))

Review Comment:
   Good question, the only test case that checks for this error message is:
   
   ```
       // Explicit default values have a reasonable error path if the table is not found.
       withTable("t") {
         assert(intercept[AnalysisException] {
           sql("insert into t values(false, default)")
         }.getMessage.contains(Errors.TARGET_TABLE_NOT_FOUND))
       }
   ```
   
   My hunch is that the change comes from the newly added `normalizeFieldName` calls. In any case, the reported error message is accurate.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala:
##########
@@ -218,10 +219,29 @@ object ResolveDefaultColumns {
     } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
       Cast(analyzed, dataType)
     } else {
-      throw new AnalysisException(
-        s"Failed to execute $statementType command because the destination table column " +
-          s"$colName has a DEFAULT value with type $dataType, but the " +
-          s"statement provided a value of incompatible type ${analyzed.dataType}")
+      // If the provided default value is a literal of a wider type than the target column, but the
+      // literal value fits within the narrower type, just coerce it for convenience.
+      val result = if (analyzed.isInstanceOf[Literal] &&
+        !Seq(dataType, analyzed.dataType).exists(_ == BooleanType)) {
+        try {
+          val casted = Cast(analyzed, dataType, evalMode = EvalMode.TRY).eval()
+          if (casted != null) {
+            Some(Literal(casted))
+          } else {
+            None

Review Comment:
   Yes, I added it in `ResolveDefaultColumnsSuite`. I added a few more types to 
the test case just now.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala:
##########
@@ -218,10 +219,29 @@ object ResolveDefaultColumns {
     } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
       Cast(analyzed, dataType)
     } else {
-      throw new AnalysisException(
-        s"Failed to execute $statementType command because the destination table column " +
-          s"$colName has a DEFAULT value with type $dataType, but the " +
-          s"statement provided a value of incompatible type ${analyzed.dataType}")
+      // If the provided default value is a literal of a wider type than the target column, but the
+      // literal value fits within the narrower type, just coerce it for convenience.
+      val result = if (analyzed.isInstanceOf[Literal] &&
+        !Seq(dataType, analyzed.dataType).exists(_ == BooleanType)) {
+        try {
+          val casted = Cast(analyzed, dataType, evalMode = EvalMode.TRY).eval()
+          if (casted != null) {
+            Some(Literal(casted))
+          } else {
+            None
+          }
+        } catch {
+          case _: SparkThrowable | _: RuntimeException => None

Review Comment:
   Good Q, I initially did not include this `try...catch` block, but found that `checkInputDataTypes` can still fail even for `TryCast` sometimes, which results in an exception. For example, if the target column type is `timestamp` (not `timestamp_ntz`) and the provided default value is `timestamp'2021-01-01'`, then we get an internal exception because there is no provided timezone (I added a test case for this). So I found I needed to add it here.
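   For intuition, the narrowing policy this guard protects can be sketched without any Spark dependency. `tryNarrowLongToInt` below is a hypothetical stand-alone helper (not the PR's API) that mirrors what a TRY-mode cast does for a wider numeric literal: succeed only when the value round-trips losslessly, and map any failure to `None` so the caller can fall back to the original incompatible-type error:
   
   ```scala
   // Hypothetical sketch of the coercion policy discussed above: accept a wider
   // Long literal for a narrower Int column only when the value survives the
   // round trip; overflow or any runtime failure yields None, signalling that
   // the original incompatible-type error should be raised instead.
   object TryNarrowSketch {
     def tryNarrowLongToInt(value: Long): Option[Int] =
       try {
         val narrowed = value.toInt
         if (narrowed.toLong == value) Some(narrowed) else None // overflow -> None
       } catch {
         case _: RuntimeException => None // defensive, mirroring the catch arm in the diff
       }
   }
   ```
   
   So `tryNarrowLongToInt(42L)` yields `Some(42)` while `tryNarrowLongToInt(Long.MaxValue)` yields `None`, matching the "coerce if it fits, otherwise error" behavior described above.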



##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -91,6 +96,120 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
         checkAnswer(spark.table("demos.test_ts"),
           sql("select null, timestamp'2023-01-01'"))
       }
+      withTable("demos.test_ts") {
+        // If the provided default value is a literal of a wider type than the target column, but
+        // the literal value fits within the narrower type, just coerce it for convenience.
+        sql("create table demos.test_ts (id int default 42L) using parquet")
+        sql("insert into demos.test_ts(id) values (default)")
+        checkAnswer(spark.table("demos.test_ts"),
+          sql("select 42"))
+        // If the provided default value is a literal of a completely different type than the target
+        // column such that no coercion is possible, throw an error.
+        assert(intercept[AnalysisException](
+          sql("create table demos.test_ts_other (id int default 'abc') using parquet"))
+        .getMessage.contains("statement provided a value of incompatible type"))
+      }
+    }
+  }
+
+  test("SPARK-43313: Add missing default values for MERGE INSERT actions") {
+    // Create a new relation type that defines the 'customSchemaForInserts' method.
+    // This implementation drops the last table column as it represents an internal pseudocolumn.
+    case class TableWithCustomInsertSchema(output: Seq[Attribute], numMetadataColumns: Int)

Review Comment:
   Done, and same for the `relationWithCustomInsertSchema` helper method as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

