This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b9282f7  [SPARK-33829][SQL][3.1] Renaming v2 tables should recreate the cache
b9282f7 is described below

commit b9282f7f40f2b65def40c23b16772788bc3489f0
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Mon Dec 21 06:41:09 2020 +0000

    [SPARK-33829][SQL][3.1] Renaming v2 tables should recreate the cache

    ### What changes were proposed in this pull request?

    Backport of #30825

    Currently, renaming a v2 table does not invalidate or recreate its cache, so the stale cache entry silently stops being used after the rename. This PR fixes the behavior: the cache's storage level is captured before the rename, and the cache is recreated under the new table identifier afterwards.
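    A minimal sketch of the new flow, using hypothetical stand-ins (`SimpleCacheManager`, `CachedEntry`) for Spark's internal cache bookkeeping; the real logic lives in `DataSourceV2Strategy.invalidateCache` and `RenameTableExec.run` in the diff below:

    ```scala
    import org.apache.spark.storage.StorageLevel

    // Hypothetical stand-ins for Spark's internal cache bookkeeping.
    final case class CachedEntry(name: Option[String], level: StorageLevel)

    final class SimpleCacheManager {
      private var entries = Map.empty[String, CachedEntry]
      def lookup(table: String): Option[CachedEntry] = entries.get(table)
      def uncache(table: String): Unit = entries -= table
      def cache(table: String, entry: CachedEntry): Unit = entries += (table -> entry)
    }

    object RenameFlow {
      // Capture the storage level, drop the stale cache entry, rename, then
      // rebuild the cache under the new identifier with the retained level.
      def renameWithRecache(
          cm: SimpleCacheManager,
          oldName: String,
          newName: String)(doRename: => Unit): Unit = {
        val oldLevel = cm.lookup(oldName).map { e => cm.uncache(oldName); e.level }
        doRename
        oldLevel.foreach(l => cm.cache(newName, CachedEntry(Some(newName), l)))
      }
    }
    ```

    In the actual patch, `invalidateCache` returns the old `Option[StorageLevel]` and `RenameTableExec` rebuilds the cache through the `cacheTable` function passed to it.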
    ### Why are the changes needed?

    This fixes a bug: the cache associated with the renamed table was neither cleaned up nor recreated.

    ### Does this PR introduce _any_ user-facing change?

    Yes. When a v2 table is renamed, its cache is now correctly updated.
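    For example (mirroring the new test below; `testcat` is the in-memory test catalog of `DataSourceV2SQLSuite` and `source` its test table, neither of which exists in a stock deployment):

    ```scala
    spark.sql("CREATE TABLE testcat.ns.old USING foo AS SELECT id, data FROM source")
    spark.sql("CACHE TABLE testcat.ns.old OPTIONS('storageLevel' 'MEMORY_ONLY')")

    spark.sql("ALTER TABLE testcat.ns.old RENAME TO ns.new")

    // Before this patch the cache entry for testcat.ns.old was left behind unused;
    // now it is dropped and rebuilt for testcat.ns.new with the same MEMORY_ONLY
    // storage level, so this scan is served from the recreated cache.
    spark.sql("SELECT * FROM testcat.ns.new").show()
    ```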
    ### How was this patch tested?

    Added a new test.

    Closes #30856 from imback82/backport_30825.

    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/DataSourceV2Strategy.scala      | 29 ++++++++++++++++------
 .../execution/datasources/v2/RenameTableExec.scala | 19 +++++++++++++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 19 ++++++++++++++
 .../v2/jdbc/JDBCTableCatalogSuite.scala            | 10 +++-----
 4 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 97dab4b..34bc80c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.storage.StorageLevel
 
 class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
@@ -56,18 +57,24 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     session.sharedState.cacheManager.recacheByPlan(session, r)
   }
 
-  private def invalidateCache(r: ResolvedTable, recacheTable: Boolean = false)(): Unit = {
+  private def invalidateCache(
+      r: ResolvedTable,
+      recacheTable: Boolean = false)(): Option[StorageLevel] = {
     val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
     val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
     session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
-    if (recacheTable && cache.isDefined) {
-      // save the cache name and cache level for recreation
-      val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
+    if (cache.isDefined) {
       val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
-      // recache with the same name and cache level.
-      val ds = Dataset.ofRows(session, v2Relation)
-      session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+      if (recacheTable) {
+        val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
+        // recache with the same name and cache level.
+        val ds = Dataset.ofRows(session, v2Relation)
+        session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+      }
+      Some(cacheLevel)
+    } else {
+      None
     }
   }
 
@@ -268,7 +275,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       AlterTableExec(catalog, ident, changes) :: Nil
 
     case RenameTable(catalog, oldIdent, newIdent) =>
-      RenameTableExec(catalog, oldIdent, newIdent) :: Nil
+      val tbl = ResolvedTable(catalog, oldIdent, catalog.loadTable(oldIdent))
+      RenameTableExec(
+        catalog,
+        oldIdent,
+        newIdent,
+        invalidateCache(tbl),
+        session.sharedState.cacheManager.cacheQuery) :: Nil
 
     case AlterNamespaceSetProperties(ResolvedNamespace(catalog, ns), properties) =>
       AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala
index a650607..3dbe72e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.storage.StorageLevel
 
 /**
  * Physical plan node for renaming a table.
@@ -27,14 +29,29 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 case class RenameTableExec(
     catalog: TableCatalog,
     oldIdent: Identifier,
-    newIdent: Identifier) extends V2CommandExec {
+    newIdent: Identifier,
+    invalidateCache: () => Option[StorageLevel],
+    cacheTable: (DataFrame, Option[String], StorageLevel) => Unit)
+  extends V2CommandExec {
 
   override def output: Seq[Attribute] = Seq.empty
 
   override protected def run(): Seq[InternalRow] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+
+    val optOldStorageLevel = invalidateCache()
     catalog.invalidateTable(oldIdent)
+
     catalog.renameTable(oldIdent, newIdent)
+
+    optOldStorageLevel.foreach { oldStorageLevel =>
+      val tbl = catalog.loadTable(newIdent)
+      val newRelation = DataSourceV2Relation.create(tbl, Some(catalog), Some(newIdent))
+      cacheTable(
+        Dataset.ofRows(sqlContext.sparkSession, newRelation),
+        Some(newIdent.quoted),
+        oldStorageLevel)
+    }
     Seq.empty
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index a166c5d..972298a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources.SimpleScanSource
 import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -862,6 +863,24 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33829: Renaming a table should recreate a cache while retaining the old cache info") {
+    withTable("testcat.ns.old", "testcat.ns.new") {
+      def getStorageLevel(tableName: String): StorageLevel = {
+        val table = spark.table(tableName)
+        val optCachedData = spark.sharedState.cacheManager.lookupCachedData(table)
+        assert(optCachedData.isDefined)
+        optCachedData.get.cachedRepresentation.cacheBuilder.storageLevel
+      }
+      sql("CREATE TABLE testcat.ns.old USING foo AS SELECT id, data FROM source")
+      sql("CACHE TABLE testcat.ns.old OPTIONS('storageLevel' 'MEMORY_ONLY')")
+      val oldStorageLevel = getStorageLevel("testcat.ns.old")
+
+      sql("ALTER TABLE testcat.ns.old RENAME TO ns.new")
+      val newStorageLevel = getStorageLevel("testcat.ns.new")
+      assert(oldStorageLevel === newStorageLevel)
+    }
+  }
+
   test("Relation: basic") {
     val t1 = "testcat.ns1.ns2.tbl"
     withTable(t1) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 9e9df7d..c03768d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -109,15 +109,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     val exp1 = intercept[NoSuchTableException] {
       sql(s"ALTER TABLE h2.test.not_existing_table RENAME TO test.dst_table")
     }
-    assert(exp1.getMessage.contains(
-      "Failed table renaming from test.not_existing_table to test.dst_table"))
-    assert(exp1.cause.get.getMessage.contains("Table \"not_existing_table\" not found"))
-    val exp2 = intercept[NoSuchNamespaceException] {
+    assert(exp1.getMessage.contains("Table test.not_existing_table not found"))
+    val exp2 = intercept[NoSuchTableException] {
       sql(s"ALTER TABLE h2.bad_test.not_existing_table RENAME TO test.dst_table")
     }
-    assert(exp2.getMessage.contains(
-      "Failed table renaming from bad_test.not_existing_table to test.dst_table"))
-    assert(exp2.cause.get.getMessage.contains("Schema \"bad_test\" not found"))
+    assert(exp2.getMessage.contains("Table bad_test.not_existing_table not found"))
     // Rename to an existing table
     withTable("h2.test.dst_table") {
       withConnection { conn =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org