Github user harishreedharan commented on a diff in the pull request:
https://github.com/apache/spark/pull/6508#discussion_r31385896
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -443,9 +466,27 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) &&
           !executorsPendingToRemove.contains(executorId)) {
+
+        val hasCachedBlocks =
+          executorsWithCachedBlocks.contains(executorId) ||
+            executorEndpoints.get(executorId).exists(_.askWithRetry[Boolean](HasCachedBlocks))
+
+        if (hasCachedBlocks) executorsWithCachedBlocks += executorId
--- End diff ---
I was wondering about the eviction part too. In the case where the timeout is finite, I think we should be OK. The other option is to have an RPC call that informs us of evictions. I am not too fond of that, but we might be able to do it without too many changes by having the BlockManagerMaster call into the ExecutorAllocationManager when a BlockManager reports a status change, along the lines of the sketch below.
Also, I can switch the cached-blocks map to a loading cache or something similar to keep its size finite; a rough sketch follows.
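For example, something like this with the Guava cache Spark already depends on (the size bound and TTL below are placeholder values, not proposed defaults):

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

// Bound both the size and the lifetime of entries so the map cannot grow
// without limit even if we never hear about an eviction.
val executorsWithCachedBlocks = CacheBuilder.newBuilder()
  .maximumSize(1000)                      // cap on tracked executors (placeholder)
  .expireAfterWrite(10, TimeUnit.MINUTES) // drop stale entries automatically (placeholder)
  .build[String, java.lang.Boolean]()

// Membership test, replacing `executorsWithCachedBlocks.contains(executorId)`:
def hasCachedBlocks(executorId: String): Boolean =
  executorsWithCachedBlocks.getIfPresent(executorId) != null

// And `executorsWithCachedBlocks += executorId` becomes:
def markCached(executorId: String): Unit =
  executorsWithCachedBlocks.put(executorId, java.lang.Boolean.TRUE)
```

That way stale entries age out on their own even if the status-change notification never arrives.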