Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21594#discussion_r197600101
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
    @@ -105,24 +105,58 @@ class CacheManager extends Logging {
       }
     
       /**
    -   * Un-cache all the cache entries that refer to the given plan.
    +   * Un-cache the given plan or all the cache entries that refer to the given plan.
    +   * @param query     The [[Dataset]] to be un-cached.
    +   * @param cascade   If true, un-cache all the cache entries that refer to the given
    +   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
    +   * @param blocking  Whether to block until all blocks are deleted.
        */
    -  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
    -    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
    +  def uncacheQuery(query: Dataset[_],
    +      cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
    +    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
       }
     
       /**
    -   * Un-cache all the cache entries that refer to the given plan.
    +   * Un-cache the given plan or all the cache entries that refer to the given plan.
    +   * @param spark     The Spark session.
    +   * @param plan      The plan to be un-cached.
    +   * @param cascade   If true, un-cache all the cache entries that refer to the given
    +   *                  plan; otherwise un-cache the given plan only.
    +   * @param blocking  Whether to block until all blocks are deleted.
        */
    -  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
    +  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
    +      cascade: Boolean, blocking: Boolean): Unit = writeLock {
    +    val shouldRemove: LogicalPlan => Boolean =
    +      if (cascade) {
    +        _.find(_.sameResult(plan)).isDefined
    +      } else {
    +        _.sameResult(plan)
    +      }
         val it = cachedData.iterator()
         while (it.hasNext) {
           val cd = it.next()
    -      if (cd.plan.find(_.sameResult(plan)).isDefined) {
    +      if (shouldRemove(cd.plan)) {
             cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
             it.remove()
           }
         }
    +    // Re-compile dependent cached queries after removing the cached query.
    +    if (!cascade) {
    +      val it = cachedData.iterator()
    +      val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
    +      while (it.hasNext) {
    +        val cd = it.next()
    +        if (cd.plan.find(_.sameResult(plan)).isDefined) {
    +          it.remove()
    +          val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
    +          val newCache = InMemoryRelation(
    +            cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
    +            logicalPlan = cd.plan)
    +          needToRecache += cd.copy(cachedRepresentation = newCache)
    +        }
    +      }
    +      needToRecache.foreach(cachedData.add)
    --- End diff --
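
    Just to make the "cascade" semantics concrete, a call site would look
    something like this (a hypothetical sketch; "cacheManager" and "df" are
    stand-ins, not names from the diff):

        // Non-cascading: drop only df's own cache entry; cached queries that
        // depend on it are kept and re-compiled, reusing their buffers.
        cacheManager.uncacheQuery(df, cascade = false)

        // Cascading: drop df's entry plus every cached entry whose plan
        // refers to df's plan.
        cacheManager.uncacheQuery(df, cascade = true)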
    
    It's almost the same logic as "recache", except that it tries to reuse the
    cached buffer here. It would be nice to integrate the two, but it wouldn't
    look so clean given the inconvenience of copying a CacheBuilder. I'll try,
    though.
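
    A rough sketch of what that integration might look like (the
    "recacheByCondition" signature with a "reuseCachedBuffer" flag is
    hypothetical, and the builder copy in the else-branch is exactly the
    inconvenient part):

        private def recacheByCondition(
            spark: SparkSession,
            condition: LogicalPlan => Boolean,
            reuseCachedBuffer: Boolean,
            blocking: Boolean): Unit = writeLock {
          val it = cachedData.iterator()
          val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
          while (it.hasNext) {
            val cd = it.next()
            if (condition(cd.plan)) {
              it.remove()
              val physicalPlan =
                spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
              val newBuilder = if (reuseCachedBuffer) {
                // Keep the materialized buffer and only swap in the re-compiled plan.
                cd.cachedRepresentation.cacheBuilder.withCachedPlan(physicalPlan)
              } else {
                // Drop the buffer and rebuild from scratch; copying the builder
                // with the new plan is the awkward part (hypothetical copy).
                cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
                cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = physicalPlan)
              }
              needToRecache += cd.copy(cachedRepresentation = InMemoryRelation(
                cacheBuilder = newBuilder, logicalPlan = cd.plan))
            }
          }
          needToRecache.foreach(cachedData.add)
        }

    The non-cascading branch above would then become
    recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined,
    reuseCachedBuffer = true, blocking), and a plain "recache" would pass
    false to rebuild the buffers from scratch.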

