holdenk commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r459800980



##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -668,7 +668,7 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case DecommissionSelf =>
+    case WorkerDecommission(_, _) =>

Review comment:
       I don't have strong preferences (although this doesn't have `ed` at the 
end so I don't think it's implying a state).

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {

Review comment:
       For sure, good call.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.

Review comment:
       sounds good

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)

Review comment:
       So exitExecutor calls System.exit; there isn't going to be a loop for us
to break out of.
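
       For context, a simplified sketch of the point (an illustration only, not
Spark's actual exitExecutor): the method ends in System.exit, so the JVM goes
away and the `while (true)` loop above never gets another iteration to break
out of.

```scala
// Simplified illustration only, not Spark's real exitExecutor.
def exitExecutor(code: Int, reason: String, notifyDriver: Boolean = true): Unit = {
  // ... tell the driver we are going away (elided) ...
  // The process ends here, so any surrounding loop simply stops running.
  System.exit(code)
}
```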

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")
+              }
+            } else {
+              logInfo("No running tasks, no block migration configured, 
stopping.")
+              exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+            }
+            Thread.sleep(sleep_time)

Review comment:
       Good point

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -115,12 +124,19 @@ private[storage] class BlockManagerDecommissioner(
   // Shuffles which are either in queue for migrations or migrated
   private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
 
+  // Shuffles which have migrated. This used to know when we are "done", being done can change
+  // if a new shuffle file is created by a running task.
+  private val numMigratedShuffles = new AtomicInteger(0)
+
   // Shuffles which are queued for migration & number of retries so far.
+  // Visible in storage for testing.
   private[storage] val shufflesToMigrate =
     new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
+  @volatile private var stoppedRDD = false

Review comment:
       Nice, sure.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -327,4 +357,28 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.
+   *  If there are any tasks running since that time the boolean may be incorrect.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+    if (stopped || (stoppedRDD && stoppedShuffle)) {
+      (System.nanoTime(), true)

Review comment:
       So if we are not doing any migrations, all blocks are migrated as of
right now.
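
       Roughly, the behaviour described is the sketch below (field names follow
the diff above, but treat it as an illustration rather than the merged code):
when nothing is being migrated, report "all migrated" as of the current time;
otherwise report the older of the two per-type migration times plus whether
both RDD and shuffle blocks are done.

```scala
// Sketch only; assumes the @volatile tracking fields shown earlier in this review.
private[storage] def lastMigrationInfo(): (Long, Boolean) = {
  if (stopped || (stoppedRDD && stoppedShuffle)) {
    // Not migrating anything, so everything we are responsible for is
    // "migrated" as of right now.
    (System.nanoTime(), true)
  } else {
    // Use the older timestamp so the caller's "no tasks ran since then"
    // check stays conservative.
    (Math.min(lastRDDMigrationTime, lastShuffleMigrationTime),
      !rddBlocksLeft && !shuffleBlocksLeft)
  }
}
```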

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")

Review comment:
       I don't want to bound the number of loops because I think there are very
legitimate cases where this could take a super long time and we just want to
let it go. If there's a hard limit it can probably be set by the external
cluster manager, which will send a non-interruptible signal.
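
       For reference, a minimal sketch of that unbounded check, structured as a
daemon thread (names follow the diff above, but the exact shape is an
assumption, not the merged code):

```scala
// Sketch only. Deliberately no iteration bound: a slow but legitimate
// decommission is allowed to keep going, and the cluster manager can still
// kill the process externally if it wants a hard deadline.
val shutdownThread = new Thread("decommission-shutdown-check") {
  setDaemon(true)
  override def run(): Unit = {
    var lastTaskRunningTime = System.nanoTime()
    while (true) {
      if (executor == null || executor.numRunningTasks == 0) {
        if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
          val (lastMigrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
          // Only trust the flag if no task has run since it was computed.
          if (allBlocksMigrated && lastMigrationTime > lastTaskRunningTime) {
            exitExecutor(0, "Finished decommissioning", notifyDriver = true)
          }
        } else {
          // No block migration configured; nothing left to wait for.
          exitExecutor(0, "Finished decommissioning", notifyDriver = true)
        }
      } else {
        // Tasks are still running; remember when we last saw one.
        lastTaskRunningTime = System.nanoTime()
      }
      Thread.sleep(1000) // re-check roughly once a second
    }
  }
}
shutdownThread.start()
```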

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()

Review comment:
       Sure

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -442,6 +442,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           case e: Exception =>
             logError(s"Unexpected error during decommissioning ${e.toString}", 
e)
         }
+        // Send decommission message to the executor (it could have originated on the executor
+        // but not necessarily.

Review comment:
       So it would not always be a duplicate. It _may_ be a duplicate message 
but it may not (if it was always a duplicate I'd just drop it).
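
       As an illustration of why a duplicate is harmless, a hypothetical guard
on the executor side (the flag and method here are assumptions, not the PR's
code): the second receipt just becomes a no-op.

```scala
// Sketch only: a repeated decommission request is ignored.
@volatile private var decommissioned = false

private def decommissionSelf(): Unit = {
  if (decommissioned) {
    logInfo("Already decommissioning; ignoring duplicate request.")
  } else {
    decommissioned = true
    if (executor != null) {
      executor.decommission()
    }
    // ... start the shutdown-check thread shown in the hunk above ...
  }
}
```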

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -59,7 +59,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
      // Just replicate blocks as fast as we can during testing, there isn't another
       // workload we need to worry about.
-      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)

Review comment:
       Just reduced the log spam in debugging this test but still allowed the 
test to complete quickly.

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread() {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")

Review comment:
       We could, although I'd rather keep that logging inside of 
Decommissioner. I'm not sure we need it, personally I'd prefer to add the 
logging statement if we find ourselves wishing we had it during development.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}

Review comment:
       There's something else which is somehow imported and named atLeast (I
suspect from ScalaTest?) and I needed to rename it or the compiler got confused.
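
       A minimal illustration of the rename (the mock itself is just a
stand-in): ScalaTest's Matchers also puts an `atLeast` in scope, so Mockito's
verification mode is imported under a different name.

```scala
import org.mockito.Mockito.{atLeast => least, mock, verify}

// `least(1)` is Mockito's atLeast(1), renamed so it cannot collide with the
// `atLeast` that ScalaTest Matchers brings into scope.
val runnable = mock(classOf[Runnable])
runnable.run()
verify(runnable, least(1)).run()
```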

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
 
+  // Used for tracking if our migrations are complete.
+  @volatile private var lastRDDMigrationTime: Long = 0
+  @volatile private var lastShuffleMigrationTime: Long = 0
+  @volatile private var rddBlocksLeft: Boolean = true
+  @volatile private var shuffleBlocksLeft: Boolean = true
+

Review comment:
       We use this to determine if the blocks are migrated, and these can be
changed from separate threads, so we'd have to switch to synchronized.
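
       Roughly, the writer side looks like the sketch below (the helper is
hypothetical); each migration thread publishes its progress through the
@volatile fields, and lastMigrationInfo just reads them, so no new lock is
introduced.

```scala
// Sketch only: hypothetical helper, not the PR's exact code.
@volatile private var lastRDDMigrationTime: Long = 0
@volatile private var rddBlocksLeft: Boolean = true

private def recordRDDMigrationPass(passStartTime: Long, blocksLeft: Boolean): Unit = {
  // Written by the RDD migration thread, read by the executor's
  // shutdown-check thread via lastMigrationInfo().
  lastRDDMigrationTime = passStartTime
  rddBlocksLeft = blocksLeft
}
```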

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   *  Returns the last migration time and a boolean for if all blocks have been migrated.
+   *  If there are any tasks running since that time the boolean may be incorrect.
+   */
+  private[spark] def lastMigrationInfo(): (Long, Boolean) = {

Review comment:
       I'd really rather not. I think the level above us should only be
concerned with whether the blocks are done migrating or not. If we expose that
up a level, the logic is going to get even more complicated across class
boundaries.
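
       The abstraction being defended is roughly the one-liner below (the
optional `decommissioner` field name is an assumption): BlockManager just
forwards the question, so callers above it never reason about individual
blocks.

```scala
// Sketch only; if no decommissioner is running, report "not migrated".
private[spark] def lastMigrationInfo(): (Long, Boolean) = {
  decommissioner.map(_.lastMigrationInfo()).getOrElse((0L, false))
}
```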

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
##########
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
+
+  // Used to ask an executor to decommission it's self.

Review comment:
       In the future it could be sent from somewhere else too, though
(especially when we look at YARN-like things).

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -91,10 +98,12 @@ private[storage] class BlockManagerDecommissioner(
                     null)// class tag, we don't need for shuffle
                   logDebug(s"Migrated sub block ${blockId}")
                 }
-                logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+                logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
               } else {
                 logError(s"Skipping block ${shuffleBlockInfo} because it has 
failed ${retryCount}")
               }
+              logInfo(s"Migrated ${shuffleBlockInfo}")

Review comment:
       Actually let's drop this message.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -88,5 +92,6 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     } finally {
         bmDecomManager.stop()
     }
+    bmDecomManager.stop()

Review comment:
       good point.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
   private val sparkConf = new SparkConf(false)
     .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
     .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
+    // Just replicate blocks as fast as we can during testing, there isn't another
+    // workload we need to worry about.
+    .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)

Review comment:
       Well, compared to the default interval. What about saying "quickly"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


