brkyvz commented on a change in pull request #27350: [SPARK-30615][SQL]
Introduce Analyzer rule for V2 AlterTable column change resolution
URL: https://github.com/apache/spark/pull/27350#discussion_r373093617
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3002,6 +3004,161 @@ class Analyzer(
}
}
}
+
+ /** Rule to mostly resolve, normalize and rewrite column names based on case
sensitivity. */
+ object ResolveAlterTableChanges extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+ case a @ AlterTable(_, _, t: NamedRelation, changes) if t.resolved =>
+ val schema = t.schema
+ val normalizedChanges = changes.flatMap {
+ case add: AddColumn =>
+ val parent = add.fieldNames().init
+ if (parent.nonEmpty) {
+ // Adding a nested field, need to normalize the parent column
and position
+ val target = schema.findNestedField(parent, includeCollections =
true, conf.resolver)
+ if (target.isEmpty) {
+ // Leave unresolved. Throws error in CheckAnalysis
+ Some(add)
+ } else {
+ val (normalizedName, sf) = target.get
+ sf.dataType match {
+ case struct: StructType =>
+ val pos = findColumnPosition(add.position(),
parent.quoted, struct)
+ Some(TableChange.addColumn(
+ (normalizedName ++ Seq(sf.name,
add.fieldNames().last)).toArray,
+ add.dataType(),
+ add.isNullable,
+ add.comment,
+ pos))
+
+ case other =>
+ throw new AnalysisException(
+ s"Columns can only be added to struct types. Found
${other.simpleString}.")
+ }
+ }
+ } else {
+ // Adding to the root. Just need to normalize position
+ val pos = findColumnPosition(add.position(), "root", schema)
+ Some(TableChange.addColumn(
+ add.fieldNames(),
+ add.dataType(),
+ add.isNullable,
+ add.comment,
+ pos))
+ }
+
+ case typeChange: UpdateColumnType =>
+ // Hive style syntax provides the column type, even if it may not
have changed
+ val fieldOpt = schema.findNestedField(
+ typeChange.fieldNames(), includeCollections = true,
conf.resolver)
+
+ if (fieldOpt.isEmpty) {
+ // We couldn't resolve the field. Leave it to CheckAnalysis
+ Some(typeChange)
+ } else {
+ val (fieldNames, field) = fieldOpt.get
+ if (field.dataType == typeChange.newDataType()) {
+ // The user didn't want the field to change, so remove this
change
+ None
+ } else {
+ Some(TableChange.updateColumnType(
+ (fieldNames :+ field.name).toArray,
typeChange.newDataType()))
+ }
+ }
+ case n: UpdateColumnNullability =>
+ // Need to resolve column
+ resolveFieldNames(
+ schema,
+ n.fieldNames(),
+ TableChange.updateColumnNullability(_,
n.nullable())).orElse(Some(n))
+
+ case position: UpdateColumnPosition =>
+ position.position() match {
+ case after: After =>
+ // Need to resolve column as well as position reference
+ val fieldOpt = schema.findNestedField(
+ position.fieldNames(), includeCollections = true,
conf.resolver)
+
+ if (fieldOpt.isEmpty) {
+ Some(position)
+ } else {
+ val (normalizedPath, field) = fieldOpt.get
+ val targetCol = schema.findNestedField(
+ normalizedPath :+ after.column(), includeCollections =
true, conf.resolver)
+ if (targetCol.isEmpty) {
+ // Leave unchanged to CheckAnalysis
+ Some(position)
+ } else {
+ Some(TableChange.updateColumnPosition(
+ (normalizedPath :+ field.name).toArray,
+ ColumnPosition.after(targetCol.get._2.name)))
+ }
+ }
+ case _ =>
+ // Need to resolve column
+ resolveFieldNames(
+ schema,
+ position.fieldNames(),
+ TableChange.updateColumnPosition(_,
position.position())).orElse(Some(position))
+ }
+
+ case comment: UpdateColumnComment =>
+ resolveFieldNames(
+ schema,
+ comment.fieldNames(),
+ TableChange.updateColumnComment(_,
comment.newComment())).orElse(Some(comment))
+
+ case rename: RenameColumn =>
+ resolveFieldNames(
+ schema,
+ rename.fieldNames(),
+ TableChange.renameColumn(_,
rename.newName())).orElse(Some(rename))
+
+ case delete: DeleteColumn =>
+ resolveFieldNames(schema, delete.fieldNames(),
TableChange.deleteColumn)
+ .orElse(Some(delete))
+
+ case column: ColumnChange =>
+ // This is informational for future developers
+ throw new UnsupportedOperationException(
+ "Please add an implementation for a column change here")
+ case other => Some(other)
Review comment:
This catch-all is for changes that are not column changes, such as SetProperties; they are passed through unchanged.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]