Github user maryannxue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21594#discussion_r197600101
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query    The [[Dataset]] to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+      cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark    The Spark session.
+   * @param plan     The plan to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 plan; otherwise un-cache the given plan only.
+   * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+      cascade: Boolean, blocking: Boolean): Unit = writeLock {
+    val shouldRemove: LogicalPlan => Boolean =
+      if (cascade) {
+        _.find(_.sameResult(plan)).isDefined
+      } else {
+        _.sameResult(plan)
+      }
     val it = cachedData.iterator()
     while (it.hasNext) {
       val cd = it.next()
-      if (cd.plan.find(_.sameResult(plan)).isDefined) {
+      if (shouldRemove(cd.plan)) {
         cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
       }
     }
+    // Re-compile dependent cached queries after removing the cached query.
+    if (!cascade) {
+      val it = cachedData.iterator()
+      val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
+      while (it.hasNext) {
+        val cd = it.next()
+        if (cd.plan.find(_.sameResult(plan)).isDefined) {
+          it.remove()
+          val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
+          val newCache = InMemoryRelation(
+            cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+            logicalPlan = cd.plan)
+          needToRecache += cd.copy(cachedRepresentation = newCache)
+        }
+      }
+      needToRecache.foreach(cachedData.add)
--- End diff ---
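For illustration (not part of the original diff), a minimal usage sketch of the new `cascade` flag; `df1` and `df2` here are hypothetical cached DataFrames, while `sharedState.cacheManager` and the `uncacheQuery` signature are as in the diff:

```scala
// Hypothetical setup: df1 is cached, and df2 = df1.groupBy("k").count()
// is cached on top of it.
// cascade = true : drops df1's entry plus every entry whose plan refers
//                  to df1's plan (so df2's cache and its buffer go too).
// cascade = false: drops only df1's entry; df2's entry is re-compiled so
//                  its already-built cache buffer can be reused.
spark.sharedState.cacheManager.uncacheQuery(df1, cascade = false)
```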
It's almost the same logic as "recache", except that here it tries to reuse the cached buffer. It would be nice to integrate the two, but it wouldn't look so clean given the inconvenience of copying a CacheBuilder. I'll try, though.
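For illustration, a rough sketch of what that integration might look like inside CacheManager; `recacheByCondition` and its `rebuild` parameter are hypothetical names here, not the existing API:

```scala
// Hypothetical shared helper: one removal/re-add loop, parameterized over
// how the replacement InMemoryRelation is built. The two callers differ
// only in whether they reuse or clear the old cache buffer.
private def recacheByCondition(
    condition: LogicalPlan => Boolean,
    rebuild: CachedData => InMemoryRelation): Unit = writeLock {
  val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
  val it = cachedData.iterator()
  while (it.hasNext) {
    val cd = it.next()
    if (condition(cd.plan)) {
      it.remove()
      needToRecache += cd.copy(cachedRepresentation = rebuild(cd))
    }
  }
  needToRecache.foreach(cachedData.add)
}
```

The non-cascade path above would then pass a `rebuild` that re-compiles the plan and reuses the buffer via `cacheBuilder.withCachedPlan(...)`, while a plain recache would pass one that calls `clearCache(...)` first and builds a fresh relation.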
---