dtenedor commented on code in PR #36415:
URL: https://github.com/apache/spark/pull/36415#discussion_r866113324
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
val expanded: Project =
addMissingDefaultValuesForInsertFromProject(
project, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
+ val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(
analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- regenerated.copy(query = replaced)
+ replaced.map { r =>
+ regenerated.copy(query = r)
+ }.getOrElse(i)
+ }
+
+ /**
+ * Resolves DEFAULT column references for an UPDATE command.
+ */
+ private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+ // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+ // inside an UPDATE command's WHERE clause; this is not allowed.
+ u.condition.map { c: Expression =>
+ if (c.find(isExplicitDefaultColumn).isDefined) {
+ throw QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause()
+ }
+ }
+ val schema: StructType = getSchemaForTargetTable(u.table).getOrElse(return u)
Review Comment:
SG, done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
* [[insertsFromInlineTable]] method.
*/
private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+ val insertTableSchemaWithoutPartitionColumns: StructType =
+ getInsertTableSchemaWithoutPartitionColumns(i)
+ .getOrElse(return i)
Review Comment:
SG, done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
val expanded: Project =
addMissingDefaultValuesForInsertFromProject(
project, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
+ val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(
analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- regenerated.copy(query = replaced)
+ replaced.map { r =>
+ regenerated.copy(query = r)
+ }.getOrElse(i)
+ }
+
+ /**
+ * Resolves DEFAULT column references for an UPDATE command.
+ */
+ private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+ // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+ // inside an UPDATE command's WHERE clause; this is not allowed.
+ u.condition.map { c: Expression =>
Review Comment:
Done.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
}
+
+ parsed5 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
+ Seq(Assignment(name: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT"))),
+ Assignment(age: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
+ None) =>
+ assert(name.name == "name")
+ assert(age.name == "age")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+ }
+
+ parsed6 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
+ Seq(Assignment(i: AttributeReference, AnsiCast(Literal(null, _), IntegerType, _)),
Review Comment:
Yes: when resolving DEFAULT column references, the analyzer will insert
literal NULL values if the corresponding table does not define an explicit
default value for that column. This is intended.
I left a comment here to this effect in case it's confusing to any future
readers.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
val expanded: Project =
addMissingDefaultValuesForInsertFromProject(
project, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
+ val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(
analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- regenerated.copy(query = replaced)
+ replaced.map { r =>
+ regenerated.copy(query = r)
+ }.getOrElse(i)
+ }
+
+ /**
+ * Resolves DEFAULT column references for an UPDATE command.
+ */
+ private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+ // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+ // inside an UPDATE command's WHERE clause; this is not allowed.
+ u.condition.map { c: Expression =>
+ if (c.find(isExplicitDefaultColumn).isDefined) {
+ throw QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause()
+ }
+ }
+ val schema: StructType = getSchemaForTargetTable(u.table).getOrElse(return u)
Review Comment:
SG, done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
* [[insertsFromInlineTable]] method.
*/
private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+ val insertTableSchemaWithoutPartitionColumns: StructType =
Review Comment:
Reverted this part of the change.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
* [[insertsFromInlineTable]] method.
*/
private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+ val insertTableSchemaWithoutPartitionColumns: StructType =
Review Comment:
Reverted this part of the change.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -91,30 +94,31 @@ case class ResolveDefaultColumns(
* [[insertsFromInlineTable]] method.
*/
private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+ val insertTableSchemaWithoutPartitionColumns: StructType =
+ getInsertTableSchemaWithoutPartitionColumns(i)
+ .getOrElse(return i)
Review Comment:
SG, done.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
}
+
+ parsed5 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
+ Seq(Assignment(name: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT"))),
+ Assignment(age: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
+ None) =>
+ assert(name.name == "name")
+ assert(age.name == "age")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+ }
+
+ parsed6 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -131,11 +135,41 @@ case class ResolveDefaultColumns(
val expanded: Project =
addMissingDefaultValuesForInsertFromProject(
project, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
+ val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(
analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- regenerated.copy(query = replaced)
+ replaced.map { r =>
+ regenerated.copy(query = r)
+ }.getOrElse(i)
+ }
+
+ /**
+ * Resolves DEFAULT column references for an UPDATE command.
+ */
+ private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+ // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+ // inside an UPDATE command's WHERE clause; this is not allowed.
+ u.condition.map { c: Expression =>
Review Comment:
Done.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
}
+
+ parsed5 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -98,44 +101,74 @@ case class ResolveDefaultColumns(
node = node.children(0)
}
val table = node.asInstanceOf[UnresolvedInlineTable]
- val insertTableSchemaWithoutPartitionColumns: StructType =
+ val insertTableSchemaWithoutPartitionColumns: Option[StructType] =
getInsertTableSchemaWithoutPartitionColumns(i)
- .getOrElse(return i)
- val regenerated: InsertIntoStatement =
- regenerateUserSpecifiedCols(i, insertTableSchemaWithoutPartitionColumns)
- val expanded: UnresolvedInlineTable =
- addMissingDefaultValuesForInsertFromInlineTable(
- table, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
- replaceExplicitDefaultValuesForInputOfInsertInto(
- analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- node = replaced
- for (child <- children.reverse) {
- node = child.withNewChildren(Seq(node))
- }
- regenerated.copy(query = node)
+ insertTableSchemaWithoutPartitionColumns.map { schema: StructType =>
+ val regenerated: InsertIntoStatement =
+ regenerateUserSpecifiedCols(i, schema)
+ val expanded: UnresolvedInlineTable =
+ addMissingDefaultValuesForInsertFromInlineTable(table, schema)
+ val replaced: Option[LogicalPlan] =
+ replaceExplicitDefaultValuesForInputOfInsertInto(analyzer, schema, expanded)
+ replaced.map { r: LogicalPlan =>
+ node = r
+ for (child <- children.reverse) {
+ node = child.withNewChildren(Seq(node))
+ }
+ regenerated.copy(query = node)
+ }.getOrElse(i)
+ }.getOrElse(i)
}
/**
* Resolves DEFAULT column references for an INSERT INTO command whose query is a general
* projection.
*/
private def resolveDefaultColumnsForInsertFromProject(i: InsertIntoStatement): LogicalPlan = {
- val insertTableSchemaWithoutPartitionColumns: StructType =
+ val insertTableSchemaWithoutPartitionColumns: Option[StructType] =
getInsertTableSchemaWithoutPartitionColumns(i)
- .getOrElse(return i)
- val regenerated: InsertIntoStatement =
- regenerateUserSpecifiedCols(i, insertTableSchemaWithoutPartitionColumns)
- val project: Project = i.query.asInstanceOf[Project]
- val expanded: Project =
- addMissingDefaultValuesForInsertFromProject(
- project, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
- replaceExplicitDefaultValuesForInputOfInsertInto(
- analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- regenerated.copy(query = replaced)
+ insertTableSchemaWithoutPartitionColumns.map { schema =>
+ val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
+ val project: Project = i.query.asInstanceOf[Project]
+ val expanded: Project =
+ addMissingDefaultValuesForInsertFromProject(project, schema)
+ val replaced: Option[LogicalPlan] =
+ replaceExplicitDefaultValuesForInputOfInsertInto(analyzer, schema, expanded)
+ replaced.map { r =>
+ regenerated.copy(query = r)
+ }.getOrElse(i)
+ }.getOrElse(i)
+ }
+
+ /**
+ * Resolves DEFAULT column references for an UPDATE command.
+ */
+ private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+ // Return a more descriptive error message if the user tries to use a DEFAULT column reference
+ // inside an UPDATE command's WHERE clause; this is not allowed.
+ u.condition.foreach { c: Expression =>
+ if (c.find(isExplicitDefaultColumn).isDefined) {
+ throw QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause()
Review Comment:
SG, done.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1035,6 +1060,41 @@ class PlanResolutionSuite extends AnalysisTest {
case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
}
+
+ parsed5 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
+ Seq(Assignment(name: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT"))),
+ Assignment(age: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
+ None) =>
+ assert(name.name == "name")
+ assert(age.name == "age")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+ }
+
+ parsed6 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
+ Seq(Assignment(i: AttributeReference, AnsiCast(Literal(null, _), IntegerType, _)),
+ Assignment(s: AttributeReference, AnsiCast(Literal(null, _), StringType, _))),
+ None) =>
+ assert(i.name == "i")
+ assert(s.name == "s")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed6.treeString)
+ }
+
+ parsed7 match {
+ case UpdateTable(
+ _,
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]