aokolnychyi commented on code in PR #53143:
URL: https://github.com/apache/spark/pull/53143#discussion_r2547611248


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -352,22 +353,35 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     }
     needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
+      tryRebuildCacheEntry(spark, cd).foreach { entry =>
+        this.synchronized {
+          if (lookupCachedDataInternal(entry.plan).nonEmpty) {
+            logWarning("While recaching, data was already added to cache.")
+          } else {
+            cachedData = entry +: cachedData
+            CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+              log"${MDC(DATAFRAME_CACHE_ENTRY, entry)}")
+          }
+        }
+      }
+    }
+  }
+
+  private def tryRebuildCacheEntry(
+      spark: SparkSession,
+      cd: CachedData): Option[CachedData] = {
+    try {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val (newKey, newCache) = sessionWithConfigsOff.withActive {
         val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)

Review Comment:
   @dongjoon-hyun, following up on the question [here](https://github.com/apache/spark/pull/53143#pullrequestreview-3489746102). I see only one place that potentially calls recache on read, and it is related to AQE. Usually, only write or REFRESH operations trigger recaching.
   
   ```
     private def buildBuffers(): RDD[CachedBatch] = {
       val cb = try {
         if (supportsColumnarInput) {
           serializer.convertColumnarBatchToCachedBatch(
             cachedPlan.executeColumnar(),
             cachedPlan.output,
             storageLevel,
             cachedPlan.conf)
         } else {
           serializer.convertInternalRowToCachedBatch(
             cachedPlan.execute(),
             cachedPlan.output,
             storageLevel,
             cachedPlan.conf)
         }
       } catch {
         case e: Throwable if cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] =>
          // SPARK-49982: during RDD execution, AQE will execute all stages except ResultStage.
          // If any failure happen, the failure will be cached and the next SQL cache caller
          // will hit the negative cache. Therefore we need to recache the plan.
           val session = cachedPlan.session
           session.sharedState.cacheManager.recacheByPlan(session, logicalPlan)
           throw e
       }
   ```
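   
   For context on why the new `tryRebuildCacheEntry` path re-checks the cache inside `this.synchronized`, here is a minimal, self-contained sketch (hypothetical names, not the actual `CacheManager` code) of the duplicate-entry race the guard protects against when a concurrent recache, such as this AQE path, runs at the same time:
   
   ```scala
   // Hypothetical, simplified model of the lookup-then-prepend guard in the diff:
   // the (expensive) rebuild happens outside the lock, but the duplicate check and
   // the prepend happen inside this.synchronized, so concurrent recache attempts
   // for the same plan cannot register two entries.
   object RecacheGuardSketch {
     final case class CachedEntry(planKey: String, payload: String)

     @volatile private var cachedData: List[CachedEntry] = Nil

     private def lookup(planKey: String): Option[CachedEntry] =
       cachedData.find(_.planKey == planKey)

     def recache(planKey: String): Unit = {
       // Simulate the expensive rebuild (tryRebuildCacheEntry) outside the lock.
       val rebuilt = CachedEntry(planKey, s"rebuilt-$planKey")
       this.synchronized {
         if (lookup(rebuilt.planKey).nonEmpty) {
           println(s"While recaching, an entry for $planKey was already added to cache.")
         } else {
           cachedData = rebuilt :: cachedData
           println(s"Re-cached entry for $planKey")
         }
       }
     }

     def main(args: Array[String]): Unit = {
       // Four concurrent recache attempts for the same plan; exactly one entry survives.
       val threads = (1 to 4).map(_ => new Thread(() => recache("scan(t1)")))
       threads.foreach(_.start())
       threads.foreach(_.join())
       assert(cachedData.count(_.planKey == "scan(t1)") == 1)
     }
   }
   ```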
   
   
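   For completeness, a small illustrative sketch (local `SparkSession`, made-up table name) of the usual write/REFRESH triggers mentioned above, as opposed to the AQE recache-on-read path:
   
   ```scala
   import org.apache.spark.sql.SparkSession

   // Illustrative only: the kinds of user operations the comment above describes
   // as the usual recache triggers (a write into a cached table and an explicit
   // REFRESH), in contrast to the AQE failure path that recaches during a read.
   object RecacheTriggersSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[2]")
         .appName("recache-triggers-sketch")
         .getOrCreate()

       spark.sql("CREATE TABLE t (id INT) USING parquet")
       spark.sql("CACHE TABLE t")            // builds the cache entry for t
       spark.sql("INSERT INTO t VALUES (1)") // write path: typically triggers a recache of t
       spark.sql("REFRESH TABLE t")          // explicit REFRESH: triggers a recache as well
       spark.stop()
     }
   }
   ```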


