holdenk commented on a change in pull request #31102:
URL: https://github.com/apache/spark/pull/31102#discussion_r555961806



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -166,67 +170,65 @@ private[storage] class BlockManagerDecommissioner(
   private val migrationPeers =
     mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
 
-  private lazy val rddBlockMigrationExecutor =
-    ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd")
+  private val rddBlockMigrationExecutor =
+    if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
+      Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd"))
+    } else None
 
   private val rddBlockMigrationRunnable = new Runnable {
    val sleepInterval = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
 
     override def run(): Unit = {
-      assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
-      while (!stopped && !stoppedRDD && !Thread.interrupted()) {
-        logInfo("Iterating on migrating from the block manager.")
-        // Validate we have peers to migrate to.
-        val peers = bm.getPeers(false)
-        // If we have no peers give up.
-        if (peers.isEmpty) {
-          stopped = true
+      logInfo("Attempting to migrate all RDD blocks")
+      while (!stopped && !stoppedRDD) {
+        // Validate if we have peers to migrate to. Otherwise, give up migration.
+        if (bm.getPeers(false).isEmpty) {
+          logWarning("No available peers to receive RDD blocks, stop 
migration.")
           stoppedRDD = true
-        }
-        try {
-          val startTime = System.nanoTime()
-          logDebug("Attempting to replicate all cached RDD blocks")
-          rddBlocksLeft = decommissionRddCacheBlocks()
-          lastRDDMigrationTime = startTime
-          logInfo("Attempt to replicate all cached blocks done")
-          logInfo(s"Waiting for ${sleepInterval} before refreshing 
migrations.")
-          Thread.sleep(sleepInterval)
-        } catch {
-          case e: InterruptedException =>
-            logInfo("Interrupted during RDD migration, stopping")
-            stoppedRDD = true
-          case NonFatal(e) =>
-            logError("Error occurred replicating RDD for block manager 
decommissioning.",
-              e)
-            stoppedRDD = true
+        } else {
+          try {
+            val startTime = System.nanoTime()
+            logInfo("Attempting to migrate all cached RDD blocks")
+            rddBlocksLeft = decommissionRddCacheBlocks()
+            lastRDDMigrationTime = startTime
+            logInfo(s"Finished current round RDD blocks migration, " +
+              s"waiting for ${sleepInterval}ms before the next round 
migration.")
+            Thread.sleep(sleepInterval)
+          } catch {
+            case _: InterruptedException if stopped =>
+              logInfo("Stop RDD blocks migration.")
+            case NonFatal(e) =>
+              logError("Error occurred during RDD blocks migration.", e)
+              stoppedRDD = true
+          }
         }
       }
     }
   }
 
-  private lazy val shuffleBlockMigrationRefreshExecutor =
-    ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle")
+  private val shuffleBlockMigrationRefreshExecutor =
+    if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+      Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle"))
+    } else None
 
   private val shuffleBlockMigrationRefreshRunnable = new Runnable {
    val sleepInterval = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
 
     override def run(): Unit = {
-      assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
-      while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
+      logInfo("Attempting to migrate all shuffle blocks")
+      while (!stopped && !stoppedShuffle) {
         try {
-          logDebug("Attempting to replicate all shuffle blocks")
           val startTime = System.nanoTime()
-          shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
+          shuffleBlocksLeft = refreshMigratableShuffleBlocks()
           lastShuffleMigrationTime = startTime
-          logInfo("Done starting workers to migrate shuffle blocks")
+          logInfo(s"Finished current round refreshing migratable shuffle 
blocks, " +
+            s"waiting for ${sleepInterval}ms before the next round 
refreshing.")
           Thread.sleep(sleepInterval)
         } catch {
-          case e: InterruptedException =>
-            logInfo("Interrupted during migration, will not refresh 
migrations.")
-            stoppedShuffle = true
+          case _: InterruptedException if stopped =>

Review comment:
       Ok, so the point of checking the keepRunning flag is to only log when it isn't a Spark-commanded shutdown, gotcha.
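
       For anyone skimming the thread, here is a minimal, self-contained Scala sketch of the two patterns this hunk introduces. Everything below is illustrative only; `MigrationLoopSketch`, `migrationEnabled`, and the `println` logging are made-up stand-ins, not Spark API. It shows the Option-gated executor and the `InterruptedException if stopped` guard, which treats an interrupt as a clean exit only when the shutdown was commanded first.

```scala
import java.util.concurrent.{Executors, TimeUnit}

import scala.util.control.NonFatal

// Hypothetical stand-in for the decommissioner's migration loop;
// none of these names come from the Spark code.
object MigrationLoopSketch {
  @volatile private var stopped = false

  private val runnable = new Runnable {
    override def run(): Unit = {
      while (!stopped) {
        try {
          println("Migrating one round of blocks...")
          Thread.sleep(1000L) // stand-in for the reattempt interval
        } catch {
          // Interrupted *after* the flag was flipped: a commanded shutdown,
          // so log quietly and let the loop condition end the thread.
          case _: InterruptedException if stopped =>
            println("Stop requested, exiting migration loop.")
          // Any other non-fatal failure stops this migration with an error.
          // Note that NonFatal deliberately does not match
          // InterruptedException, so an interrupt *without* a stop request
          // propagates out of run().
          case NonFatal(e) =>
            println(s"Error during migration: $e")
            stopped = true
        }
      }
    }
  }

  // Option-gated executor, mirroring the PR's pattern of only creating the
  // thread pool when the corresponding feature is enabled.
  private val migrationEnabled = true // stand-in for a conf lookup
  private val executor =
    if (migrationEnabled) Some(Executors.newSingleThreadExecutor()) else None

  def main(args: Array[String]): Unit = {
    executor.foreach(_.submit(runnable))
    Thread.sleep(2500L)
    stopped = true // flip the flag first so the interrupt reads as commanded
    executor.foreach { e =>
      e.shutdownNow() // then interrupt the (possibly sleeping) worker
      e.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

       The ordering in `main` is the crux: because `NonFatal` skips `InterruptedException`, setting the flag before interrupting is what lets the guard distinguish a commanded shutdown from an unexpected interrupt.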



