mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501706435


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I am still looking, but I believe this might be due to a race in `reportHeartBeat` in `Executor`.
   I would like more eyes on it, though.
   
   My hypothesis is, something like this is happening:
   
   * The scheduler is [started here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L611).
     * In local mode, this directly ends up creating an `Executor`:
       * TaskSchedulerImpl.start -> LocalSchedulerBackend.start -> new LocalEndpoint -> new Executor
   * The new `Executor` immediately starts a `heartbeater`; the thread's initial delay is random, after which it runs periodically.
   * The heartbeater invokes `Executor.reportHeartBeat`, which sends a `Heartbeat` to the driver.
   * If this `Heartbeat` is received before the driver thread reaches [blockmanager.initialize](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L632), the driver ends up returning a `HeartbeatResponse` with `reregisterBlockManager = true` (since registration of the local block manager happens within `initialize`).
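To make the suspected ordering concrete, here is a minimal, self-contained Scala sketch of the race. It does not use Spark: `FakeHeartbeatResponse`, `FakeDriver`, and `HeartbeatRaceSketch` are hypothetical stand-ins, and the assumption is simply that the driver answers a heartbeat from a not-yet-registered block manager with a re-register request. The heartbeat fires from a scheduled thread after a delay, while the "driver thread" does some work before registering.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.util.Random

// Hypothetical, simplified stand-in for the heartbeat reply; only the flag discussed above.
final case class FakeHeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical driver: a heartbeat from a block manager it has not registered yet
// is answered with reregisterBlockManager = true.
final class FakeDriver {
  private val registered = new AtomicBoolean(false)
  def onHeartbeat(): FakeHeartbeatResponse = FakeHeartbeatResponse(!registered.get())
  def registerBlockManager(): Unit = registered.set(true) // what initialize() would do
}

object HeartbeatRaceSketch {
  /** Simulates one startup and returns the flag carried by the first heartbeat reply. */
  def runOnce(initDelayMs: Long, heartbeatDelayMs: Long): Boolean = {
    val driver = new FakeDriver
    val heartbeater = Executors.newSingleThreadScheduledExecutor()
    val firstReply = new AtomicReference[FakeHeartbeatResponse]()
    val replied = new CountDownLatch(1)

    // "new Executor" side: the heartbeater fires once its initial delay elapses.
    heartbeater.schedule(new Runnable {
      override def run(): Unit = {
        firstReply.set(driver.onHeartbeat())
        replied.countDown()
      }
    }, heartbeatDelayMs, TimeUnit.MILLISECONDS)

    // Driver-thread side: some work happens before it reaches initialize().
    Thread.sleep(initDelayMs)
    driver.registerBlockManager()

    replied.await()
    heartbeater.shutdown()
    firstReply.get().reregisterBlockManager
  }

  /** Randomized heartbeat delay, loosely mirroring the random initial delay. */
  def randomStartup(initDelayMs: Long): Boolean =
    runOnce(initDelayMs, heartbeatDelayMs = Random.nextInt(100).toLong)
}
```

With widely separated delays, `HeartbeatRaceSketch.runOnce(initDelayMs = 500, heartbeatDelayMs = 0)` returns `true` (re-register requested), while swapping the two delays returns `false`. With `randomStartup`, the result depends on which side happens to win.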
   
   As part of this, we end up initializing the lazy val `memoryManager` in `BlockManager`.
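A second sketch shows why forcing the lazy val early would stick. It assumes, hypothetically, that the lazy `memoryManager` reads from a slot that is only populated after the DriverPlugin has loaded. A Scala `lazy val` evaluates its initializer once and caches the result, including a `null` result. `FakeMemoryManager`, `MemoryManagerSlot`, and `SketchBlockManager` are invented for illustration and are not the PR's actual code.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the real MemoryManager.
final case class FakeMemoryManager(maxOnHeapBytes: Long)

// Hypothetical publication point, assumed to be filled in only after the
// DriverPlugin has had a chance to adjust memory configurations.
object MemoryManagerSlot {
  val ref = new AtomicReference[FakeMemoryManager](null)
}

final class SketchBlockManager {
  // Evaluated once, on first access; the result (even null) is cached afterwards.
  lazy val memoryManager: FakeMemoryManager = MemoryManagerSlot.ref.get()

  // Stand-in for a re-registration path that touches the memory manager.
  def reregister(): Long = memoryManager.maxOnHeapBytes
}
```

Under this assumption, calling `reregister()` before the slot is set throws a `NullPointerException`, and the cached `null` remains even after the slot is filled. A fresh instance created afterwards works fine, so the failure would depend only on whether a heartbeat reply reached `reregister()` first.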
   
   What I am unsure of: without this PR, hitting this path would normally have triggered an NPE anyway during re-registration, and I am not sure whether that is getting ignored (since the block manager id is null).
   
   (I did not find driver logs for the test run. They would have helped validate this hypothesis: we would have seen "re-registering with master" for this test before the NPE is thrown.)
   
   Thoughts?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

