imback82 commented on a change in pull request #33200:
URL: https://github.com/apache/spark/pull/33200#discussion_r674350772
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3574,18 +3568,67 @@ class Analyzer(override val catalogManager:
CatalogManager)
/**
* Rule to mostly resolve, normalize and rewrite column names based on case
sensitivity
- * for alter table commands.
+ * for alter table column commands.
*/
- object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+ object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
- case a: AlterTableCommand if a.table.resolved &&
hasUnresolvedFieldName(a) =>
+ case a: AlterTableColumnCommand if a.table.resolved &&
hasUnresolvedFieldName(a) =>
val table = a.table.asInstanceOf[ResolvedTable]
a.transformExpressions {
- case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
+ case u: UnresolvedFieldName => resolveFieldNames(table, u.name,
u.origin)
+ }
+
+ case a @ AlterTableAddColumns(r: ResolvedTable, cols) if
hasUnresolvedColumns(cols) =>
+ // 'colsToAdd' keeps track of new columns being added. It stores a
mapping from a
+ // normalized parent name of fields to field names that belong to the
parent.
+ // For example, if we add columns "a.b.c", "a.b.d", and "a.c",
'colsToAdd' will become
+ // Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
+ val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
+ def addColumn(
+ col: QualifiedColType,
+ parent: ResolvedFieldName): QualifiedColType = {
+ val parentSchema = parent.field.dataType match {
+ case s: StructType => s
+ case _ => throw QueryCompilationErrors.invalidFieldName(
+ col.name, col.path.name, col.path.origin)
+ }
+ val fieldsAdded = colsToAdd.getOrElse(parent.name, Nil)
+ val resolvedPosition = col.position.map {
+ case u: UnresolvedFieldPosition => u.position match {
+ case after: After =>
+ val allFields = parentSchema.fieldNames ++ fieldsAdded
+ allFields.find(n => conf.resolver(n, after.column())) match {
+ case Some(colName) =>
+ ResolvedFieldPosition(ColumnPosition.after(colName))
+ case None =>
+ val parentName = if (parent.isRoot) "root" else
parent.name.quoted
Review comment:
Previously, `parentName` was the unresolved name `col.path.name.quoted`; let
me know if this needs to be retained.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
##########
@@ -1099,21 +1066,83 @@ case class UnsetTableProperties(
copy(table = newChild)
}
-trait AlterTableCommand extends UnaryCommand {
+trait AlterTableColumnCommand extends UnaryCommand {
def table: LogicalPlan
- def operation: String
def changes: Seq[TableChange]
override def child: LogicalPlan = table
}
+/**
+ * The logical plan of the ALTER TABLE ... ADD COLUMNS command.
+ */
+case class AlterTableAddColumns(
+ table: LogicalPlan,
+ columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Util._
+ columnsToAdd.foreach { c =>
+ failNullType(c.dataType)
+ TypeUtils.failWithIntervalType(c.dataType)
+ }
+
+ override def changes: Seq[TableChange] = {
+ columnsToAdd.map { col =>
+ require(col.position.forall(_.resolved),
+ "FieldPosition should be resolved before it's converted to
TableChange.")
+ TableChange.addColumn(
+ col.name.toArray,
+ col.dataType,
+ col.nullable,
+ col.comment.orNull,
+ col.position.map(_.position).orNull)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
+ */
+case class AlterTableReplaceColumns(
+ table: LogicalPlan,
+ columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Util._
+ columnsToAdd.foreach { c =>
+ failNullType(c.dataType)
+ TypeUtils.failWithIntervalType(c.dataType)
+ }
+
+ override def changes: Seq[TableChange] = {
+ // REPLACE COLUMNS deletes all the existing columns and adds new columns
specified.
+ require(table.resolved)
+ val deleteChanges = table.schema.fieldNames.map { name =>
+ TableChange.deleteColumn(Array(name))
+ }
+ val addChanges = columnsToAdd.map { col =>
+ // Cannot add nested columns when replacing columns.
+ assert(col.path.name.isEmpty)
+ assert(col.position.isEmpty)
+ TableChange.addColumn(
+ Array(col.colName),
Review comment:
Note that `col.path` is not being used.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
##########
@@ -163,8 +163,12 @@ case class ResolvedPartitionSpec(
ident: InternalRow,
location: Option[String] = None) extends PartitionSpec
-case class ResolvedFieldName(path: Seq[String], field: StructField) extends
FieldName {
- def name: Seq[String] = path :+ field.name
+case class ResolvedFieldName(
+ path: Seq[String],
+ field: StructField,
+ isRoot: Boolean = false) extends FieldName {
+ require(!isRoot || path.isEmpty, "path should be empty if the field is a
root.")
+ def name: Seq[String] = if (isRoot) Nil else path :+ field.name
Review comment:
This is needed so that `ResolvedFieldName.name` doesn't return "root" as
the name.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
##########
@@ -1099,21 +1066,83 @@ case class UnsetTableProperties(
copy(table = newChild)
}
-trait AlterTableCommand extends UnaryCommand {
+trait AlterTableColumnCommand extends UnaryCommand {
def table: LogicalPlan
- def operation: String
def changes: Seq[TableChange]
override def child: LogicalPlan = table
}
+/**
+ * The logical plan of the ALTER TABLE ... ADD COLUMNS command.
+ */
+case class AlterTableAddColumns(
+ table: LogicalPlan,
+ columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Util._
+ columnsToAdd.foreach { c =>
+ failNullType(c.dataType)
+ TypeUtils.failWithIntervalType(c.dataType)
+ }
+
+ override def changes: Seq[TableChange] = {
+ columnsToAdd.map { col =>
+ require(col.position.forall(_.resolved),
+ "FieldPosition should be resolved before it's converted to
TableChange.")
+ TableChange.addColumn(
+ col.name.toArray,
+ col.dataType,
+ col.nullable,
+ col.comment.orNull,
+ col.position.map(_.position).orNull)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
+ */
+case class AlterTableReplaceColumns(
+ table: LogicalPlan,
+ columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Util._
+ columnsToAdd.foreach { c =>
+ failNullType(c.dataType)
+ TypeUtils.failWithIntervalType(c.dataType)
+ }
+
+ override def changes: Seq[TableChange] = {
+ // REPLACE COLUMNS deletes all the existing columns and adds new columns
specified.
+ require(table.resolved)
+ val deleteChanges = table.schema.fieldNames.map { name =>
+ TableChange.deleteColumn(Array(name))
+ }
+ val addChanges = columnsToAdd.map { col =>
+ // Cannot add nested columns when replacing columns.
+ assert(col.path.name.isEmpty)
Review comment:
@cloud-fan I don't think we can specify a nested column for "REPLACE
COLUMNS", since we are dropping all columns and adding new ones. So, in the
current code, `col.path` is not used and will remain an `UnresolvedFieldName`.
In this case, should we make `col.path` an `Option[FieldName]`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]