holdenk commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r461800882



##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +282,52 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {

Review comment:
       So the executor would still shut down (we already test for that); it's just that the RPC call would have been blocking rather than non-blocking.
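
For readers following the thread: in the quoted hunk, the `while (true)` loop appears to sit directly in the body of the anonymous `Thread` subclass rather than inside `run()`. Assuming that is the point under discussion, the sketch below shows why this would block the caller. Statements in an anonymous class body are part of its constructor, so they execute on whichever thread calls `new`. The object `ThreadBodyDemo` and its two methods are hypothetical names for illustration only and are not part of the PR.

```scala
object ThreadBodyDemo {
  // Work placed directly in the anonymous class body runs as part of the
  // constructor, i.e. on the caller's thread, before `new` returns.
  def constructorBody(): String = {
    var ranOn = ""
    val t = new Thread("wait-for-blocks-to-migrate") {
      ranOn = Thread.currentThread().getName
    }
    // `t` is never started here; the assignment above already ran on the caller.
    ranOn
  }

  // Work placed in run() executes on the new thread once start() is called,
  // so the caller is free to return immediately.
  def runBody(): String = {
    var ranOn = ""
    val t = new Thread("wait-for-blocks-to-migrate") {
      override def run(): Unit = {
        ranOn = Thread.currentThread().getName
      }
    }
    t.setDaemon(true)
    t.start()
    t.join() // join() only so the demo can read the result; it also makes the write visible
    ranOn
  }
}
```

With a long-running loop in place of the single assignment, the first form would keep the calling thread (here, the RPC handler) busy until the loop exits, while the second returns as soon as `start()` is called.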

Review comment:
       Yeah. I'll double-check by disabling the shutdown and seeing whether the test fails in CI while I'm doing my other adventures today. (Then I'll either re-enable the shutdown code, or work on getting the test to fail first and then re-enable it.)



