cloud-fan commented on code in PR #52599:
URL: https://github.com/apache/spark/pull/52599#discussion_r2453791569


##########
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala:
##########
@@ -1853,4 +1865,606 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(!spark.catalog.tableExists("SPARK_52684"))
     }
   }
+
+  test("cache DSv2 table with time travel") {
+    val t = "testcat.tbl"
+    val version = "v1"
+    withTable(t, "cached_tt") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin current version
+      pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+      // cache pinned version
+      sql(s"CACHE TABLE cached_tt AS SELECT * FROM $t VERSION AS OF '$version'")
+      assertCached(sql("SELECT * FROM cached_tt"))
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+      checkAnswer(sql("SELECT * FROM cached_tt"), Seq(Row(1, "a"), Row(2, "b")))
+
+      // add more data to base table
+      sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+      // verify lookupCachedData finds time travel cache
+      val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+      assert(cacheManager.lookupCachedData(timeTravelDF).isDefined)
+
+      // verify base table is not cached
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assert(!spark.catalog.isCached(t))
+
+      // verify lookupCachedData does NOT match base table with time travel cache
+      val baseDF = sql(s"SELECT * FROM $t")
+      assert(cacheManager.lookupCachedData(baseDF).isEmpty)
+    }
+  }
+
+  test("uncache DSv2 table by name to invalidate base and time travel plans") {
+    val t = "testcat.tbl"
+    val version = "v1"
+    withTable(t, "cached_tt") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin current version
+      pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+      // insert more data to base table
+      sql(s"INSERT INTO $t VALUES (3, 'c'), (2, 'b')")
+
+      // cache base table
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      assert(spark.catalog.isCached(t))
+
+      // cache pinned version
+      sql(s"CACHE TABLE cached_tt AS SELECT * FROM $t VERSION AS OF '$version'")
+      assertCached(sql("SELECT * FROM cached_tt"))
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+      // verify lookupCachedData finds separate entries for base and time travel plans
+      val baseDF = sql(s"SELECT * FROM $t")
+      val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+      assert(cacheManager.lookupCachedData(baseDF).isDefined)
+      assert(cacheManager.lookupCachedData(timeTravelDF).isDefined)
+      assert(cacheManager.lookupCachedData(baseDF) != cacheManager.lookupCachedData(timeTravelDF))
+
+      // uncaching base table by name should affect ALL time-traveled cache entries
+      spark.catalog.uncacheTable(t)
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assertNotCached(sql("SELECT * FROM cached_tt"))
+
+      // verify lookupCachedData returns None after uncaching
+      assert(cacheManager.lookupCachedData(baseDF).isEmpty)
+      assert(cacheManager.lookupCachedData(timeTravelDF).isEmpty)
+    }
+  }
+
+  test("uncache DSv2 table with time travel by plan") {
+    val t = "testcat.tbl"
+    val version = "v1"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin current version
+      pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+      // cache pinned version
+      val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+      timeTravelDF.cache()
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+      // verify base table is not affected
+      assertNotCached(sql(s"SELECT * FROM $t"))
+
+      // verify lookupCachedData finds the cache before uncaching
+      assert(cacheManager.lookupCachedData(timeTravelDF).isDefined)
+
+      // uncache pinned version by plan
+      cacheManager.uncacheQuery(timeTravelDF, cascade = false)
+      assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+      // verify lookupCachedData returns None after uncaching
+      assert(cacheManager.lookupCachedData(timeTravelDF).isEmpty)
+    }
+  }
+
+  test("uncache DSv2 table by plan should not affect time travel") {
+    val t = "testcat.tbl"
+    val version = "v1"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin current version
+      pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+      // cache base table
+      val baseDF = sql(s"SELECT * FROM $t")
+      baseDF.cache()
+      assertCached(sql(s"SELECT * FROM $t"))
+
+      // cache pinned version
+      val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+      timeTravelDF.cache()
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+      // uncache base table by plan
+      baseDF.unpersist(blocking = true)
+
+      // verify only base table plan is affected
+      assertNotCached(baseDF)
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assertCached(timeTravelDF)
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+    }
+  }
+
+  test("look up DSv2 table by relation with multiple time travel versions") {
+    val t = "testcat.tbl"
+    val ident = Identifier.of(Array(), "tbl")
+    val version1 = "v1"
+    val version2 = "v2"
+    withTable(t, "cached_tt1", "cached_tt2") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin v1
+      pinTable("testcat", ident, version1)
+
+      sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+      // pin v2
+      pinTable("testcat", ident, version2)
+
+      // cache base
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      checkAnswer(
+        sql(s"SELECT * FROM $t"),
+        Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d")))
+
+      // cache v1
+      sql(s"CACHE TABLE cached_tt1 AS SELECT * FROM $t VERSION AS OF '$version1'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      checkAnswer(
+        sql(s"SELECT * FROM $t VERSION AS OF '$version1'"),
+        Seq(Row(1, "a"), Row(2, "b")))
+
+      // cache v2
+      sql(s"CACHE TABLE cached_tt2 AS SELECT * FROM $t VERSION AS OF '$version2'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+      checkAnswer(
+        sql(s"SELECT * FROM $t VERSION AS OF '$version2'"),
+        Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d")))
+
+      // verify lookupCachedData finds distinct entries for each version
+      val baseDF = sql(s"SELECT * FROM $t")
+      assert(cacheManager.lookupCachedData(baseDF).isDefined)
+      val v1DF = sql(s"SELECT * FROM $t VERSION AS OF '$version1'")
+      assert(cacheManager.lookupCachedData(v1DF).isDefined)
+      val v2DF = sql(s"SELECT * FROM $t VERSION AS OF '$version2'")
+      assert(cacheManager.lookupCachedData(v2DF).isDefined)
+
+      // look up cache using DataSourceV2Relation directly
+      val cat = catalog("testcat").asTableCatalog
+      val baseTable = cat.loadTable(ident)
+      val baseRelation = DataSourceV2Relation.create(baseTable, Some(cat), Some(ident))
+      assert(cacheManager.lookupCachedData(spark, baseRelation).isDefined)
+      val v1Table = cat.loadTable(ident, version1)
+      val v1Relation = baseRelation.copy(
+        table = v1Table,
+        timeTravelSpec = Some(AsOfVersion(version1)))
+      assert(cacheManager.lookupCachedData(spark, v1Relation).isDefined)
+      val v2Table = cat.loadTable(ident, version2)
+      val v2Relation = baseRelation.copy(
+        table = v2Table,
+        timeTravelSpec = Some(AsOfVersion(version2)))
+      assert(cacheManager.lookupCachedData(spark, v2Relation).isDefined)
+
+      // uncache using DataSourceV2Relation directly
+      sql(s"UNCACHE TABLE $t")
+      assert(cacheManager.lookupCachedData(spark, baseRelation).isEmpty)
+      assert(cacheManager.lookupCachedData(spark, v1Relation).isEmpty)
+      assert(cacheManager.lookupCachedData(spark, v2Relation).isEmpty)
+
+      // verify queries don't use cache
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+    }
+  }
+
+  test("uncache DSv2 table using SQL") {
+    val t = "testcat.tbl"
+    val ident = Identifier.of(Array(), "tbl")
+    val version1 = "v1"
+    val version2 = "v2"
+    withTable(t, "cached_tt1", "cached_tt2") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin v1
+      pinTable("testcat", ident, version1)
+
+      sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+      // pin v2
+      pinTable("testcat", ident, version2)
+
+      // cache base and both pinned versions
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      sql(s"CACHE TABLE cached_tt1 AS SELECT * FROM $t VERSION AS OF '$version1'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      sql(s"CACHE TABLE cached_tt2 AS SELECT * FROM $t VERSION AS OF '$version2'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+
+      // uncache all plans using SQL
+      sql(s"UNCACHE TABLE $t")
+
+      // verify queries don't use cache
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+    }
+  }
+
+  test("uncache DSv2 table using uncacheTableOrView") {
+    val t = "testcat.tbl"
+    val ident = Identifier.of(Array(), "tbl")
+    val version1 = "v1"
+    val version2 = "v2"
+    withTable(t, "cached_tt1", "cached_tt2") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin v1
+      pinTable("testcat", ident, version1)
+
+      sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+      // pin v2
+      pinTable("testcat", ident, version2)
+
+      // cache base and both pinned versions
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      sql(s"CACHE TABLE cached_tt1 AS SELECT * FROM $t VERSION AS OF '$version1'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      sql(s"CACHE TABLE cached_tt2 AS SELECT * FROM $t VERSION AS OF '$version2'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+
+      // uncache all plans using uncacheTableOrView
+      cacheManager.uncacheTableOrView(spark, Seq("testcat", "tbl"), cascade = true)
+
+      // verify queries don't use cache
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+    }
+  }
+
+  test("REFRESH TABLE refreshes time travel plans correctly") {
+    val t = "testcat.tbl"
+    val ident = Identifier.of(Array(), "tbl")
+    val version1 = "v1"
+    val version2 = "v2"
+    withTable(t, "cached_tt1", "cached_tt2") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin versions
+      pinTable("testcat", ident, version1)
+      pinTable("testcat", ident, version2)
+
+      // cache base and both versions
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      sql(s"CACHE TABLE cached_tt1 AS SELECT * FROM $t VERSION AS OF '$version1'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      sql(s"CACHE TABLE cached_tt2 AS SELECT * FROM $t VERSION AS OF '$version2'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+
+      // must have 3 cache entries
+      assert(cacheManager.numCachedEntries == 3)
+      checkCacheLoading(sql(s"SELECT * FROM $t"), isLoaded = true)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt1"), isLoaded = true)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt2"), isLoaded = true)
+
+      // refresh table by name to invalidate all plans
+      sql(s"REFRESH TABLE $t")
+
+      // all entries must be refreshed
+      checkCacheLoading(sql(s"SELECT * FROM $t"), isLoaded = false)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt1"), isLoaded = false)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt2"), isLoaded = false)
+    }
+  }
+
+  test("recacheByTableName with time travel plans") {
+    val t = "testcat.tbl"
+    val ident = Identifier.of(Array(), "tbl")
+    val version1 = "v1"
+    val version2 = "v2"
+    withTable(t, "cached_tt1", "cached_tt2") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin versions
+      pinTable("testcat", ident, version1)
+      pinTable("testcat", ident, version2)
+
+      // cache base and both versions
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      sql(s"CACHE TABLE cached_tt1 AS SELECT * FROM $t VERSION AS OF '$version1'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+      sql(s"CACHE TABLE cached_tt2 AS SELECT * FROM $t VERSION AS OF '$version2'")
+      assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+
+      // must have 3 cache entries
+      assert(cacheManager.numCachedEntries == 3)
+      checkCacheLoading(sql(s"SELECT * FROM $t"), isLoaded = true)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt1"), isLoaded = true)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt2"), isLoaded = true)
+
+      // refresh base, keep pinned versions cached
+      cacheManager.recacheTableOrView(spark, Seq("testcat", "tbl"), includeTimeTravel = false)
+
+      // time travel entries must NOT be refreshed
+      checkCacheLoading(sql(s"SELECT * FROM $t"), isLoaded = false)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt1"), isLoaded = true)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt2"), isLoaded = true)
+
+      // refresh all
+      cacheManager.recacheTableOrView(spark, Seq("testcat", "tbl"))
+
+      // all plans must be refreshed
+      checkCacheLoading(sql(s"SELECT * FROM $t"), isLoaded = false)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt1"), isLoaded = false)
+      checkCacheLoading(sql(s"SELECT * FROM cached_tt2"), isLoaded = false)
+    }
+  }
+
+  private def checkCacheLoading(ds: Dataset[_], isLoaded: Boolean): Unit = {
+    cacheManager.lookupCachedData(ds) match {
+      case Some(entry) =>
+        
assert(entry.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded == 
isLoaded)
+      case _ =>
+        fail("dataset is not cached")
+    }
+  }
+
+  test("RENAME TABLE manages cache with time travel plans correctly") {
+    val t = "testcat.tbl"
+    val tRenamed = "testcat.tbl_renamed"
+    val ident = Identifier.of(Array(), "tbl")
+    val version1 = "v1"
+    val version2 = "v2"
+    withTable(t, tRenamed, "cached_tt1", "cached_tt2") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // pin versions
+      pinTable("testcat", ident, version1)
+      pinTable("testcat", ident, version2)

Review Comment:
   let's make these two versions have different data



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to