sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640310177



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -728,15 +739,34 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- aliveBlockManagerInfo(blockManagerId)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }.keys
+    expiredBmIds.foreach { bmId =>
+      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
+      blockManagerInfo.remove(bmId)
+    }
   }
+
+  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
+    blockManagerInfo.get(bmId).filter(_.isAlive)
+
+  @inline private def allAliveBlockManagerInfos() = blockManagerInfo.values.filter(_.isAlive)

Review comment:
       Done.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to