Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1039080517


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {

Review Comment:
   > If this is an issue only for terminating executors, we can detect that on the executor side and propagate it in the registration request, right?
   
   What do you mean? That the terminating executor can detect that it is terminating?
   
   Actually, this applies not only to terminating executors: executors lost due to a long GC pause, or executors that the driver failed to kill (where the executor could be an orphan rather than terminated), are also covered, as long as the driver considers the executor lost.
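   
   To make the intended semantics concrete, here is a minimal, self-contained sketch of the acceptance rule in the diff above. The RPC to the scheduler backend is stubbed out as a by-name parameter, and all names are illustrative, not the actual Spark internals:
   
   ```scala
   object ReRegistrationRuleSketch {
     // Mirrors `!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)`:
     // a first-time registration is always accepted, while a re-registration is
     // accepted only if the scheduler backend still reports the executor as active.
     def shouldRegister(
         alreadyRegistered: Boolean,
         isReRegister: Boolean,
         isExecutorAlive: => Boolean): Boolean = {
       !alreadyRegistered && (!isReRegister || isExecutorAlive)
     }
   
     def main(args: Array[String]): Unit = {
       // Fresh registration from a new executor: accepted without a liveness probe.
       assert(shouldRegister(alreadyRegistered = false, isReRegister = false, isExecutorAlive = false))
       // Re-registration from an executor the driver already considers lost
       // (terminating, stuck in a long GC, or an orphan after a failed kill): rejected.
       assert(!shouldRegister(alreadyRegistered = false, isReRegister = true, isExecutorAlive = false))
       // Re-registration from an executor the scheduler backend still sees as active: accepted.
       assert(shouldRegister(alreadyRegistered = false, isReRegister = true, isExecutorAlive = true))
     }
   }
   ```
   
   The by-name `isExecutorAlive` parameter plays the same role as the `lazy val` in the diff: the liveness probe is only evaluated for requests that are actually re-registrations and not already filtered out by the earlier checks.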
   




