dtenedor commented on code in PR #36077:
URL: https://github.com/apache/spark/pull/36077#discussion_r846343302
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3398,6 +3399,8 @@ class Analyzer(override val catalogManager:
CatalogManager)
SchemaUtils.checkColumnNameDuplication(
i.userSpecifiedCols, "in the column list", resolver)
+ i.setTagValue(USER_SPECIFIED_COLUMNS_RESOLVED, true)
Review Comment:
Sure, NP. Removed the tree tag.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -53,25 +53,55 @@ case class ResolveDefaultColumns(
// This field stores the enclosing INSERT INTO command, once we find one.
var enclosingInsert: Option[InsertIntoStatement] = None
+ // This field stores the schema of the target table of the above command.
+ var insertTableSchemaWithoutPartitionColumns: Option[StructType] = None
- override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsWithPruning(
- (_ => SQLConf.get.enableDefaultColumns), ruleId) {
- case i@InsertIntoStatement(_, _, _, _, _, _)
- if i.query.collectFirst { case u: UnresolvedInlineTable => u }.isDefined
=>
- enclosingInsert = Some(i)
- i
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ // Initialize by clearing our reference to the enclosing INSERT INTO
command.
+ enclosingInsert = None
+ insertTableSchemaWithoutPartitionColumns = None
+ // Traverse the logical query plan in preorder (top-down).
+ plan.resolveOperatorsWithPruning(
+ (_ => SQLConf.get.enableDefaultColumns), ruleId) {
+ case i@InsertIntoStatement(_, _, _, _, _, _)
+ if i.query.collectFirst { case u: UnresolvedInlineTable => u
}.isDefined &&
+ !i.getTagValue(USER_SPECIFIED_COLUMNS_RESOLVED).getOrElse(false) =>
+ enclosingInsert = Some(i)
+ insertTableSchemaWithoutPartitionColumns =
getInsertTableSchemaWithoutPartitionColumns
+ val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i)
+ regenerated
+
+ case table: UnresolvedInlineTable
+ if enclosingInsert.isDefined &&
+ table.rows.nonEmpty && table.rows.forall(_.size ==
table.rows(0).size) =>
+ val expanded: UnresolvedInlineTable =
addMissingDefaultColumnValues(table).getOrElse(table)
+ val replaced: LogicalPlan =
+ replaceExplicitDefaultColumnValues(analyzer,
expanded).getOrElse(table)
+ replaced
+
+ case i@InsertIntoStatement(_, _, _, project: Project, _, _)
+ if !i.getTagValue(USER_SPECIFIED_COLUMNS_RESOLVED).getOrElse(false) =>
+ enclosingInsert = Some(i)
+ insertTableSchemaWithoutPartitionColumns =
getInsertTableSchemaWithoutPartitionColumns
+ val expanded: Project =
addMissingDefaultColumnValues(project).getOrElse(project)
+ val replaced: Option[LogicalPlan] =
replaceExplicitDefaultColumnValues(analyzer, expanded)
+ val updated: InsertIntoStatement =
+ if (replaced.isDefined) i.copy(query = replaced.get) else i
+ val regenerated: InsertIntoStatement =
regenerateUserSpecifiedCols(updated)
+ regenerated
+ }
+ }
- case table: UnresolvedInlineTable
- if enclosingInsert.isDefined &&
- table.rows.nonEmpty && table.rows.forall(_.size == table.rows(0).size)
=>
- val expanded: UnresolvedInlineTable =
addMissingDefaultColumnValues(table).getOrElse(table)
- replaceExplicitDefaultColumnValues(analyzer, expanded).getOrElse(table)
-
- case i@InsertIntoStatement(_, _, _, project: Project, _, _) =>
- enclosingInsert = Some(i)
- val expanded: Project =
addMissingDefaultColumnValues(project).getOrElse(project)
- val replaced: Option[LogicalPlan] =
replaceExplicitDefaultColumnValues(analyzer, expanded)
- if (replaced.isDefined) i.copy(query = replaced.get) else i
+ // Helper method to regenerate user-specified columns of an
InsertIntoStatement based on the names
+ // in the insertTableSchemaWithoutPartitionColumns field of this class.
+ private def regenerateUserSpecifiedCols(i: InsertIntoStatement):
InsertIntoStatement = {
+ insertTableSchemaWithoutPartitionColumns =
getInsertTableSchemaWithoutPartitionColumns
Review Comment:
Good point, removed this.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala:
##########
@@ -60,6 +61,9 @@ object ResolveDefaultColumns {
val DEFAULTS_IN_EXPRESSIONS_ERROR = "Failed to execute INSERT INTO command
because the " +
"VALUES list contains a DEFAULT column reference as part of another
expression; this is " +
"not allowed"
+ // This is a TreeNodeTag indicating that the ResolveUserSpecifiedColumns
rule has applied on a
+ // particular InsertIntoStatement of interest.
+ val USER_SPECIFIED_COLUMNS_RESOLVED =
TreeNodeTag[Boolean]("UserSpecifiedColumnsResolved")
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]