This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 73edf31 [SPARK-36849][SQL] Migrate UseStatement to v2 command framework 73edf31 is described below commit 73edf31c63608c6416594a4c8f4a087f10dcd7a2 Author: dohongdayi <dohongd...@126.com> AuthorDate: Mon Oct 11 13:37:55 2021 +0800 [SPARK-36849][SQL] Migrate UseStatement to v2 command framework What changes were proposed in this pull request? Migrate `UseStatement` to v2 command framework, add `SetNamespaceCommand`. Why are the changes needed? Migrate to the standard V2 framework. Does this PR introduce any user-facing change? No. How was this patch tested? Existing tests. Closes #34127 from dohongdayi/use_branch. Lead-authored-by: dohongdayi <dohongd...@126.com> Co-authored-by: Herbert Liao <herbe...@synnex.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 3 +- .../sql/catalyst/analysis/ResolveCatalogs.scala | 9 ------ .../spark/sql/catalyst/parser/AstBuilder.scala | 4 +-- .../sql/catalyst/plans/logical/statements.scala | 5 ---- .../sql/catalyst/plans/logical/v2Commands.scala | 11 ++++---- .../spark/sql/execution/SparkSqlParser.scala | 8 ++++++ .../execution/command/SetNamespaceCommand.scala | 33 ++++++++++++++++++++++ .../datasources/v2/DataSourceV2Strategy.scala | 6 ++-- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 6 ++++ 9 files changed, 61 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 886810e..3cd39a9 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -106,7 +106,8 @@ singleTableSchema statement : query #statementDefault | ctes? dmlStatementNoWith #dmlStatement - | USE NAMESPACE? 
multipartIdentifier #use + | USE multipartIdentifier #use + | USE NAMESPACE multipartIdentifier #useNamespace | SET CATALOG (identifier | STRING) #setCatalog | CREATE namespace (IF NOT EXISTS)? multipartIdentifier (commentSpec | diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index e9204ad..efc1ab2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -82,15 +82,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) convertTableProperties(c), writeOptions = c.writeOptions, orCreate = c.orCreate) - - case UseStatement(isNamespaceSet, nameParts) => - if (isNamespaceSet) { - SetCatalogAndNamespace(catalogManager, None, Some(nameParts)) - } else { - val CatalogAndNamespace(catalog, ns) = nameParts - val namespace = if (ns.nonEmpty) Some(ns) else None - SetCatalogAndNamespace(catalogManager, Some(catalog.name()), namespace) - } } object NonSessionCatalogAndTable { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 6a24a9d..1968142 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3565,11 +3565,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a [[UseStatement]] logical plan. + * Create a [[SetCatalogAndNamespace]] command. 
*/ override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) { val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) - UseStatement(ctx.NAMESPACE != null, nameParts) + SetCatalogAndNamespace(UnresolvedDBObjectName(nameParts, isNamespace = true)) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 0373c25..c502981 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -272,8 +272,3 @@ case class InsertIntoStatement( override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement = copy(query = newChild) } - -/** - * A USE statement, as parsed from SQL. - */ -case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends LeafParsedStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d548aec..e52dc02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -622,12 +622,13 @@ object ShowViews { } /** - * The logical plan of the USE/USE NAMESPACE command. + * The logical plan of the USE command. */ -case class SetCatalogAndNamespace( - catalogManager: CatalogManager, - catalogName: Option[String], - namespace: Option[Seq[String]]) extends LeafCommand +case class SetCatalogAndNamespace(child: LogicalPlan) extends UnaryCommand { + override protected def withNewChildInternal(newChild: LogicalPlan): SetCatalogAndNamespace = { + copy(child = newChild) + } +} /** * The logical plan of the REFRESH TABLE command. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 21aed5f..78d5354 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -240,6 +240,14 @@ class SparkSqlAstBuilder extends AstBuilder { } /** + * Create a [[SetNamespaceCommand]] logical command. + */ + override def visitUseNamespace(ctx: UseNamespaceContext): LogicalPlan = withOrigin(ctx) { + val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) + SetNamespaceCommand(nameParts) + } + + /** * Create a [[SetCatalogCommand]] logical command. */ override def visitSetCatalog(ctx: SetCatalogContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala new file mode 100644 index 0000000..cef18f7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Attribute + +/** + * The command for `USE NAMESPACE XXX` + */ +case class SetNamespaceCommand(namespace: Seq[String]) extends LeafRunnableCommand { + override def output: Seq[Attribute] = Seq.empty + + override def run(sparkSession: SparkSession): Seq[Row] = { + sparkSession.sessionState.catalogManager.setCurrentNamespace(namespace.toArray) + Seq.empty + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index db61d61..56e7abc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -332,8 +332,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) => ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil - case SetCatalogAndNamespace(catalogManager, catalogName, ns) => - SetCatalogAndNamespaceExec(catalogManager, catalogName, ns) :: Nil + case SetCatalogAndNamespace(ResolvedDBObjectName(catalog, ns)) => + val catalogManager = session.sessionState.catalogManager + val namespace = if (ns.nonEmpty) Some(ns) else None + SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil case ShowTableProperties(rt: ResolvedTable, propertyKey, output) => ShowTablePropertiesExec(output, rt.table, propertyKey) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index a2e147c..70bd8ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1337,6 +1337,7 @@ class DataSourceV2SQLSuite sql("CREATE TABLE testcat2.ns2.ns2_2.table (id bigint) USING foo") sql("CREATE TABLE testcat2.ns3.ns3_3.table (id bigint) USING foo") sql("CREATE TABLE testcat2.testcat.table (id bigint) USING foo") + sql("CREATE TABLE testcat2.testcat.ns1.ns1_1.table (id bigint) USING foo") // Catalog is resolved to 'testcat'. sql("USE testcat.ns1.ns1_1") @@ -1358,6 +1359,11 @@ class DataSourceV2SQLSuite assert(catalogManager.currentCatalog.name() == "testcat2") assert(catalogManager.currentNamespace === Array("testcat")) + // Only the namespace is changed (explicit). + sql("USE NAMESPACE testcat.ns1.ns1_1") + assert(catalogManager.currentCatalog.name() == "testcat2") + assert(catalogManager.currentNamespace === Array("testcat", "ns1", "ns1_1")) + // Catalog is resolved to `testcat`. sql("USE testcat") assert(catalogManager.currentCatalog.name() == "testcat") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org