cloud-fan commented on a change in pull request #31206:
URL: https://github.com/apache/spark/pull/31206#discussion_r560630871
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
##########
@@ -512,49 +512,27 @@ class CatalogImpl(sparkSession: SparkSession) extends
Catalog {
* For Hive metastore table, the metadata is refreshed. For data source
tables, the schema will
* not be inferred and refreshed.
*
- * If this table is cached as an InMemoryRelation, drop the original cached
version and make the
- * new version cached lazily.
+ * If this table is cached as an InMemoryRelation, re-cache the table and
its dependents lazily.
*
- * In addition, refreshing a table also invalidate all caches that have
reference to the table
+ * In addition, refreshing a table also clears all caches that have reference
to the table
* in a cascading manner. This is to prevent incorrect results from the
otherwise stale caches.
*
* @group cachemgmt
* @since 2.0.0
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent =
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- val tableMetadata =
sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
- val table = sparkSession.table(tableIdent)
+ val relation = sparkSession.table(tableIdent).queryExecution.analyzed
- if (tableMetadata.tableType == CatalogTableType.VIEW) {
- // Temp or persistent views: refresh (or invalidate) any metadata/data
cached
- // in the plan recursively.
- table.queryExecution.analyzed.refresh()
- } else {
- // Non-temp tables: refresh the metadata cache.
- sessionCatalog.refreshTable(tableIdent)
- }
-
- // If this table is cached as an InMemoryRelation, drop the original
- // cached version and make the new version cached lazily.
- val cache = sparkSession.sharedState.cacheManager.lookupCachedData(table)
-
- // uncache the logical plan.
- // note this is a no-op for the table itself if it's not cached, but will
invalidate all
- // caches referencing this table.
- sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
-
- if (cache.nonEmpty) {
- // save the cache name and cache level for recreation
- val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
- val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
-
- // creates a new logical plan since the old table refers to old relation
which
- // should be refreshed
- val newTable = sparkSession.table(tableIdent)
+ relation.refresh()
+ sessionCatalog.invalidateCachedTable(tableIdent)
- // recache with the same name and cache level.
- sparkSession.sharedState.cacheManager.cacheQuery(newTable, cacheName,
cacheLevel)
+ // Re-caches the logical plan of the relation.
+ // Note this is a no-op for the relation itself if it's not cached, but
will clear all
+ // caches referencing this relation. If this relation is cached as an
InMemoryRelation,
+ // this will clear the relation cache and caches of all its dependents.
+ relation.children.foreach { child =>
Review comment:
can we make it clearer in the code?
```
assert(relation.isInstanceOf[SubqueryAlias])
recacheByPlan(relation.asInstanceOf[SubqueryAlias].child)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]