This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 7c58684 [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN 7c58684 is described below commit 7c586842d71064169aa77baf666a8566d9ed785e Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Aug 3 10:43:00 2021 +0300 [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN ### What changes were proposed in this pull request? This is a followup of the recent work such as https://github.com/apache/spark/pull/33200 For `ALTER TABLE` commands, the logical plans do not have the common `AlterTable` prefix in the name and just use names like `SetTableLocation`. This PR proposes to follow the same naming rule in `ALTER TABLE ... COLUMN` commands. This PR also moves these AlterTable commands to an individual file and gives them a base trait. ### Why are the changes needed? name simplification ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing test Closes #33609 from cloud-fan/dsv2. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> (cherry picked from commit 7cb9c1c2415a0984515e4d4733f816673e4ae3c8) Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 12 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 12 +- .../spark/sql/catalyst/parser/AstBuilder.scala | 12 +- .../plans/logical/v2AlterTableCommands.scala | 230 +++++++++++++++++++++ .../sql/catalyst/plans/logical/v2Commands.scala | 193 +---------------- .../spark/sql/catalyst/parser/DDLParserSuite.scala | 54 ++--- .../catalyst/analysis/ResolveSessionCatalog.scala | 10 +- .../datasources/v2/DataSourceV2Strategy.scala | 26 +-- .../connector/V2CommandsCaseSensitivitySuite.scala | 26 +-- .../execution/command/PlanResolutionSuite.scala | 12 +- 10 files changed, 303 insertions(+), 284 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a9c085a..75fad11a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager) ResolveRelations :: ResolveTables :: ResolvePartitionSpec :: - ResolveAlterTableColumnCommands :: + ResolveAlterTableCommands :: AddMetadataColumns :: DeduplicateRelations :: ResolveReferences :: @@ -3607,15 +3607,15 @@ class Analyzer(override val catalogManager: CatalogManager) * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity * for alter table column commands. 
*/ - object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] { + object ResolveAlterTableCommands extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case a: AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) => + case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) => val table = a.table.asInstanceOf[ResolvedTable] a.transformExpressions { case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u) } - case a @ AlterTableAddColumns(r: ResolvedTable, cols) if !a.resolved => + case a @ AddColumns(r: ResolvedTable, cols) if !a.resolved => // 'colsToAdd' keeps track of new columns being added. It stores a mapping from a // normalized parent name of fields to field names that belong to the parent. // For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become @@ -3668,7 +3668,7 @@ class Analyzer(override val catalogManager: CatalogManager) resolved.copyTagsFrom(a) resolved - case a @ AlterTableAlterColumn( + case a @ AlterColumn( table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) => val newDataType = dataType.flatMap { dt => // Hive style syntax provides the column type, even if it may not have changed. 
@@ -3705,7 +3705,7 @@ class Analyzer(override val catalogManager: CatalogManager) }.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context.origin)) } - private def hasUnresolvedFieldName(a: AlterTableColumnCommand): Boolean = { + private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = { a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 77f721c..09cd71b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -442,8 +442,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case write: V2WriteCommand if write.resolved => write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) - case alter: AlterTableColumnCommand if alter.table.resolved => - checkAlterTableColumnCommand(alter) + case alter: AlterTableCommand => + checkAlterTableCommand(alter) case _ => // Falls back to the following checks } @@ -939,7 +939,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { /** * Validates the options used for alter table commands after table and columns are resolved. 
*/ - private def checkAlterTableColumnCommand(alter: AlterTableColumnCommand): Unit = { + private def checkAlterTableCommand(alter: AlterTableCommand): Unit = { def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = { if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) { alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " + @@ -948,7 +948,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } alter match { - case AlterTableAddColumns(table: ResolvedTable, colsToAdd) => + case AddColumns(table: ResolvedTable, colsToAdd) => colsToAdd.foreach { colToAdd => checkColumnNotExists("add", colToAdd.name, table.schema) } @@ -957,10 +957,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { "in the user specified columns", alter.conf.resolver) - case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) => + case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) => checkColumnNotExists("rename", col.path :+ newName, table.schema) - case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) => + case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) => val fieldName = col.name.quoted if (a.dataType.isDefined) { val field = CharVarcharUtils.getRawType(col.field.metadata) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index de5a84c..8f5c2f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3614,7 +3614,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg */ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) { val colToken = if 
(ctx.COLUMN() != null) "COLUMN" else "COLUMNS" - AlterTableAddColumns( + AddColumns( createUnresolvedTable(ctx.multipartIdentifier, s"ALTER TABLE ... ADD $colToken"), ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]).toSeq ) @@ -3630,7 +3630,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg */ override def visitRenameTableColumn( ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) { - AlterTableRenameColumn( + RenameColumn( createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"), UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)), ctx.to.getText) @@ -3684,7 +3684,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1) - AlterTableAlterColumn( + AlterColumn( createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"), UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)), dataType = dataType, @@ -3718,7 +3718,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead")) } - AlterTableAlterColumn( + AlterColumn( createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"), UnresolvedFieldName(columnNameParts), dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]), @@ -3733,7 +3733,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg if (ctx.partitionSpec != null) { operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx) } - AlterTableReplaceColumns( + ReplaceColumns( createUnresolvedTable(ctx.multipartIdentifier, "ALTER TABLE ... 
REPLACE COLUMNS"), ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType => if (colType.NULL != null) { @@ -3766,7 +3766,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg override def visitDropTableColumns( ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) { val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]]) - AlterTableDropColumns( + DropColumns( createUnresolvedTable( ctx.multipartIdentifier, "ALTER TABLE ... DROP COLUMNS"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala new file mode 100644 index 0000000..302a810 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.DataType + +/** + * The base trait for commands that need to alter a v2 table with [[TableChange]]s. + */ +trait AlterTableCommand extends UnaryCommand { + def changes: Seq[TableChange] + def table: LogicalPlan + final override def child: LogicalPlan = table +} + +/** + * The logical plan that defines or changes the comment of an TABLE for v2 catalogs. + * + * {{{ + * COMMENT ON TABLE tableIdentifier IS ('text' | NULL) + * }}} + * + * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment. + */ +case class CommentOnTable(table: LogicalPlan, comment: String) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + Seq(TableChange.setProperty(TableCatalog.PROP_COMMENT, comment)) + } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... SET LOCATION command. + */ +case class SetTableLocation( + table: LogicalPlan, + partitionSpec: Option[TablePartitionSpec], + location: String) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + if (partitionSpec.nonEmpty) { + throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError() + } + Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location)) + } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command. 
+ */ +case class SetTableProperties( + table: LogicalPlan, + properties: Map[String, String]) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + properties.map { case (key, value) => + TableChange.setProperty(key, value) + }.toSeq + } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command. + */ +case class UnsetTableProperties( + table: LogicalPlan, + propertyKeys: Seq[String], + ifExists: Boolean) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + propertyKeys.map(key => TableChange.removeProperty(key)) + } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... ADD COLUMNS command. + */ +case class AddColumns( + table: LogicalPlan, + columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand { + columnsToAdd.foreach { c => + TypeUtils.failWithIntervalType(c.dataType) + } + + override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved) + + override def changes: Seq[TableChange] = { + columnsToAdd.map { col => + require(col.path.forall(_.resolved), + "FieldName should be resolved before it's converted to TableChange.") + require(col.position.forall(_.resolved), + "FieldPosition should be resolved before it's converted to TableChange.") + TableChange.addColumn( + col.name.toArray, + col.dataType, + col.nullable, + col.comment.orNull, + col.position.map(_.position).orNull) + } + } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command. 
+ */ +case class ReplaceColumns( + table: LogicalPlan, + columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand { + columnsToAdd.foreach { c => + TypeUtils.failWithIntervalType(c.dataType) + } + + override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved) + + override def changes: Seq[TableChange] = { + // REPLACE COLUMNS deletes all the existing columns and adds new columns specified. + require(table.resolved) + val deleteChanges = table.schema.fieldNames.map { name => + TableChange.deleteColumn(Array(name)) + } + val addChanges = columnsToAdd.map { col => + assert(col.path.isEmpty) + assert(col.position.isEmpty) + TableChange.addColumn( + col.name.toArray, + col.dataType, + col.nullable, + col.comment.orNull, + null) + } + deleteChanges ++ addChanges + } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... DROP COLUMNS command. + */ +case class DropColumns( + table: LogicalPlan, + columnsToDrop: Seq[FieldName]) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + columnsToDrop.map { col => + require(col.resolved, "FieldName should be resolved before it's converted to TableChange.") + TableChange.deleteColumn(col.name.toArray) + } + } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... RENAME COLUMN command. 
+ */ +case class RenameColumn( + table: LogicalPlan, + column: FieldName, + newName: String) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + require(column.resolved, "FieldName should be resolved before it's converted to TableChange.") + Seq(TableChange.renameColumn(column.name.toArray, newName)) + } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + +/** + * The logical plan of the ALTER TABLE ... ALTER COLUMN command. + */ +case class AlterColumn( + table: LogicalPlan, + column: FieldName, + dataType: Option[DataType], + nullable: Option[Boolean], + comment: Option[String], + position: Option[FieldPosition]) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + require(column.resolved, "FieldName should be resolved before it's converted to TableChange.") + val colName = column.name.toArray + val typeChange = dataType.map { newDataType => + TableChange.updateColumnType(colName, newDataType) + } + val nullabilityChange = nullable.map { nullable => + TableChange.updateColumnNullability(colName, nullable) + } + val commentChange = comment.map { newComment => + TableChange.updateColumnComment(colName, newComment) + } + val positionChange = position.map { newPosition => + require(newPosition.resolved, + "FieldPosition should be resolved before it's converted to TableChange.") + TableChange.updateColumnPosition(colName, newPosition.position) + } + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange + } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d2b5909..195bb8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, NamedRelation, PartitionSpec, UnresolvedException} +import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable} import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema import org.apache.spark.sql.catalyst.trees.BinaryLike -import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils} +import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.Write @@ -676,21 +676,6 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Unary } /** - * The logical plan that defines or changes the comment of an TABLE for v2 catalogs. - * - * {{{ - * COMMENT ON TABLE tableIdentifier IS ('text' | NULL) - * }}} - * - * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment. - * - */ -case class CommentOnTable(child: LogicalPlan, comment: String) extends UnaryCommand { - override protected def withNewChildInternal(newChild: LogicalPlan): CommentOnTable = - copy(child = newChild) -} - -/** * The logical plan of the REFRESH FUNCTION command. */ case class RefreshFunction(child: LogicalPlan) extends UnaryCommand { @@ -1043,177 +1028,3 @@ case class UncacheTable( override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true) } - -/** - * The logical plan of the ALTER TABLE ... SET LOCATION command. 
- */ -case class SetTableLocation( - table: LogicalPlan, - partitionSpec: Option[TablePartitionSpec], - location: String) extends UnaryCommand { - override def child: LogicalPlan = table - override protected def withNewChildInternal(newChild: LogicalPlan): SetTableLocation = - copy(table = newChild) -} - -/** - * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command. - */ -case class SetTableProperties( - table: LogicalPlan, - properties: Map[String, String]) extends UnaryCommand { - override def child: LogicalPlan = table - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(table = newChild) -} - -/** - * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command. - */ -case class UnsetTableProperties( - table: LogicalPlan, - propertyKeys: Seq[String], - ifExists: Boolean) extends UnaryCommand { - override def child: LogicalPlan = table - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(table = newChild) -} - -trait AlterTableColumnCommand extends UnaryCommand { - def table: LogicalPlan - def changes: Seq[TableChange] - override def child: LogicalPlan = table -} - -/** - * The logical plan of the ALTER TABLE ... ADD COLUMNS command. 
- */ -case class AlterTableAddColumns( - table: LogicalPlan, - columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand { - columnsToAdd.foreach { c => - TypeUtils.failWithIntervalType(c.dataType) - } - - override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved) - - override def changes: Seq[TableChange] = { - columnsToAdd.map { col => - require(col.path.forall(_.resolved), - "FieldName should be resolved before it's converted to TableChange.") - require(col.position.forall(_.resolved), - "FieldPosition should be resolved before it's converted to TableChange.") - TableChange.addColumn( - col.name.toArray, - col.dataType, - col.nullable, - col.comment.orNull, - col.position.map(_.position).orNull) - } - } - - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(table = newChild) -} - -/** - * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command. - */ -case class AlterTableReplaceColumns( - table: LogicalPlan, - columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand { - columnsToAdd.foreach { c => - TypeUtils.failWithIntervalType(c.dataType) - } - - override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved) - - override def changes: Seq[TableChange] = { - // REPLACE COLUMNS deletes all the existing columns and adds new columns specified. - require(table.resolved) - val deleteChanges = table.schema.fieldNames.map { name => - TableChange.deleteColumn(Array(name)) - } - val addChanges = columnsToAdd.map { col => - assert(col.path.isEmpty) - assert(col.position.isEmpty) - TableChange.addColumn( - col.name.toArray, - col.dataType, - col.nullable, - col.comment.orNull, - null) - } - deleteChanges ++ addChanges - } - - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(table = newChild) -} - -/** - * The logical plan of the ALTER TABLE ... DROP COLUMNS command. 
- */ -case class AlterTableDropColumns( - table: LogicalPlan, - columnsToDrop: Seq[FieldName]) extends AlterTableColumnCommand { - override def changes: Seq[TableChange] = { - columnsToDrop.map { col => - require(col.resolved, "FieldName should be resolved before it's converted to TableChange.") - TableChange.deleteColumn(col.name.toArray) - } - } - - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(table = newChild) -} - -/** - * The logical plan of the ALTER TABLE ... RENAME COLUMN command. - */ -case class AlterTableRenameColumn( - table: LogicalPlan, - column: FieldName, - newName: String) extends AlterTableColumnCommand { - override def changes: Seq[TableChange] = { - require(column.resolved, "FieldName should be resolved before it's converted to TableChange.") - Seq(TableChange.renameColumn(column.name.toArray, newName)) - } - - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(table = newChild) -} - -/** - * The logical plan of the ALTER TABLE ... ALTER COLUMN command. 
- */ -case class AlterTableAlterColumn( - table: LogicalPlan, - column: FieldName, - dataType: Option[DataType], - nullable: Option[Boolean], - comment: Option[String], - position: Option[FieldPosition]) extends AlterTableColumnCommand { - override def changes: Seq[TableChange] = { - require(column.resolved, "FieldName should be resolved before it's converted to TableChange.") - val colName = column.name.toArray - val typeChange = dataType.map { newDataType => - TableChange.updateColumnType(colName, newDataType) - } - val nullabilityChange = nullable.map { nullable => - TableChange.updateColumnNullability(colName, nullable) - } - val commentChange = comment.map { newComment => - TableChange.updateColumnComment(colName, newComment) - } - val positionChange = position.map { newPosition => - require(newPosition.resolved, - "FieldPosition should be resolved before it's converted to TableChange.") - TableChange.updateColumnPosition(colName, newPosition.position) - } - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange - } - - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(table = newChild) -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index ea35f8b..45f865f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -788,7 +788,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... 
ADD COLUMN", None), Seq(QualifiedColType(None, "x", IntegerType, true, None, None) ))) @@ -797,7 +797,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add multiple columns") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None), Seq(QualifiedColType(None, "x", IntegerType, true, None, None), QualifiedColType(None, "y", StringType, true, None, None) @@ -807,7 +807,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None), Seq(QualifiedColType(None, "x", IntegerType, true, None, None) ))) @@ -816,7 +816,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS (...)") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None), Seq(QualifiedColType(None, "x", IntegerType, true, None, None) ))) @@ -825,7 +825,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS (...) and COMMENT") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None), Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None) ))) @@ -834,7 +834,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add non-nullable column") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... 
ADD COLUMN", None), Seq(QualifiedColType(None, "x", IntegerType, false, None, None) ))) @@ -843,7 +843,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COMMENT") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None), Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None) ))) @@ -852,7 +852,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with position") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None), Seq(QualifiedColType( None, @@ -865,7 +865,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None), Seq(QualifiedColType( None, @@ -880,7 +880,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with nested column name") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None), Seq(QualifiedColType( Some(UnresolvedFieldName(Seq("x", "y"))), "z", IntegerType, true, Some("doc"), None) @@ -890,7 +890,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add multiple columns with nested column name") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"), - AlterTableAddColumns( + AddColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... 
ADD COLUMN", None), Seq( QualifiedColType( @@ -930,7 +930,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: rename column") { comparePlans( parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"), - AlterTableRenameColumn( + RenameColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), "d")) @@ -939,7 +939,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type using ALTER") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), Some(LongType), @@ -958,7 +958,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), Some(LongType), @@ -970,7 +970,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column comment") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), None, @@ -982,7 +982,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column position") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... 
CHANGE COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), None, @@ -1008,7 +1008,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: SET/DROP NOT NULL") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), None, @@ -1018,7 +1018,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), None, @@ -1030,7 +1030,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: drop column") { comparePlans( parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"), - AlterTableDropColumns( + DropColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None), Seq(UnresolvedFieldName(Seq("a", "b", "c"))))) } @@ -1040,7 +1040,7 @@ class DDLParserSuite extends AnalysisTest { Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop => comparePlans( parsePlan(drop), - AlterTableDropColumns( + DropColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None), Seq(UnresolvedFieldName(Seq("x")), UnresolvedFieldName(Seq("y")), @@ -1055,7 +1055,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql1), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), Some(IntegerType), @@ -1065,7 +1065,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql2), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... 
CHANGE COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), Some(IntegerType), @@ -1075,7 +1075,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql3), - AlterTableAlterColumn( + AlterColumn( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), UnresolvedFieldName(Seq("a", "b", "c")), Some(IntegerType), @@ -1099,19 +1099,19 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql1), - AlterTableReplaceColumns( + ReplaceColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None), Seq(QualifiedColType(None, "x", StringType, true, None, None)))) comparePlans( parsePlan(sql2), - AlterTableReplaceColumns( + ReplaceColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None), Seq(QualifiedColType(None, "x", StringType, true, Some("x1"), None)))) comparePlans( parsePlan(sql3), - AlterTableReplaceColumns( + ReplaceColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None), Seq( QualifiedColType(None, "x", StringType, true, Some("x1"), None), @@ -1120,7 +1120,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql4), - AlterTableReplaceColumns( + ReplaceColumns( UnresolvedTable(Seq("table_name"), "ALTER TABLE ... 
REPLACE COLUMNS", None), Seq( QualifiedColType(None, "x", StringType, true, Some("x1"), None), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 3a2f525..80063cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -46,7 +46,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case AlterTableAddColumns(ResolvedV1TableIdentifier(ident), cols) => + case AddColumns(ResolvedV1TableIdentifier(ident), cols) => cols.foreach { c => assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand") if (!c.nullable) { @@ -55,10 +55,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField)) - case AlterTableReplaceColumns(ResolvedV1TableIdentifier(_), _) => + case ReplaceColumns(ResolvedV1TableIdentifier(_), _) => throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError - case a @ AlterTableAlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) => + case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) => if (a.column.name.length > 1) { throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError } @@ -87,10 +87,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) builder.build()) AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn) - case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) => + case RenameColumn(ResolvedV1TableIdentifier(_), _, _) => throw 
QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError - case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) => + case DropColumns(ResolvedV1TableIdentifier(_), _) => throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError case SetTableProperties(ResolvedV1TableIdentifier(ident), props) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 3d69029..1a50c32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.toPrettySQL -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog} import org.apache.spark.sql.connector.read.LocalScan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.connector.write.V1Write @@ -314,10 +314,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat ns, Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil - case CommentOnTable(ResolvedTable(catalog, identifier, _, _), comment) => - val changes = TableChange.setProperty(TableCatalog.PROP_COMMENT, comment) - AlterTableExec(catalog, identifier, 
Seq(changes)) :: Nil - case CreateNamespace(catalog, namespace, ifNotExists, properties) => CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil @@ -424,25 +420,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil - case SetTableLocation(table: ResolvedTable, partitionSpec, location) => - if (partitionSpec.nonEmpty) { - throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError() - } - val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location)) - AlterTableExec(table.catalog, table.identifier, changes) :: Nil - - case SetTableProperties(table: ResolvedTable, props) => - val changes = props.map { case (key, value) => - TableChange.setProperty(key, value) - }.toSeq - AlterTableExec(table.catalog, table.identifier, changes) :: Nil - - // TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag. - case UnsetTableProperties(table: ResolvedTable, keys, _) => - val changes = keys.map(key => TableChange.removeProperty(key)) - AlterTableExec(table.catalog, table.identifier, changes) :: Nil - - case a: AlterTableColumnCommand if a.table.resolved => + case a: AlterTableCommand if a.table.resolved => val table = a.table.asInstanceOf[ResolvedTable] AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 763cd6a..1d6e0a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, 
TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition} -import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddColumns, AlterTableAlterColumn, AlterTableColumnCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, QualifiedColType, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceTableAsSelect} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -140,7 +140,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes Seq("POINT.Z", "poInt.z", "poInt.Z").foreach { ref => val field = ref.split("\\.") alterTableTest( - AlterTableAddColumns( + AddColumns( table, Seq(QualifiedColType( Some(UnresolvedFieldName(field.init)), field.last, LongType, true, None, None))), @@ -152,7 +152,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: add column resolution - positional") { Seq("ID", "iD").foreach { ref => alterTableTest( - AlterTableAddColumns( + AddColumns( table, Seq(QualifiedColType( None, @@ -168,7 +168,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: add column resolution - column position referencing new column") { alterTableTest( - AlterTableAddColumns( + AddColumns( table, Seq(QualifiedColType( None, @@ -191,7 +191,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: add column resolution - nested positional") { Seq("X", "Y").foreach { ref => alterTableTest( - AlterTableAddColumns( + AddColumns( table, Seq(QualifiedColType( Some(UnresolvedFieldName(Seq("point"))), @@ -207,7 +207,7 @@ class V2CommandsCaseSensitivitySuite extends 
SharedSparkSession with AnalysisTes test("AlterTable: add column resolution - column position referencing new nested column") { alterTableTest( - AlterTableAddColumns( + AddColumns( table, Seq(QualifiedColType( Some(UnresolvedFieldName(Seq("point"))), @@ -229,7 +229,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("SPARK-36372: Adding duplicate columns should not be allowed") { alterTableTest( - AlterTableAddColumns( + AddColumns( table, Seq(QualifiedColType( Some(UnresolvedFieldName(Seq("point"))), @@ -252,7 +252,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: drop column resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))), + DropColumns(table, Seq(UnresolvedFieldName(ref))), Seq("Missing field " + ref.quoted) ) } @@ -261,7 +261,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: rename column resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"), + RenameColumn(table, UnresolvedFieldName(ref), "newName"), Seq("Missing field " + ref.quoted) ) } @@ -270,7 +270,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: drop column nullability resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None), + AlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None), Seq("Missing field " + ref.quoted) ) } @@ -279,7 +279,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: change column 
type resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None), + AlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None), Seq("Missing field " + ref.quoted) ) } @@ -288,14 +288,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: change column comment resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None), + AlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None), Seq("Missing field " + ref.quoted) ) } } private def alterTableTest( - alter: AlterTableColumnCommand, + alter: AlterTableCommand, error: Seq[String], expectErrorOnCaseSensitive: Boolean = true): Unit = { Seq(true, false).foreach { caseSensitive => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 25a8c4e..2c2f833 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAlterColumn, AnalysisOnlyCommand, AppendData, Assignment, 
CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table} @@ -1132,7 +1132,7 @@ class PlanResolutionSuite extends AnalysisTest { "ALTER COLUMN with qualified column is only supported with v2 tables")) } else { parsed1 match { - case AlterTableAlterColumn( + case AlterColumn( _: ResolvedTable, column: ResolvedFieldName, Some(LongType), @@ -1144,7 +1144,7 @@ class PlanResolutionSuite extends AnalysisTest { } parsed2 match { - case AlterTableAlterColumn( + case AlterColumn( _: ResolvedTable, column: ResolvedFieldName, None, @@ -1198,14 +1198,14 @@ class PlanResolutionSuite extends AnalysisTest { test("alter table: hive style change column") { Seq("v2Table", "testcat.tab").foreach { tblName => parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match { - case AlterTableAlterColumn( + case AlterColumn( _: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None) => assert(comment == "an index") case _ => fail("expect AlterTableAlterColumn with comment change only") } parseAndResolve(s"ALTER TABLE 
$tblName CHANGE COLUMN i i long COMMENT 'an index'") match { - case AlterTableAlterColumn( + case AlterColumn( _: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None) => assert(comment == "an index") assert(dataType == LongType) @@ -1241,7 +1241,7 @@ class PlanResolutionSuite extends AnalysisTest { val catalog = if (isSessionCatalog) v2SessionCatalog else testCat val tableIdent = if (isSessionCatalog) "v2Table" else "tab" parsed match { - case AlterTableAlterColumn(r: ResolvedTable, _, _, _, _, _) => + case AlterColumn(r: ResolvedTable, _, _, _, _, _) => assert(r.catalog == catalog) assert(r.identifier.name() == tableIdent) case Project(_, AsDataSourceV2Relation(r)) => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org