holdenk commented on a change in pull request #31517:
URL: https://github.com/apache/spark/pull/31517#discussion_r666424023



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
##########
@@ -315,7 +301,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(
           "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
         shuffleIndexRecord.getOffset(),
         shuffleIndexRecord.getLength());
-    } catch (ExecutionException e) {
+    } catch (CompletionException e) {

Review comment:
       Can you tell me why we needed to change the exception? Is this just what Caffeine throws instead? Do we have test coverage for this?
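
For context on the two exception types in this hunk, here is a minimal stdlib-only sketch. It does not use Guava or Caffeine, and the class and method names are hypothetical. Guava's `LoadingCache.get` is documented to throw the checked `ExecutionException` when the loader fails with a checked exception, while Caffeine's `LoadingCache.get` is documented to throw the unchecked `CompletionException`. The JDK's `CompletableFuture` shows the same split between `get()` and `join()`, and in both cases the original failure is available through `getCause()`. Whether the existing tests exercise this path is not something the sketch can answer.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class: contrasts the checked and unchecked wrappers.
class WrappedFailureDemo {
    // A future that has already failed with a checked exception.
    static CompletableFuture<String> failed() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IOException("index file missing"));
        return future;
    }

    // get() reports the failure as a checked ExecutionException.
    static Throwable causeViaGet() {
        try {
            failed().get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    // join() reports the same failure as an unchecked CompletionException.
    static Throwable causeViaJoin() {
        try {
            failed().join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeViaGet().getClass().getName());  // java.io.IOException
        System.out.println(causeViaJoin().getClass().getName()); // java.io.IOException
    }
}
```

Because `CompletionException` is unchecked, the compiler does not force a `catch` at the call site, so a test that makes the loader fail is the only guard against the handler being dropped.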

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
##########
@@ -62,21 +61,25 @@ private[history] class ApplicationCache(
 
     /**
      * Removal event notifies the provider to detach the UI.
-     * @param rm removal notification
+     * @param key removal key
+     * @param value removal value
      */
-    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
+    override def onRemoval(key: CacheKey, value: CacheEntry,
+        cause: RemovalCause): Unit = {
       metrics.evictionCount.inc()
-      val key = rm.getKey
-      logDebug(s"Evicting entry ${key}")
-      operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui)
+      logDebug(s"Evicting entry $key")
+      operations.detachSparkUI(key.appId, key.attemptId, value.loadedUI.ui)
     }
   }
 
   private val appCache: LoadingCache[CacheKey, CacheEntry] = {
-    CacheBuilder.newBuilder()
-        .maximumSize(retainedApplications)
-        .removalListener(removalListener)
-        .build(appLoader)
+    val builder = Caffeine.newBuilder()
+      .maximumSize(retainedApplications)
+      .removalListener(removalListener)
+      // SPARK-34309: Use a custom Executor to stay compatible with
+      // the data eviction behavior of Guava cache
+      .executor((command: Runnable) => command.run())

Review comment:
       Is this going to run in the same thread? Is that what the old behaviour would have been?
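
As a reference point for the first question, here is a minimal sketch using only `java.util.concurrent`. It is not the PR's code, and the class and method names are hypothetical. An `Executor` whose `execute` simply calls `command.run()` runs the task synchronously on the submitting thread, whereas a pool-backed executor hands it to another thread. Whether this reproduces the old Guava behaviour in `ApplicationCache` is the open question above and is not settled by this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: reports which thread an Executor runs a task on.
class DirectExecutorDemo {
    // Submits one task, waits for it to finish, and returns the thread that ran it.
    static Thread threadUsedBy(Executor executor) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Thread> seen = new AtomicReference<>();
        executor.execute(() -> {
            seen.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // Same shape as the Scala lambda (command: Runnable) => command.run()
        Executor direct = Runnable::run;
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(threadUsedBy(direct) == Thread.currentThread()); // true
            System.out.println(threadUsedBy(pool) == Thread.currentThread());   // false
        } finally {
            pool.shutdown();
        }
    }
}
```

With the direct executor, any work handed to it (such as a removal listener) would block the thread that triggered it, which matters if `detachSparkUI` is slow.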

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
##########
@@ -62,21 +61,25 @@ private[history] class ApplicationCache(
 
     /**
      * Removal event notifies the provider to detach the UI.
-     * @param rm removal notification
+     * @param key removal key
+     * @param value removal value
      */
-    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
+    override def onRemoval(key: CacheKey, value: CacheEntry,
+        cause: RemovalCause): Unit = {

Review comment:
       Maybe mention RemovalCause in the scaladoc?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


