mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501706435


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I am still looking, but I believe this might be due to a race in `reportHeartBeat` in `Executor`.
   I would like more eyes on it, though.
   
   My hypothesis is that something like the following is happening:
   
   * The scheduler is [started here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L611).
     * In local mode, this directly ends up creating an `Executor`:
       * TaskSchedulerImpl.start -> LocalSchedulerBackend.start -> new LocalEndpoint -> new Executor
   * The new Executor immediately starts a `heartbeater` - the initial delay before the thread first runs is random, after which it runs periodically.
   * `Executor.reportHeartBeat`, when invoked, sends a `Heartbeat`.
   * If this `Heartbeat` is received before the driver thread reaches [blockmanager.initialize](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L632), the driver ends up returning a `HeartbeatResponse` with `reregisterBlockManager = true` (since registration of the local block manager happens within `initialize`).
   * This will trigger a block manager reregister [here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1241); a toy model of this ordering is sketched right after this list.
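
   To make the ordering concrete, here is a self-contained toy model of the hypothesized race (purely illustrative, not Spark code; names like `blockManagerId` and `reregister` just mirror the ones above):

   ```scala
   import java.util.concurrent.{Executors, TimeUnit}

   // Toy model (NOT Spark code): a periodic heartbeat task dereferences state
   // that "initialize()" has not set yet; the failure is caught and only
   // logged, similar in spirit to logUncaughtExceptions.
   object HeartbeatRaceDemo {
     @volatile private var blockManagerId: String = _ // set later by "initialize()"

     // Stands in for the reregister path; NPEs while blockManagerId is still null.
     private def reregister(): Unit =
       println(s"re-registering ${blockManagerId.toUpperCase}")

     def main(args: Array[String]): Unit = {
       val heartbeater = Executors.newSingleThreadScheduledExecutor()
       heartbeater.scheduleAtFixedRate(() => {
         try reregister()
         catch { case t: Throwable => println(s"logged and ignored: $t") }
       }, 0, 100, TimeUnit.MILLISECONDS)

       Thread.sleep(250)                         // driver still busy, pre-initialize()
       blockManagerId = "BlockManagerId(driver)" // "initialize()" finally runs
       Thread.sleep(250)
       heartbeater.shutdownNow()
     }
   }
   ```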
   
   My hypothesis is that, when this condition is hit, it results in an NPE - both for this PR (since `memoryManager` is null) and for current master (since `blockManagerId` is null). But the NPE gets ignored, since it is thrown inside the heartbeat task (which runs within `logUncaughtExceptions`).
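
   For context, that wrapper behaves roughly like this (a simplified sketch modeled on `Utils.logUncaughtExceptions`; abbreviated, so it may differ from the exact source):

   ```scala
   object LogWrapperSketch {
     // Simplified sketch: the NPE surfaces only as a log line; the rethrow
     // lands inside the heartbeater's scheduled task, so nothing on the
     // test's main path ever observes the failure.
     def logUncaughtExceptions[T](f: => T): T = {
       try {
         f
       } catch {
         case t: Throwable =>
           System.err.println(
             s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
           throw t
       }
     }
   }
   ```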
   
   But the side effect now is that we end up initializing the lazy val memoryManager in BlockManager ([here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L668)) - and that initialization sticks, as the sketch below shows.
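
   A minimal illustration of why that side effect is sticky (again a toy sketch with hypothetical names, not Spark code):

   ```scala
   // Touching a lazy val - even from a failing thread whose exception is
   // swallowed - initializes it once and for all; a DriverPlugin loaded
   // afterwards would be too late to influence it.
   object LazyInitDemo {
     lazy val memoryManager: String = {
       println("lazy memoryManager initialized") // the side effect in question
       "memory config captured now"
     }

     def main(args: Array[String]): Unit = {
       val heartbeat = new Thread(() => {
         try {
           val forced = memoryManager // forced here, before any plugin could run
           throw new NullPointerException(s"simulated later failure after: $forced")
         } catch {
           case t: Throwable => println(s"swallowed: $t")
         }
       })
       heartbeat.start()
       heartbeat.join()
       println(s"value is fixed: $memoryManager") // no second initialization
     }
   }
   ```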
   
   
   
   (I did not find driver logs for the test run - those would have helped validate this hypothesis: we would have seen "re-registering with master" for this test before the NPE is thrown.)
   
   Thoughts?
   


