Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21594#discussion_r197586970
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
    @@ -105,24 +105,58 @@ class CacheManager extends Logging {
       }
     
       /**
    -   * Un-cache all the cache entries that refer to the given plan.
    +   * Un-cache the given plan or all the cache entries that refer to the 
given plan.
    +   * @param query     The [[Dataset]] to be un-cached.
    +   * @param cascade   If true, un-cache all the cache entries that refer 
to the given
    +   *                  [[Dataset]]; otherwise un-cache the given 
[[Dataset]] only.
    +   * @param blocking  Whether to block until all blocks are deleted.
        */
    -  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = 
writeLock {
    -    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
    +  def uncacheQuery(query: Dataset[_],
    +      cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
    +    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
       }
     
       /**
    -   * Un-cache all the cache entries that refer to the given plan.
    +   * Un-cache the given plan or all the cache entries that refer to the 
given plan.
    +   * @param spark     The Spark session.
    +   * @param plan      The plan to be un-cached.
    +   * @param cascade   If true, un-cache all the cache entries that refer 
to the given
    +   *                  plan; otherwise un-cache the given plan only.
    +   * @param blocking  Whether to block until all blocks are deleted.
        */
    -  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: 
Boolean): Unit = writeLock {
    +  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
    +      cascade: Boolean, blocking: Boolean): Unit = writeLock {
    +    val shouldRemove: LogicalPlan => Boolean =
    +      if (cascade) {
    +        _.find(_.sameResult(plan)).isDefined
    +      } else {
    +        _.sameResult(plan)
    +      }
         val it = cachedData.iterator()
         while (it.hasNext) {
           val cd = it.next()
    -      if (cd.plan.find(_.sameResult(plan)).isDefined) {
    +      if (shouldRemove(cd.plan)) {
             cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
             it.remove()
           }
         }
    +    // Re-compile dependent cached queries after removing the cached query.
    +    if (!cascade) {
    +      val it = cachedData.iterator()
    +      val needToRecache = 
scala.collection.mutable.ArrayBuffer.empty[CachedData]
    +      while (it.hasNext) {
    +        val cd = it.next()
    +        if (cd.plan.find(_.sameResult(plan)).isDefined) {
    +          it.remove()
    +          val plan = 
spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
    +          val newCache = InMemoryRelation(
    +            cacheBuilder = 
cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
    +            logicalPlan = cd.plan)
    +          needToRecache += cd.copy(cachedRepresentation = newCache)
    +        }
    +      }
    +      needToRecache.foreach(cachedData.add)
    --- End diff --
    
    Could we extract the duplicated logic at line 144 and line 158 into a private helper function?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to