This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8c02823b49a [SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace 8c02823b49a is described below commit 8c02823b49a6f28005236e4965a25e664d73ebea Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Thu Jun 30 22:10:18 2022 +0800 [SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace ### What changes were proposed in this pull request? Make RefreshTable compatible with the 3-layer namespace. ### Why are the changes needed? This is part of the effort to make the Catalog API support the 3-layer namespace. ### Does this PR introduce _any_ user-facing change? Yes. The API will support the 3-layer namespace while maintaining backward compatibility. ### How was this patch tested? Unit test (CatalogSuite). Closes #36983 from amaliujia/refreshtable. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/internal/CatalogImpl.scala | 23 +++++++++++++++------- .../apache/spark/sql/internal/CatalogSuite.scala | 21 ++++++++++++++++++++ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 97226736691..98220f3b229 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, TransformHelper} import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} +import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.StructType import org.apache.spark.storage.StorageLevel @@ -700,17 +701,25 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def refreshTable(tableName: String): Unit = { - val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - val relation = sparkSession.table(tableIdent).queryExecution.analyzed + val relation = sparkSession.table(tableName).queryExecution.analyzed relation.refresh() // Temporary and global temporary views are not supposed to be put into the relation cache - // since they are tracked separately. - if (!sessionCatalog.isTempView(tableIdent)) { - sessionCatalog.invalidateCachedTable(tableIdent) + // since they are tracked separately. V1 and V2 plans are cache invalidated accordingly. + relation match { + case SubqueryAlias(_, v: View) if !v.isTempView => + sessionCatalog.invalidateCachedTable(v.desc.identifier) + case SubqueryAlias(_, r: LogicalRelation) => + sessionCatalog.invalidateCachedTable(r.catalogTable.get.identifier) + case SubqueryAlias(_, h: HiveTableRelation) => + sessionCatalog.invalidateCachedTable(h.tableMeta.identifier) + case SubqueryAlias(_, r: DataSourceV2Relation) => + r.catalog.get.asTableCatalog.invalidateTable(r.identifier.get) + case SubqueryAlias(_, v: View) if v.isTempView => + case _ => + throw QueryCompilationErrors.unexpectedTypeOfRelationError(relation, tableName) } - // Re-caches the logical plan of the relation. // Note this is a no-op for the relation itself if it's not cached, but will clear all // caches referencing this relation. 
If this relation is cached as an InMemoryRelation, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 13f6965a8e8..07c21fff712 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.internal import java.io.File +import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, DataFrame} @@ -795,4 +796,24 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(spark.catalog.currentCatalog().equals("spark_catalog")) assert(spark.catalog.listCatalogs().collect().map(c => c.name).toSet == Set("testcat")) } + + test("SPARK-39583: Make RefreshTable be compatible with 3 layer namespace") { + withTempDir { dir => + val tableName = "spark_catalog.default.my_table" + + sql(s""" + | CREATE TABLE ${tableName}(col STRING) USING TEXT + | LOCATION '${dir.getAbsolutePath}' + |""".stripMargin) + sql(s"""INSERT INTO ${tableName} SELECT 'abc'""".stripMargin) + spark.catalog.cacheTable(tableName) + assert(spark.table(tableName).collect().length == 1) + + FileUtils.deleteDirectory(dir) + assert(spark.table(tableName).collect().length == 1) + + spark.catalog.refreshTable(tableName) + assert(spark.table(tableName).collect().length == 0) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org