turboFei commented on a change in pull request #31431:
URL: https://github.com/apache/spark/pull/31431#discussion_r568502937
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
##########
@@ -1374,7 +1375,55 @@ case class RefreshTableCommand(tableIdent: TableIdentifier)
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Refresh the given table's metadata. If this table is cached as an InMemoryRelation,
     // drop the original cached version and make the new version cached lazily.
-    sparkSession.catalog.refreshTable(tableIdent.quotedString)
+    // If this table is a view, also refresh its underlying tables.
+    refreshAllUnderlyingTables(tableIdent, sparkSession)
     Seq.empty[Row]
   }
+
+  private def refreshAllUnderlyingTables(
+      tableIdent: TableIdentifier, sparkSession: SparkSession): Unit = {
+    val tableName = tableIdent.unquotedString
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTempViewOrPermanentTableMetadata(tableIdent)
+    sparkSession.catalog.refreshTable(tableName)
+
+    if (table.tableType == CatalogTableType.VIEW) {
+      if (table.viewText.isDefined) {
+        val parser = sparkSession.sessionState.sqlParser
+        val unresolvedPlan = parser.parsePlan(table.viewText.get)
+        findOutUnderlyingTables(unresolvedPlan).foreach { t =>
+          refreshAllUnderlyingTables(t, sparkSession)
+        }
+      } else {
+        catalog.getTempView(tableName)
+          .orElse(catalog.getGlobalTempView(tableName))
+          .map(findOutUnderlyingTables).foreach { tables =>
Review comment:
In branch-2.3, the logical plan for a temporary view is the analyzed logical plan.
In the master branch, it is an unresolved logical plan.
So I also enable refreshing the underlying tables of temporary views.
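For context, `findOutUnderlyingTables` is referenced by the hunk above but its body is not shown in this thread. Below is a minimal sketch of one possible shape, under the assumption that it simply collects the relation names left unresolved by the parser; it is an illustration, not the PR's actual helper.

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Sketch: gather every relation the unresolved plan reads from, so each one
    // can be fed back into refreshAllUnderlyingTables recursively.
    private def findOutUnderlyingTables(plan: LogicalPlan): Seq[TableIdentifier] = {
      plan.collect {
        // On master, UnresolvedRelation carries the raw multi-part name from the parser.
        case u: UnresolvedRelation if u.multipartIdentifier.size <= 2 =>
          val parts = u.multipartIdentifier
          TableIdentifier(parts.last, parts.init.headOption)
      }.distinct
    }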
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -2581,6 +2581,48 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       }
     }
   }
+
+  test("SPARK-34322: When refreshing a view, also refresh its underlying tables") {
+    withTempDir { dir1 =>
+      withTempDir { dir2 =>
+        withTable("ta", "tb") {
+          withView("v") {
+            withTempView("tv1", "tv2") {
+              sql(s"create table ta(id int) using parquet")
+              sql(s"create table tb(id int) using parquet")
+              sql("create view v as select * from ta")
+              sql("create temporary view tv1 as select * from tb")
+              sql("create temporary view tv2 as select * from tv1")
+              sql("insert into table ta values(1)")
+              sql("cache table ta")
+              sql("insert into table tb values(1)")
+              sql("cache table tb")
+
+              val qualifiedTaName = QualifiedTableName("default", "ta")
+              val cachedTa = spark.sessionState.catalog.getCachedTable(qualifiedTaName)
+
+              val qualifiedTbName = QualifiedTableName("default", "tb")
+              val cachedTb = spark.sessionState.catalog.getCachedTable(qualifiedTbName)
+
+              val newSession = spark.newSession()
+              newSession.sql(s"alter table ta set location '${dir1.getAbsolutePath}'")
+              newSession.sql("insert into table ta values(2)")
+              newSession.sql(s"alter table tb set location '${dir2.getAbsolutePath}'")
+              newSession.sql("insert into table tb values(2)")
+              newSession.sessionState.catalog.cacheTable(qualifiedTaName, cachedTa)
Review comment:
Because the new session shares the catalog with the original Spark session, and the
alter table set location command invalidates the cached table entry, I have to
restore the old cache entry manually.
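To make the workaround easier to follow, here is the save-and-restore pattern in isolation. The identifiers are copied from the hunk above, the location path is a placeholder, and this is illustrative rather than the test's exact code.

    import org.apache.spark.sql.catalyst.QualifiedTableName

    // Capture the cached relation before any command invalidates it.
    val qualifiedTaName = QualifiedTableName("default", "ta")
    val cachedTa = spark.sessionState.catalog.getCachedTable(qualifiedTaName)

    // alter table ... set location issued from another session drops the shared
    // cache entry, so the stale entry is put back by hand before refresh runs.
    val newSession = spark.newSession()
    newSession.sql("alter table ta set location '/path/to/new/location'")
    newSession.sessionState.catalog.cacheTable(qualifiedTaName, cachedTa)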
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]