Ngone51 commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r466251813



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
          case e: Exception =>
            logError(s"Unexpected error during decommissioning ${e.toString}", e)
        }
+        // Send decommission message to the executor, this may be a duplicate since the executor

Review comment:
       So we need to prevent duplicate `DecommissionSelf` on the executor side? For now, I don't see us handling a duplicate `DecommissionSelf`, and it may create duplicate threads as a result.
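
       For illustration only (not part of this PR, all names below are hypothetical), one way the executor side could de-duplicate `DecommissionSelf` is a compare-and-set flag so that only the first message starts the shutdown thread:

           import java.util.concurrent.atomic.AtomicBoolean

           // Hypothetical sketch, not the PR's code: de-duplicate DecommissionSelf with a
           // compare-and-set flag so only the first message starts the shutdown thread.
           object DecommissionGuardSketch {
             private val decommissionStarted = new AtomicBoolean(false)

             def onDecommissionSelf(startShutdownThread: () => Unit): Unit = {
               // compareAndSet succeeds only for the first caller; later duplicates fall through.
               if (decommissionStarted.compareAndSet(false, true)) {
                 startShutdownThread()
               } else {
                 println("Already decommissioning, ignoring duplicate DecommissionSelf.")
               }
             }
           }

       With a guard like this, a duplicate message from the driver is harmless and no second "wait-for-blocks-to-migrate" thread is created.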

##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +282,59 @@ private[spark] class CoarseGrainedExecutorBackend(
      if (executor != null) {
        executor.decommission()
      }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
+        override def run(): Unit = {
+          var lastTaskRunningTime = System.nanoTime()
+          val sleep_time = 1000 // 1s
+
+          while (true) {
+            logInfo("Checking to see if we can shutdown.")
+            Thread.sleep(sleep_time)
+            if (executor == null || executor.numRunningTasks == 0) {
+              if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+                logInfo("No running tasks, checking migrations")
+                val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
+                // We can only trust allBlocksMigrated boolean value if there were no tasks running
+                // since the start of computing it.
+                if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
+                  logInfo("No running tasks, all blocks migrated, stopping.")
+                  exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+                } else {
+                  logInfo("All blocks not yet migrated.")
+                }
+              } else {
+                logInfo("No running tasks, no block migration configured, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)

Review comment:
       I'm wondering if we need to wait for tasks to finish when storage decommissioning is disabled. I mean, for a shuffle map task, or a result task with an indirect result, the outputs are still based on blocks. As a result, we'd spend time waiting for those tasks to finish but get nothing good in return.
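
       To sketch what skipping that wait could look like (hypothetical names, not the PR's code): when block migration is disabled, the executor could exit immediately instead of polling until running tasks drain.

           // Hypothetical sketch of the suggestion above, not the PR's code: skip the
           // running-task poll entirely when storage decommissioning is disabled.
           object DecommissionShutdownSketch {
             def waitThenExit(
                 storageDecommissionEnabled: Boolean,
                 numRunningTasks: () => Int,
                 exitExecutor: String => Unit): Unit = {
               if (!storageDecommissionEnabled) {
                 // No block migration configured: nothing to gain from waiting on tasks
                 // whose outputs are blocks anyway, so shut down right away.
                 exitExecutor("Finished decommissioning (block migration disabled)")
               } else {
                 // Otherwise poll until running tasks drain before checking migration status.
                 while (numRunningTasks() > 0) {
                   Thread.sleep(1000)
                 }
                 exitExecutor("Finished decommissioning")
               }
             }
           }

       Whether that immediate exit is safe for tasks with direct (non-block) results is exactly the trade-off being discussed here.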



