dtenedor commented on code in PR #36415:
URL: https://github.com/apache/spark/pull/36415#discussion_r866113324


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
     val expanded: Project =
       addMissingDefaultValuesForInsertFromProject(
         project, insertTableSchemaWithoutPartitionColumns)
-    val replaced: LogicalPlan =
+    val replaced: Option[LogicalPlan] =
       replaceExplicitDefaultValuesForInputOfInsertInto(
         analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
-        .getOrElse(return i)
-    regenerated.copy(query = replaced)
+    replaced.map { r =>
+      regenerated.copy(query = r)
+    }.getOrElse(i)
+  }
+
+  /**
+   * Resolves DEFAULT column references for an UPDATE command.
+   */
+  private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+    // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+    // inside an UPDATE command's WHERE clause; this is not allowed.
+    u.condition.map { c: Expression =>
+      if (c.find(isExplicitDefaultColumn).isDefined) {
+        throw QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause()
+      }
+    }
+    val schema: StructType = getSchemaForTargetTable(u.table).getOrElse(return u)

Review Comment:
   SG, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
    * [[insertsFromInlineTable]] method.
    */
   private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+    val insertTableSchemaWithoutPartitionColumns: StructType =
+      getInsertTableSchemaWithoutPartitionColumns(i)
+        .getOrElse(return i)

Review Comment:
   SG, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
     val expanded: Project =
       addMissingDefaultValuesForInsertFromProject(
         project, insertTableSchemaWithoutPartitionColumns)
-    val replaced: LogicalPlan =
+    val replaced: Option[LogicalPlan] =
       replaceExplicitDefaultValuesForInputOfInsertInto(
         analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
-        .getOrElse(return i)
-    regenerated.copy(query = replaced)
+    replaced.map { r =>
+      regenerated.copy(query = r)
+    }.getOrElse(i)
+  }
+
+  /**
+   * Resolves DEFAULT column references for an UPDATE command.
+   */
+  private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+    // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+    // inside an UPDATE command's WHERE clause; this is not allowed.
+    u.condition.map { c: Expression =>

Review Comment:
   Done.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
 
         case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
       }
