This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b9282f7  [SPARK-33829][SQL][3.1] Renaming v2 tables should recreate the cache
b9282f7 is described below

commit b9282f7f40f2b65def40c23b16772788bc3489f0
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Mon Dec 21 06:41:09 2020 +0000

    [SPARK-33829][SQL][3.1] Renaming v2 tables should recreate the cache
    
    ### What changes were proposed in this pull request?
    
    Backport of #30825
    
    Currently, renaming v2 tables does not invalidate/recreate the cache,
    leading to incorrect behavior (the cache not being used) when v2 tables
    are renamed. This PR fixes the behavior.
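
    For illustration, a minimal scenario (mirroring the new test added below;
    the catalog and table names are the ones used in that test):

        sql("CACHE TABLE testcat.ns.old OPTIONS('storageLevel' 'MEMORY_ONLY')")
        sql("ALTER TABLE testcat.ns.old RENAME TO ns.new")
        // Before this fix, the old cache entry was not carried over, so
        // queries against the renamed table were no longer served from the
        // cache. With this fix, the cache is recreated for ns.new with the
        // same name and storage level.
        sql("SELECT * FROM testcat.ns.new")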
    
    ### Why are the changes needed?
    
    This fixes a bug where the cache associated with the renamed table was
    not being cleaned up/recreated.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now when a v2 table is renamed, the cache is correctly updated.
    
    ### How was this patch tested?
    
    Added a new test.
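
    The new test can be run locally with the usual sbt invocation (shown for
    convenience; the exact command is an assumption based on the standard
    Spark build):

        build/sbt "sql/testOnly *DataSourceV2SQLSuite -- -z SPARK-33829"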
    
    Closes #30856 from imback82/backport_30825.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/DataSourceV2Strategy.scala      | 29 ++++++++++++++++------
 .../execution/datasources/v2/RenameTableExec.scala | 19 +++++++++++++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 19 ++++++++++++++
 .../v2/jdbc/JDBCTableCatalogSuite.scala            | 10 +++-----
 4 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 97dab4b..34bc80c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.storage.StorageLevel
 
 class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
@@ -56,18 +57,24 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     session.sharedState.cacheManager.recacheByPlan(session, r)
   }
 
-  private def invalidateCache(r: ResolvedTable, recacheTable: Boolean = false)(): Unit = {
+  private def invalidateCache(
+      r: ResolvedTable,
+      recacheTable: Boolean = false)(): Option[StorageLevel] = {
     val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
     val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
     session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
-    if (recacheTable && cache.isDefined) {
-      // save the cache name and cache level for recreation
-      val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
+    if (cache.isDefined) {
       val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
 
-      // recache with the same name and cache level.
-      val ds = Dataset.ofRows(session, v2Relation)
-      session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+      if (recacheTable) {
+        val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
+        // recache with the same name and cache level.
+        val ds = Dataset.ofRows(session, v2Relation)
+        session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+      }
+      Some(cacheLevel)
+    } else {
+      None
     }
   }
 
@@ -268,7 +275,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       AlterTableExec(catalog, ident, changes) :: Nil
 
     case RenameTable(catalog, oldIdent, newIdent) =>
-      RenameTableExec(catalog, oldIdent, newIdent) :: Nil
+      val tbl = ResolvedTable(catalog, oldIdent, catalog.loadTable(oldIdent))
+      RenameTableExec(
+        catalog,
+        oldIdent,
+        newIdent,
+        invalidateCache(tbl),
+        session.sharedState.cacheManager.cacheQuery) :: Nil
 
     case AlterNamespaceSetProperties(ResolvedNamespace(catalog, ns), properties) =>
       AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala
index a650607..3dbe72e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.storage.StorageLevel
 
 /**
  * Physical plan node for renaming a table.
@@ -27,14 +29,29 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 case class RenameTableExec(
     catalog: TableCatalog,
     oldIdent: Identifier,
-    newIdent: Identifier) extends V2CommandExec {
+    newIdent: Identifier,
+    invalidateCache: () => Option[StorageLevel],
+    cacheTable: (DataFrame, Option[String], StorageLevel) => Unit)
+  extends V2CommandExec {
 
   override def output: Seq[Attribute] = Seq.empty
 
   override protected def run(): Seq[InternalRow] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+
+    val optOldStorageLevel = invalidateCache()
     catalog.invalidateTable(oldIdent)
+
     catalog.renameTable(oldIdent, newIdent)
 
+    optOldStorageLevel.foreach { oldStorageLevel =>
+      val tbl = catalog.loadTable(newIdent)
+      val newRelation = DataSourceV2Relation.create(tbl, Some(catalog), Some(newIdent))
+      cacheTable(
+        Dataset.ofRows(sqlContext.sparkSession, newRelation),
+        Some(newIdent.quoted),
+        oldStorageLevel)
+    }
     Seq.empty
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index a166c5d..972298a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources.SimpleScanSource
 import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -862,6 +863,24 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33829: Renaming a table should recreate a cache while retaining 
the old cache info") {
+    withTable("testcat.ns.old", "testcat.ns.new") {
+      def getStorageLevel(tableName: String): StorageLevel = {
+        val table = spark.table(tableName)
+        val optCachedData = spark.sharedState.cacheManager.lookupCachedData(table)
+        assert(optCachedData.isDefined)
+        optCachedData.get.cachedRepresentation.cacheBuilder.storageLevel
+      }
+      sql("CREATE TABLE testcat.ns.old USING foo AS SELECT id, data FROM 
source")
+      sql("CACHE TABLE testcat.ns.old OPTIONS('storageLevel' 'MEMORY_ONLY')")
+      val oldStorageLevel = getStorageLevel("testcat.ns.old")
+
+      sql("ALTER TABLE testcat.ns.old RENAME TO ns.new")
+      val newStorageLevel = getStorageLevel("testcat.ns.new")
+      assert(oldStorageLevel === newStorageLevel)
+    }
+  }
+
   test("Relation: basic") {
     val t1 = "testcat.ns1.ns2.tbl"
     withTable(t1) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 9e9df7d..c03768d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -109,15 +109,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     val exp1 = intercept[NoSuchTableException] {
       sql(s"ALTER TABLE h2.test.not_existing_table RENAME TO test.dst_table")
     }
-    assert(exp1.getMessage.contains(
-      "Failed table renaming from test.not_existing_table to test.dst_table"))
-    assert(exp1.cause.get.getMessage.contains("Table \"not_existing_table\" not found"))
-    val exp2 = intercept[NoSuchNamespaceException] {
+    assert(exp1.getMessage.contains("Table test.not_existing_table not found"))
+    val exp2 = intercept[NoSuchTableException] {
       sql(s"ALTER TABLE h2.bad_test.not_existing_table RENAME TO 
test.dst_table")
     }
-    assert(exp2.getMessage.contains(
-      "Failed table renaming from bad_test.not_existing_table to 
test.dst_table"))
-    assert(exp2.cause.get.getMessage.contains("Schema \"bad_test\" not found"))
+    assert(exp2.getMessage.contains("Table bad_test.not_existing_table not found"))
     // Rename to an existing table
     withTable("h2.test.dst_table") {
       withConnection { conn =>

