Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/21594#discussion_r197586970
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query    The [[Dataset]] to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+      cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark    The Spark session.
+   * @param plan     The plan to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 plan; otherwise un-cache the given plan only.
+   * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+      cascade: Boolean, blocking: Boolean): Unit = writeLock {
+    val shouldRemove: LogicalPlan => Boolean =
+      if (cascade) {
+        _.find(_.sameResult(plan)).isDefined
+      } else {
+        _.sameResult(plan)
+      }
     val it = cachedData.iterator()
     while (it.hasNext) {
       val cd = it.next()
-      if (cd.plan.find(_.sameResult(plan)).isDefined) {
+      if (shouldRemove(cd.plan)) {
         cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
       }
     }
+    // Re-compile dependent cached queries after removing the cached query.
+    if (!cascade) {
+      val it = cachedData.iterator()
+      val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
+      while (it.hasNext) {
+        val cd = it.next()
+        if (cd.plan.find(_.sameResult(plan)).isDefined) {
+          it.remove()
+          val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
+          val newCache = InMemoryRelation(
+            cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+            logicalPlan = cd.plan)
+          needToRecache += cd.copy(cachedRepresentation = newCache)
+        }
+      }
+      needToRecache.foreach(cachedData.add)
--- End diff --
Create a private function from the duplicated iterate-and-remove loops at line 144 and line 158?
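
For reference, a minimal sketch of what that extraction might look like, assuming it lives in CacheManager next to the two loops. The helper name removeMatchingEntries and its exact shape are hypothetical, not taken from the PR:

  // Detach every cached entry whose plan matches the predicate and return the
  // removed entries, so each call site can post-process them: the first loop
  // clears the cache buffers, the second re-compiles and re-adds the entries.
  private def removeMatchingEntries(
      shouldRemove: LogicalPlan => Boolean): Seq[CachedData] = {
    val removed = scala.collection.mutable.ArrayBuffer.empty[CachedData]
    val it = cachedData.iterator()
    while (it.hasNext) {
      val cd = it.next()
      if (shouldRemove(cd.plan)) {
        it.remove()
        removed += cd
      }
    }
    removed
  }

The first loop would then reduce to
removeMatchingEntries(shouldRemove).foreach(_.cachedRepresentation.cacheBuilder.clearCache(blocking)),
and the re-compile branch could map the returned entries to rebuilt InMemoryRelations before re-adding them to cachedData.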
---
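
To make the cascade semantics concrete, a hedged usage sketch from the Dataset side. uncacheQuery is an internal API normally reached through Dataset.unpersist; the variable names are illustrative and assume an active SparkSession named spark:

  import spark.implicits._

  val base = spark.range(100).toDF("id")
  val derived = base.filter($"id" > 50)
  base.cache()
  derived.cache()

  // cascade = true: drop base's cache entry and every cache entry built on
  // top of it, so derived's cached data is dropped too.
  spark.sharedState.cacheManager.uncacheQuery(base, cascade = true)

  // cascade = false (the alternative mode): drop only base's cache entry;
  // derived's entry is re-compiled against the un-cached plan and kept,
  // which is the behavior this diff adds.
  spark.sharedState.cacheManager.uncacheQuery(base, cascade = false)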