+
+      parsed5 match {
+        case UpdateTable(
+        AsDataSourceV2Relation(_),
+        Seq(Assignment(name: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT"))),
+        Assignment(age: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
+        None) =>
+          assert(name.name == "name")
+          assert(age.name == "age")
+
+        case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+      }
+
+      parsed6 match {
+        case UpdateTable(
+        AsDataSourceV2Relation(_),
+        Seq(Assignment(i: AttributeReference, AnsiCast(Literal(null, _), IntegerType, _)),

Review Comment:
   Yes: when resolving DEFAULT column references, the analyzer will insert 
literal NULL values if the corresponding table does not define an explicit 
default value for that column. This is intended.
   
   I left a comment here to this effect in case it's confusing to any future 
readers.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
     val expanded: Project =
       addMissingDefaultValuesForInsertFromProject(
         project, insertTableSchemaWithoutPartitionColumns)
-    val replaced: LogicalPlan =
+    val replaced: Option[LogicalPlan] =
       replaceExplicitDefaultValuesForInputOfInsertInto(
         analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
-        .getOrElse(return i)
-    regenerated.copy(query = replaced)
+    replaced.map { r =>
+      regenerated.copy(query = r)
+    }.getOrElse(i)
+  }
+
+  /**
+   * Resolves DEFAULT column references for an UPDATE command.
+   */
+  private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+    // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+    // inside an UPDATE command's WHERE clause; this is not allowed.
+    u.condition.map { c: Expression =>
+      if (c.find(isExplicitDefaultColumn).isDefined) {
+        throw QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause()
+      }
+    }
+    val schema: StructType = getSchemaForTargetTable(u.table).getOrElse(return u)

Review Comment:
   SG, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
    * [[insertsFromInlineTable]] method.
    */
   private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+    val insertTableSchemaWithoutPartitionColumns: StructType =

Review Comment:
   Reverted this part of the change.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
    * [[insertsFromInlineTable]] method.
    */
   private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+    val insertTableSchemaWithoutPartitionColumns: StructType =

Review Comment:
   Reverted this part of the change.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
    * [[insertsFromInlineTable]] method.
    */
   private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+    val insertTableSchemaWithoutPartitionColumns: StructType =
+      getInsertTableSchemaWithoutPartitionColumns(i)
+        .getOrElse(return i)

Review Comment:
   SG, done.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
 
         case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
       }
+
+      parsed5 match {
+        case UpdateTable(
+        AsDataSourceV2Relation(_),
+        Seq(Assignment(name: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT"))),
+        Assignment(age: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
+        None) =>
+          assert(name.name == "name")
+          assert(age.name == "age")
+
+        case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+      }
+
+      parsed6 match {
+        case UpdateTable(
+        AsDataSourceV2Relation(_),

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
     val expanded: Project =
       addMissingDefaultValuesForInsertFromProject(
         project, insertTableSchemaWithoutPartitionColumns)
-    val replaced: LogicalPlan =
+    val replaced: Option[LogicalPlan] =
       replaceExplicitDefaultValuesForInputOfInsertInto(
         analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
-        .getOrElse(return i)
-    regenerated.copy(query = replaced)
+    replaced.map { r =>
+      regenerated.copy(query = r)
+    }.getOrElse(i)
+  }
+
+  /**
+   * Resolves DEFAULT column references for an UPDATE command.
+   */
+  private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+    // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+    // inside an UPDATE command's WHERE clause; this is not allowed.
+    u.condition.map { c: Expression =>

Review Comment:
   Done.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
 
         case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
       }
+
+      parsed5 match {
+        case UpdateTable(
+        AsDataSourceV2Relation(_),

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -98,44 +101,74 @@ case class ResolveDefaultColumns(
       node = node.children(0)
     }
     val table = node.asInstanceOf[UnresolvedInlineTable]
-    val insertTableSchemaWithoutPartitionColumns: StructType =
+    val insertTableSchemaWithoutPartitionColumns: Option[StructType] =
       getInsertTableSchemaWithoutPartitionColumns(i)
-        .getOrElse(return i)
-    val regenerated: InsertIntoStatement =
-      regenerateUserSpecifiedCols(i, insertTableSchemaWithoutPartitionColumns)
-    val expanded: UnresolvedInlineTable =
-      addMissingDefaultValuesForInsertFromInlineTable(
-        table, insertTableSchemaWithoutPartitionColumns)
-    val replaced: LogicalPlan =
-      replaceExplicitDefaultValuesForInputOfInsertInto(
-        analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
-        .getOrElse(return i)
-    node = replaced
-    for (child <- children.reverse) {
-      node = child.withNewChildren(Seq(node))
-    }
-    regenerated.copy(query = node)
+    insertTableSchemaWithoutPartitionColumns.map { schema: StructType =>
+      val regenerated: InsertIntoStatement =
+        regenerateUserSpecifiedCols(i, schema)
+      val expanded: UnresolvedInlineTable =
+        addMissingDefaultValuesForInsertFromInlineTable(table, schema)
+      val replaced: Option[LogicalPlan] =
+        replaceExplicitDefaultValuesForInputOfInsertInto(analyzer, schema, expanded)
+      replaced.map { r: LogicalPlan =>
+        node = r
+        for (child <- children.reverse) {
+          node = child.withNewChildren(Seq(node))
+        }
+        regenerated.copy(query = node)
+      }.getOrElse(i)
+    }.getOrElse(i)
   }
 
   /**
    * Resolves DEFAULT column references for an INSERT INTO command whose query is a general
    * projection.
    */
   private def resolveDefaultColumnsForInsertFromProject(i: InsertIntoStatement): LogicalPlan = {
-    val insertTableSchemaWithoutPartitionColumns: StructType =
+    val insertTableSchemaWithoutPartitionColumns: Option[StructType] =
       getInsertTableSchemaWithoutPartitionColumns(i)
-        .getOrElse(return i)
-    val regenerated: InsertIntoStatement =
-      regenerateUserSpecifiedCols(i, insertTableSchemaWithoutPartitionColumns)
-    val project: Project = i.query.asInstanceOf[Project]
-    val expanded: Project =
-      addMissingDefaultValuesForInsertFromProject(
-        project, insertTableSchemaWithoutPartitionColumns)
-    val replaced: LogicalPlan =
-      replaceExplicitDefaultValuesForInputOfInsertInto(
-        analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
-        .getOrElse(return i)
-    regenerated.copy(query = replaced)
+    insertTableSchemaWithoutPartitionColumns.map { schema =>
+      val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
+      val project: Project = i.query.asInstanceOf[Project]
+      val expanded: Project =
+        addMissingDefaultValuesForInsertFromProject(project, schema)
+      val replaced: Option[LogicalPlan] =
+        replaceExplicitDefaultValuesForInputOfInsertInto(analyzer, schema, expanded)
+      replaced.map { r =>
+        regenerated.copy(query = r)
+      }.getOrElse(i)
+    }.getOrElse(i)
+  }
+
+  /**
+   * Resolves DEFAULT column references for an UPDATE command.
+   */
+  private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+    // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+    // inside an UPDATE command's WHERE clause; this is not allowed.
+    u.condition.foreach { c: Expression =>
+      if (c.find(isExplicitDefaultColumn).isDefined) {
+        throw QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause()

Review Comment:
   SG, done.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
 
         case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
       }
+
+      parsed5 match {
+        case UpdateTable(
+        AsDataSourceV2Relation(_),
+        Seq(Assignment(name: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT"))),
+        Assignment(age: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
+        None) =>
+          assert(name.name == "name")
+          assert(age.name == "age")
+
+        case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+      }
+
+      parsed6 match {
+        case UpdateTable(
+        AsDataSourceV2Relation(_),
+        Seq(Assignment(i: AttributeReference, AnsiCast(Literal(null, _), IntegerType, _)),
+        Assignment(s: AttributeReference, AnsiCast(Literal(null, _), StringType, _))),
+        None) =>
+          assert(i.name == "i")
+          assert(s.name == "s")
+
+        case _ => fail("Expect UpdateTable, but got:\n" + parsed6.treeString)
+      }
+
+      parsed7 match {
+        case UpdateTable(
+        _,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to