attilapiros commented on a change in pull request #31102:
URL: https://github.com/apache/spark/pull/31102#discussion_r560859355



##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -66,86 +66,102 @@ private[storage] class BlockManagerDecommissioner(
    * the chance of migrating all shuffle blocks before the executor is forced 
to exit.
    */
   private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
-    @volatile var running = true
+    @volatile var keepRunning = true
+
+    private def allowRetry(shuffleBlock: ShuffleBlockInfo, failureNum: Int): 
Boolean = {
+      if (failureNum < maxReplicationFailuresForDecommission) {
+        logInfo(s"Add $shuffleBlock back to migration queue for " +
+          s"retry ($failureNum / $maxReplicationFailuresForDecommission)")
+        // The block needs to retry so we should not mark it as finished
+        shufflesToMigrate.add((shuffleBlock, failureNum))
+      } else {
+        logWarning(s"Give up migrating $shuffleBlock since it's been " +
+          s"failed for $maxReplicationFailuresForDecommission times")
+        false
+      }
+    }
+
+    private def nextShuffleBlockToMigrate(): (ShuffleBlockInfo, Int) = {
+      while (!Thread.currentThread().isInterrupted) {
+        Option(shufflesToMigrate.poll()) match {
+          case Some(head) => return head
+          // Nothing to do right now, but maybe a transfer will fail or a new 
block
+          // will finish being committed.
+          case None => Thread.sleep(1000)
+        }
+      }
+      throw new InterruptedException()
+    }
+
     override def run(): Unit = {
-      var migrating: Option[(ShuffleBlockInfo, Int)] = None
-      logInfo(s"Starting migration thread for ${peer}")
+      logInfo(s"Starting shuffle block migration thread for $peer")
       // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
-      try {
-        while (running && !Thread.interrupted()) {
-          migrating = Option(shufflesToMigrate.poll())
-          migrating match {
-            case None =>
-              logDebug("Nothing to migrate")
-              // Nothing to do right now, but maybe a transfer will fail or a 
new block
-              // will finish being committed.
-              val SLEEP_TIME_SECS = 1
-              Thread.sleep(SLEEP_TIME_SECS * 1000L)
-            case Some((shuffleBlockInfo, retryCount)) =>
-              if (retryCount < maxReplicationFailuresForDecommission) {
-                val blocks = 
bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
-                if (blocks.isEmpty) {
-                  logInfo(s"Ignore empty shuffle block $shuffleBlockInfo")
+      while (keepRunning) {
+        try {
+          val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
+          val blocks = 
bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+          if (blocks.size < 2) {
+            logInfo(s"Ignore empty shuffle block $shuffleBlockInfo")
+          } else {
+            logInfo(s"Got migration sub-blocks $blocks. Trying to migrate 
$shuffleBlockInfo " +
+              s"to $peer ($retryCount / 
$maxReplicationFailuresForDecommission)")
+            // Migrate the components of the blocks.
+            try {
+              blocks.foreach { case (blockId, buffer) =>
+                logDebug(s"Migrating sub-block ${blockId}")
+                bm.blockTransferService.uploadBlockSync(
+                  peer.host,
+                  peer.port,
+                  peer.executorId,
+                  blockId,
+                  buffer,
+                  StorageLevel.DISK_ONLY,
+                  null) // class tag, we don't need for shuffle
+                logDebug(s"Migrated sub-block $blockId")
+              }
+              logInfo(s"Migrated $shuffleBlockInfo to $peer")
+            } catch {
+              case e: IOException =>
+                // If a block got deleted before netty opened the file handle, 
then trying to
+                // load the blocks now will fail. This is most likely to occur 
if we start
+                // migrating blocks and then the shuffle TTL cleaner kicks in. 
However this
+                // could also happen with manually managed shuffles or a GC 
event on the
+                // driver a no longer referenced RDD with shuffle files.
+                if 
(bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).size < 2) {
+                  logWarning(s"Skipping block $shuffleBlockInfo, block 
deleted.")
+                } else if (fallbackStorage.isDefined) {
+                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
                 } else {
-                  logInfo(s"Got migration sub-blocks ${blocks}")
-                  logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to 
${peer} " +
-                    s"($retryCount / $maxReplicationFailuresForDecommission)")
-
-                  // Migrate the components of the blocks.
-                  try {
-                    blocks.foreach { case (blockId, buffer) =>
-                      logDebug(s"Migrating sub-block ${blockId}")
-                      bm.blockTransferService.uploadBlockSync(
-                        peer.host,
-                        peer.port,
-                        peer.executorId,
-                        blockId,
-                        buffer,
-                        StorageLevel.DISK_ONLY,
-                        null) // class tag, we don't need for shuffle
-                      logDebug(s"Migrated sub block ${blockId}")
-                    }
-                    logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
-                  } catch {
-                    case e: IOException =>
-                      // If a block got deleted before netty opened the file 
handle, then trying to
-                      // load the blocks now will fail. This is most likely to 
occur if we start
-                      // migrating blocks and then the shuffle TTL cleaner 
kicks in. However this
-                      // could also happen with manually managed shuffles or a 
GC event on the
-                      // driver a no longer referenced RDD with shuffle files.
-                      if 
(bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).isEmpty) {
-                        logWarning(s"Skipping block ${shuffleBlockInfo}, block 
deleted.")
-                      } else if (fallbackStorage.isDefined) {
-                        fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
-                      } else {
-                        throw e
-                      }
-                  }
+                  logError(s"Error occurred during migrating 
$shuffleBlockInfo", e)
+                  keepRunning = false
                 }
-              } else {
-                logError(s"Skipping block ${shuffleBlockInfo} because it has 
failed ${retryCount}")
-              }
+              case e: Exception =>
+                logError(s"Error occurred during migrating $shuffleBlockInfo", 
e)
+                keepRunning = false
+            }
+          }
+          if (keepRunning) {
+            numMigratedShuffles.incrementAndGet()
+          } else {
+            logWarning(s"Stop migrating shuffle blocks to $peer")
+            // Do not mark the block as migrated if it still needs retry
+            if (!allowRetry(shuffleBlockInfo, retryCount + 1)) {
               numMigratedShuffles.incrementAndGet()
+            }
           }
+        } catch {
+          case _: InterruptedException if !keepRunning =>

Review comment:
       In case of an `InterruptedException` we should not check the `keepRunning` 
flag but stop the thread as soon as possible (especially as it is a fatal 
exception, so the next case, `NonFatal(e)`, won't match it). Of course we can 
log a line saying that we are stopping without having been requested to by the 
`keepRunning` flag.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to