cloud-fan commented on code in PR #54136:
URL: https://github.com/apache/spark/pull/54136#discussion_r2885109415


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -314,6 +318,32 @@ private[spark] class BlockManager(
     shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver]
   }
 
+  // Timeout waiting for ShuffleManager initialization when receiving shuffle migration requests
+  private val shuffleManagerInitWaitingTimeoutMs =
+    conf.get(config.STORAGE_SHUFFLE_MANAGER_INIT_WAITING_TIMEOUT)
+
+  /**
+   * Wait for the ShuffleManager to be initialized before handling shuffle migration requests.
+   * This is necessary because BlockManager is registered with the driver before ShuffleManager
+   * is initialized in Executor, which could cause NPE if shuffle migration requests are received
+   * before ShuffleManager is ready.
+   *
+   * @throws ShuffleManagerNotInitializedException if ShuffleManager is not initialized within
+   *         the configured timeout
+   */
+  private def waitForShuffleManagerInit(): Unit = {
+    if (!SparkEnv.get.isShuffleManagerInitialized) {
+      logInfo(log"Waiting for ShuffleManager initialization before handling shuffle operations")
+
+      if (!SparkEnv.get.waitForShuffleManagerInit(shuffleManagerInitWaitingTimeoutMs)) {
+        logWarning(log"ShuffleManager not initialized within " +
+          log"${MDC(TIMEOUT, shuffleManagerInitWaitingTimeoutMs)}ms " +
+          log"while handling shuffle operations}")

Review Comment:
   nit: Stray `}` at the end of this string — `operations}"` should be 
`operations"`. The log output currently produces:
   ```
   ShuffleManager not initialized within 30000ms while handling shuffle operations}
   ```



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -155,33 +161,44 @@ private[storage] class BlockManagerDecommissioner(
                 } else if (e.getCause != null && e.getCause.getMessage != null
                   && e.getCause.getMessage
                   .contains(blockSavedOnDecommissionedBlockManagerException)) {
-                  isTargetDecommissioned = true
+                  // Target is decommissioned, don't penalize the block.
                   keepRunning = false
+                  needRetry = true
+                  newRetryCount = retryCount
+                } else if (e.getCause != null && e.getCause.getMessage != null
+                  && e.getCause.getMessage
+                  .contains(shuffleManagerNotInitializedException)) {
+                  // Target executor's ShuffleManager is not yet initialized.
+                  // This is transient, so requeue without incrementing failure count
+                  // and keep the migration thread running for this peer.
+                  logWarning(log"Target executor's ShuffleManager not initialized for " +
+                    log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}. Will retry.")
+                  needRetry = true
+                  newRetryCount = retryCount

Review Comment:
   Since `newRetryCount = retryCount` (never incremented), `allowRetry` will 
always accept this block back into the queue. If the target executor's 
ShuffleManager permanently fails to initialize, this creates an infinite retry 
loop: each attempt costs a network round-trip plus a 30s server-side wait, and 
the migration thread for this peer never makes progress.
   
   In practice the risk is low (the target usually initializes or gets killed, 
producing an `IOException` that breaks the loop), but nothing guarantees 
termination.
   
   Consider either:
   - Still incrementing the retry count (perhaps at a slower rate or with a 
separate higher cap)
   - Adding a small sleep before retry on the client side, as @Ngone51 also 
suggested
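   
   A minimal sketch of how the two options could combine, in case it helps the 
discussion. Every name here (`TransientRetryPolicy`, `RetryState`, 
`maxTransientRetries`, the backoff parameters) is hypothetical and not part of 
Spark; it only illustrates a separate, higher cap for transient failures plus a 
capped exponential backoff on the client side:
   ```scala
   // Hypothetical sketch: none of these names exist in Spark.
   object TransientRetryPolicy {
     // Hard failures and transient failures are counted separately, so a
     // transient failure never consumes the normal failure budget.
     final case class RetryState(failureCount: Int = 0, transientCount: Int = 0)

     sealed trait Decision
     // Put the block back in the queue after sleeping for sleepMs.
     final case class Requeue(state: RetryState, sleepMs: Long) extends Decision
     // The transient cap was exceeded: stop retrying this block.
     case object GiveUp extends Decision

     def onTransientFailure(
         state: RetryState,
         maxTransientRetries: Int,
         baseBackoffMs: Long,
         maxBackoffMs: Long): Decision = {
       val next = state.copy(transientCount = state.transientCount + 1)
       if (next.transientCount > maxTransientRetries) {
         GiveUp
       } else {
         // Exponential backoff (base, 2x base, 4x base, ...) capped at
         // maxBackoffMs; the shift is bounded to avoid Long overflow.
         val shift = math.min(next.transientCount - 1, 20)
         val sleepMs = math.min(baseBackoffMs << shift, maxBackoffMs)
         Requeue(next, sleepMs)
       }
     }
   }
   ```
   With a cap like this the loop terminates even if the target never 
initializes, and the sleep keeps the client from immediately re-sending a 
request that will block on the server again.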



##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -224,6 +248,8 @@ class SparkEnv (
     Preconditions.checkState(null == _shuffleManager,
       "Shuffle manager already initialized to %s", _shuffleManager)
     _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
+    // Signal that the ShuffleManager has been initialized
+    shuffleManagerInitLatch.countDown()

Review Comment:
   If `ShuffleManager.create()` throws, the latch is never counted down. Any 
thread waiting in `waitForShuffleManagerInit` will block for the full 30s 
timeout before getting `ShuffleManagerNotInitializedException`, even though the 
executor is already failing.
   
   Consider using `try`/`finally` so waiting threads fail fast:
   ```scala
   try {
     _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
   } finally {
     shuffleManagerInitLatch.countDown()
   }
   ```
   The caller in `BlockManager.waitForShuffleManagerInit` already handles the 
null-after-latch case via the lazy val resolution path.
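   
   To make the fail-fast behavior concrete, here is a small standalone sketch 
of the pattern. `InitGate` is a hypothetical stand-in for the latch and field 
in `SparkEnv`, not Spark code; it assumes waiters check whether a value was 
actually set once the latch releases:
   ```scala
   import java.util.concurrent.{CountDownLatch, TimeUnit}

   // Hypothetical stand-in for the latch + field pair; not Spark code.
   final class InitGate[T] {
     @volatile private var value: Option[T] = None
     private val latch = new CountDownLatch(1)

     // Runs `create` and releases waiters whether it succeeds or throws.
     def initialize(create: => T): Unit = {
       try {
         value = Some(create)
       } finally {
         latch.countDown()
       }
     }

     // Returns the value if initialization succeeded. Returns None on
     // timeout, or immediately after a failed initialization.
     def await(timeoutMs: Long): Option[T] = {
       if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) value else None
     }
   }
   ```
   With this shape, a waiter on a gate whose `initialize` threw gets `None` 
right away instead of sitting out the whole timeout.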



##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -196,7 +196,11 @@ private[spark] class BlockManager(
   // We initialize the ShuffleManager later in SparkContext and Executor, to allow
   // user jars to define custom ShuffleManagers, as such `_shuffleManager` will be null here
   // (except for tests) and we ask for the instance from the SparkEnv.
-  private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
+  private lazy val shuffleManager = {
+    // Wait for ShuffleManager to be initialized before handling shuffle operations.
+    waitForShuffleManagerInit()

Review Comment:
   `waitForShuffleManagerInit()` applies to both driver and executor 
`BlockManager` instances. On the driver, `_shuffleManager` is also `null` in 
the constructor (see `SparkEnv.create`). While the driver calls 
`initializeShuffleManager()` before any shuffle operations in normal flow, this 
adds a 30s hang risk if code ordering changes in the future.
   
   Evidence this matters: the test suite had to add 
`when(env.isShuffleManagerInitialized).thenReturn(true)` in `beforeEach` to 
work around this.
   
   Consider guarding to only apply on the executor side:
   ```scala
   private lazy val shuffleManager = {
     if (_shuffleManager == null && !isDriver) {
       waitForShuffleManagerInit()
     }
     Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

