sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r617990361



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       I found a code path that breaks @attilapiros's proposed solution, because the following still holds:
   
   > However, we will have to abstract `blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo]`.
   Currently, ...
   
   Let's consider the following set of events:
   - A `CoarseGrainedClusterMessage.RemoveExecutor` is issued
   - `CoarseGrainedSchedulerBackend` issues an async `StopExecutor` on `executorEndpoint` and then invokes `executorLost` on `TaskSchedulerImpl`
   
https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L430
   - `TaskSchedulerImpl`, in its `executorLost`, invokes `dagScheduler.executorLost`
   
https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L998-L1001
   - `DAGScheduler`, while handling `executorLost`, invokes `removeExecutorAndUnregisterOutputs`, which internally invokes `blockManagerMaster.removeExecutor(execId)` (as you pointed out in your [comment below](https://github.com/apache/spark/pull/32114#issuecomment-819255421)). That call in turn clears the `blockManagerId` from `blockManagerInfo` in `BlockManagerMasterEndpoint`
   
https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2043
   - The Executor has not yet processed `StopExecutor`
   - Executor reports its Heartbeat
   - `HeartbeatReceiver` invokes `scheduler.executorHeartbeatReceived` to check if the BlockManager on the executor requires re-registration
   
https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala#L137
   - `TaskSchedulerImpl` delegates this to `DAGScheduler`
   
https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L858
   - `DAGScheduler` asks `BlockManagerMasterHeartbeatEndpoint` if it knows the BlockManager
   
https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L300
   - `BlockManagerMasterHeartbeatEndpoint` returns false since it cannot find the `blockManagerId` in `blockManagerInfo`, indicating that the BlockManager should re-register
   
https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala#L51
   - The BlockManager re-registers, which publishes `SparkListenerBlockManagerAdded` and causes the inconsistent book-keeping in `AppStatusStore`
   - Executor processes `StopExecutor` and exits.
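
To make the sequence above easier to follow, here is a minimal sketch that replays it against a toy master. Everything in it is hypothetical and only mimics the book-keeping described above: `ToyBlockManagerMaster`, `heartbeatKnown`, `replay` and the `denyRecentlyRemoved` flag are not Spark APIs, and the bounded `LinkedHashSet` is just a dependency-free stand-in for the Guava cache in the diff.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Hypothetical, simplified stand-in for BlockManagerMasterEndpoint; not Spark's real API.
final class ToyBlockManagerMaster(denyRecentlyRemoved: Boolean, maxRemoved: Int = 30000) {
  // execId -> host; plays the role of blockManagerInfo.
  private val blockManagerInfo = TrieMap.empty[String, String]
  // Bounded stand-in for the Guava cache in the diff: the oldest entry is evicted first.
  private val recentlyRemovedExecutors = mutable.LinkedHashSet.empty[String]
  // Events a listener such as AppStatusListener would observe.
  val listenerEvents = mutable.ArrayBuffer.empty[String]

  /** Returns true if the registration was accepted. */
  def register(execId: String, host: String): Boolean = {
    if (denyRecentlyRemoved && recentlyRemovedExecutors.contains(execId)) {
      false
    } else {
      blockManagerInfo.put(execId, host)
      listenerEvents += s"BlockManagerAdded($execId)"
      true
    }
  }

  def removeExecutor(execId: String): Unit = {
    if (blockManagerInfo.remove(execId).isDefined) {
      listenerEvents += s"BlockManagerRemoved($execId)"
    }
    recentlyRemovedExecutors += execId
    if (recentlyRemovedExecutors.size > maxRemoved) {
      recentlyRemovedExecutors -= recentlyRemovedExecutors.head
    }
  }

  /** Mirrors the heartbeat check: false means "unknown, please re-register". */
  def heartbeatKnown(execId: String): Boolean = blockManagerInfo.contains(execId)
}

object RaceSketch {
  /** Replays the event sequence and returns the listener events the driver saw. */
  def replay(denyRecentlyRemoved: Boolean): List[String] = {
    val master = new ToyBlockManagerMaster(denyRecentlyRemoved)
    master.register("1", "host-a") // initial registration
    master.removeExecutor("1")     // RemoveExecutor -> executorLost -> removeExecutor
    // StopExecutor is still in flight, so the executor sends one more heartbeat.
    if (!master.heartbeatKnown("1")) {
      master.register("1", "host-a") // executor-side re-registration attempt
    }
    master.listenerEvents.toList
  }

  def main(args: Array[String]): Unit = {
    println(replay(denyRecentlyRemoved = false))
    println(replay(denyRecentlyRemoved = true))
  }
}
```

Without the guard, the replay ends with a second `BlockManagerAdded(1)` after the removal, which is the stale entry that ends up in `AppStatusStore`. With the guard enabled, the late re-registration is refused and the event log stops at `BlockManagerRemoved(1)`.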
   






