cloud-fan commented on code in PR #52599:
URL: https://github.com/apache/spark/pull/52599#discussion_r2453789801
##########
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala:
##########
@@ -1853,4 +1865,606 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
assert(!spark.catalog.tableExists("SPARK_52684"))
}
}
+
+ test("cache DSv2 table with time travel") {
+ val t = "testcat.tbl"
+ val version = "v1"
+ withTable(t, "cached_tt") {
+ sql(s"CREATE TABLE $t (id int, data string) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+ // pin current version
+ pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+ // cache pinned version
+ sql(s"CACHE TABLE cached_tt AS SELECT * FROM $t VERSION AS OF
'$version'")
+ assertCached(sql("SELECT * FROM cached_tt"))
+ assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+ checkAnswer(sql("SELECT * FROM cached_tt"), Seq(Row(1, "a"), Row(2,
"b")))
+
+ // add more data to base table
+ sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+ // verify lookupCachedData finds time travel cache
+ val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+ assert(cacheManager.lookupCachedData(timeTravelDF).isDefined)
+
+ // verify base table is not cached
+ assertNotCached(sql(s"SELECT * FROM $t"))
+ assert(!spark.catalog.isCached(t))
+
+ // verify lookupCachedData does NOT match base table with time travel cache
+ val baseDF = sql(s"SELECT * FROM $t")
+ assert(cacheManager.lookupCachedData(baseDF).isEmpty)
+ }
+ }
+
+ test("uncache DSv2 table by name to invalidate base and time travel plans") {
+ val t = "testcat.tbl"
+ val version = "v1"
+ withTable(t, "cached_tt") {
+ sql(s"CREATE TABLE $t (id int, data string) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+ // pin current version
+ pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+ // insert more data to base table
+ sql(s"INSERT INTO $t VALUES (3, 'c'), (2, 'b')")
+
+ // cache base table
+ sql(s"CACHE TABLE $t")
+ assertCached(sql(s"SELECT * FROM $t"))
+ assert(spark.catalog.isCached(t))
+
+ // cache pinned version
+ sql(s"CACHE TABLE cached_tt AS SELECT * FROM $t VERSION AS OF
'$version'")
+ assertCached(sql("SELECT * FROM cached_tt"))
+ assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+ // verify lookupCachedData finds separate entries for base and time travel plans
+ val baseDF = sql(s"SELECT * FROM $t")
+ val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+ assert(cacheManager.lookupCachedData(baseDF).isDefined)
+ assert(cacheManager.lookupCachedData(timeTravelDF).isDefined)
+ assert(cacheManager.lookupCachedData(baseDF) != cacheManager.lookupCachedData(timeTravelDF))
+
+ // uncaching base table by name should affect ALL time-traveled cache entries
+ spark.catalog.uncacheTable(t)
+ assertNotCached(sql(s"SELECT * FROM $t"))
+ assertNotCached(sql("SELECT * FROM cached_tt"))
+
+ // verify lookupCachedData returns None after uncaching
+ assert(cacheManager.lookupCachedData(baseDF).isEmpty)
+ assert(cacheManager.lookupCachedData(timeTravelDF).isEmpty)
+ }
+ }
+
+ test("uncache DSv2 table with time travel by plan") {
+ val t = "testcat.tbl"
+ val version = "v1"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id int, data string) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+ // pin current version
+ pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+ // cache pinned version
+ val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+ timeTravelDF.cache()
+ assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+ // verify base table is not affected
+ assertNotCached(sql(s"SELECT * FROM $t"))
+
+ // verify lookupCachedData finds the cache before uncaching
+ assert(cacheManager.lookupCachedData(timeTravelDF).isDefined)
+
+ // uncache pinned version by plan
+ cacheManager.uncacheQuery(timeTravelDF, cascade = false)
+ assertNotCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+ // verify lookupCachedData returns None after uncaching
+ assert(cacheManager.lookupCachedData(timeTravelDF).isEmpty)
+ }
+ }
+
+ test("uncache DSv2 table by plan should not affect time travel") {
+ val t = "testcat.tbl"
+ val version = "v1"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id int, data string) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+ // pin current version
+ pinTable("testcat", Identifier.of(Array(), "tbl"), version)
+
+ // cache base table
+ val baseDF = sql(s"SELECT * FROM $t")
+ baseDF.cache()
+ assertCached(sql(s"SELECT * FROM $t"))
+
+ // cache pinned version
+ val timeTravelDF = sql(s"SELECT * FROM $t VERSION AS OF '$version'")
+ timeTravelDF.cache()
+ assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+
+ // uncache base table by plan
+ baseDF.unpersist(blocking = true)
+
+ // verify only base table plan is affected
+ assertNotCached(baseDF)
+ assertNotCached(sql(s"SELECT * FROM $t"))
+ assertCached(timeTravelDF)
+ assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version'"))
+ }
+ }
+
+ test("look up DSv2 table by relation with multiple time travel versions") {
+ val t = "testcat.tbl"
+ val ident = Identifier.of(Array(), "tbl")
+ val version1 = "v1"
+ val version2 = "v2"
+ withTable(t, "cached_tt1", "cached_tt2") {
+ sql(s"CREATE TABLE $t (id int, data string) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+ // pin v1
+ pinTable("testcat", ident, version1)
+
+ sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+ // pin v2
+ pinTable("testcat", ident, version2)
+
+ // cache base
+ sql(s"CACHE TABLE $t")
+ assertCached(sql(s"SELECT * FROM $t"))
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d")))
+
+ // cache v1
+ sql(s"CACHE TABLE cached_tt1 AS SELECT * FROM $t VERSION AS OF
'$version1'")
+ assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version1'"))
+ checkAnswer(
+ sql(s"SELECT * FROM $t VERSION AS OF '$version1'"),
+ Seq(Row(1, "a"), Row(2, "b")))
+
+ // cache v2
+ sql(s"CACHE TABLE cached_tt2 AS SELECT * FROM $t VERSION AS OF
'$version2'")
+ assertCached(sql(s"SELECT * FROM $t VERSION AS OF '$version2'"))
+ checkAnswer(
+ sql(s"SELECT * FROM $t VERSION AS OF '$version2'"),
+ Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d")))
+
+ // verify lookupCachedData finds distinct entries for each version
+ val baseDF = sql(s"SELECT * FROM $t")
+ assert(cacheManager.lookupCachedData(baseDF).isDefined)
+ val v1DF = sql(s"SELECT * FROM $t VERSION AS OF '$version1'")
+ assert(cacheManager.lookupCachedData(v1DF).isDefined)
+ val v2DF = sql(s"SELECT * FROM $t VERSION AS OF '$version2'")
+ assert(cacheManager.lookupCachedData(v2DF).isDefined)
+
+ // look up cache using DataSourceV2Relation directly
+ val cat = catalog("testcat").asTableCatalog
+ val baseTable = cat.loadTable(ident)
+ val baseRelation = DataSourceV2Relation.create(baseTable, Some(cat), Some(ident))
+ assert(cacheManager.lookupCachedData(spark, baseRelation).isDefined)
+ val v1Table = cat.loadTable(ident, version1)
+ val v1Relation = baseRelation.copy(
+ table = v1Table,
+ timeTravelSpec = Some(AsOfVersion(version1)))
+ assert(cacheManager.lookupCachedData(spark, v1Relation).isDefined)
+ val v2Table = cat.loadTable(ident, version2)
+ val v2Relation = baseRelation.copy(
+ table = v2Table,
+ timeTravelSpec = Some(AsOfVersion(version2)))
+ assert(cacheManager.lookupCachedData(spark, v2Relation).isDefined)
+
+ // uncache using DataSourceV2Relation directly
+ sql(s"UNCACHE TABLE $t")
##########
Review Comment:
I can't connect the dots between this SQL UNCACHE TABLE and `uncache using DataSourceV2Relation directly`
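
For comparison, here is a minimal sketch of what uncaching "using DataSourceV2Relation directly" could look like, reusing the `baseRelation`/`v1Relation`/`v2Relation` values defined above and assuming the plan-based `CacheManager.uncacheQuery(spark, plan, cascade)` overload applies to these relations:

```scala
// Sketch only, not the PR's code: drop just the v1 cache entry via its relation plan,
// assuming cacheManager.uncacheQuery(spark, plan, cascade) accepts a DataSourceV2Relation.
cacheManager.uncacheQuery(spark, v1Relation, cascade = false)
assert(cacheManager.lookupCachedData(spark, v1Relation).isEmpty)

// Unlike UNCACHE TABLE by name, the base and v2 entries would be expected to stay cached.
assert(cacheManager.lookupCachedData(spark, baseRelation).isDefined)
assert(cacheManager.lookupCachedData(spark, v2Relation).isDefined)
```

If the intent is instead to verify that UNCACHE TABLE by name also clears the relation-based entries, updating the code comment to say so would make the connection clearer.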