mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r638460320



##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -878,4 +903,15 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
+
+  def executorRemovalTs: Option[Long] = _executorRemovalTs
+
+  def isAlive: Boolean = _executorRemovalTs.isEmpty
+
+  def updateExecutorRemovalTs(): Unit = {
+    if (!isAlive) {
+      logWarning(s"executorRemovalTs is already set to 
${_executorRemovalTs.get}")
+    }
+    _executorRemovalTs = Some(System.currentTimeMillis())

Review comment:
       Should `_executorRemovalTs` be updated only in an `else` branch, so that an already-set timestamp is not overwritten?

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -728,14 +738,28 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): 
Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- blockManagerInfo.get(blockManagerId).filter(_.isAlive)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val (expiredEntries, _) = blockManagerInfo.partition { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > 
executorTimeoutMs
+    }

Review comment:
       Could this use `filter(<>).keys` instead of `partition`, since the second half of the partition result is discarded?

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       Does adding `filter(_.isAlive)` here actually have an impact?

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       Looking more, @Ngone51, do we have a leak w.r.t. 
`blockStatusByShuffleService` in this class?
   Is `bmInfo.removeBlock` helping to reduce the impact of the leak?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to