brkyvz commented on a change in pull request #27350: [SPARK-30615][SQL]
Introduce Analyzer rule for V2 AlterTable column change resolution
URL: https://github.com/apache/spark/pull/27350#discussion_r373093861
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3002,6 +3004,161 @@ class Analyzer(
}
}
}
+
+ /** Rule to mostly resolve, normalize and rewrite column names based on case
sensitivity. */
+ object ResolveAlterTableChanges extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+ case a @ AlterTable(_, _, t: NamedRelation, changes) if t.resolved =>
+ val schema = t.schema
+ val normalizedChanges = changes.flatMap {
+ case add: AddColumn =>
+ val parent = add.fieldNames().init
+ if (parent.nonEmpty) {
+ // Adding a nested field, need to normalize the parent column
and position
+ val target = schema.findNestedField(parent, includeCollections =
true, conf.resolver)
+ if (target.isEmpty) {
+ // Leave unresolved. Throws error in CheckAnalysis
+ Some(add)
+ } else {
+ val (normalizedName, sf) = target.get
+ sf.dataType match {
+ case struct: StructType =>
+ val pos = findColumnPosition(add.position(),
parent.quoted, struct)
+ Some(TableChange.addColumn(
+ (normalizedName ++ Seq(sf.name,
add.fieldNames().last)).toArray,
+ add.dataType(),
+ add.isNullable,
+ add.comment,
+ pos))
+
+ case other =>
+ throw new AnalysisException(
+ s"Columns can only be added to struct types. Found
${other.simpleString}.")
+ }
+ }
+ } else {
+ // Adding to the root. Just need to normalize position
+ val pos = findColumnPosition(add.position(), "root", schema)
+ Some(TableChange.addColumn(
+ add.fieldNames(),
+ add.dataType(),
+ add.isNullable,
+ add.comment,
+ pos))
+ }
+
+ case typeChange: UpdateColumnType =>
+ // Hive style syntax provides the column type, even if it may not
have changed
+ val fieldOpt = schema.findNestedField(
+ typeChange.fieldNames(), includeCollections = true,
conf.resolver)
+
+ if (fieldOpt.isEmpty) {
+ // We couldn't resolve the field. Leave it to CheckAnalysis
+ Some(typeChange)
+ } else {
+ val (fieldNames, field) = fieldOpt.get
+ if (field.dataType == typeChange.newDataType()) {
+ // The user didn't want the field to change, so remove this
change
+ None
+ } else {
+ Some(TableChange.updateColumnType(
+ (fieldNames :+ field.name).toArray,
typeChange.newDataType()))
+ }
+ }
+ case n: UpdateColumnNullability =>
+ // Need to resolve column
+ resolveFieldNames(
+ schema,
+ n.fieldNames(),
+ TableChange.updateColumnNullability(_,
n.nullable())).orElse(Some(n))
+
+ case position: UpdateColumnPosition =>
+ position.position() match {
+ case after: After =>
+ // Need to resolve column as well as position reference
+ val fieldOpt = schema.findNestedField(
+ position.fieldNames(), includeCollections = true,
conf.resolver)
+
+ if (fieldOpt.isEmpty) {
+ Some(position)
+ } else {
+ val (normalizedPath, field) = fieldOpt.get
+ val targetCol = schema.findNestedField(
+ normalizedPath :+ after.column(), includeCollections =
true, conf.resolver)
+ if (targetCol.isEmpty) {
+ // Leave unchanged to CheckAnalysis
+ Some(position)
+ } else {
+ Some(TableChange.updateColumnPosition(
+ (normalizedPath :+ field.name).toArray,
+ ColumnPosition.after(targetCol.get._2.name)))
+ }
+ }
+ case _ =>
+ // Need to resolve column
+ resolveFieldNames(
+ schema,
+ position.fieldNames(),
+ TableChange.updateColumnPosition(_,
position.position())).orElse(Some(position))
+ }
+
+ case comment: UpdateColumnComment =>
+ resolveFieldNames(
+ schema,
+ comment.fieldNames(),
+ TableChange.updateColumnComment(_,
comment.newComment())).orElse(Some(comment))
+
+ case rename: RenameColumn =>
+ resolveFieldNames(
+ schema,
+ rename.fieldNames(),
+ TableChange.renameColumn(_,
rename.newName())).orElse(Some(rename))
+
+ case delete: DeleteColumn =>
+ resolveFieldNames(schema, delete.fieldNames(),
TableChange.deleteColumn)
+ .orElse(Some(delete))
+
+ case column: ColumnChange =>
+ // This is informational for future developers
+ throw new UnsupportedOperationException(
+ "Please add an implementation for a column change here")
+ case other => Some(other)
+ }
+
+ a.copy(changes = normalizedChanges)
+ }
+
+ /**
+ * Returns the table change if the field can be resolved, returns None if
the column is not
+ * found. An error will be thrown in CheckAnalysis for columns that can't
be resolved.
+ */
+ private def resolveFieldNames(
+ schema: StructType,
+ fieldNames: Array[String],
+ copy: Array[String] => TableChange): Option[TableChange] = {
+ val fieldOpt = schema.findNestedField(
+ fieldNames, includeCollections = true, conf.resolver)
+ fieldOpt.map { case (path, field) => copy((path :+ field.name).toArray) }
+ }
+
+ private def findColumnPosition(
+ position: ColumnPosition,
+ field: String,
+ struct: StructType): ColumnPosition = {
+ position match {
+ case null => null
Review comment:
This case applies when you're adding a column without a position (i.e., appending it to the end).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]