holdenk commented on a change in pull request #28817:
URL: https://github.com/apache/spark/pull/28817#discussion_r439781237
##########
File path:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}
- private def decommissionSelf(): Boolean = {
- logInfo("Decommissioning self w/sync")
- try {
- decommissioned = true
- // Tell master we are are decommissioned so it stops trying to schedule us
- if (driver.nonEmpty) {
- driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+ private var previousAllBlocksMigrated = false
+ private def shutdownIfDone(): Unit = {
+ val numRunningTasks = executor.numRunningTasks
+ logInfo(s"Checking to see if we can shutdown have ${numRunningTasks} running tasks.")
+ if (executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ val allBlocksMigrated = env.blockManager.decommissionManager match {
+ case Some(m) => m.allBlocksMigrated
+ case None => false // We haven't started migrations yet.
+ }
+ if (allBlocksMigrated && previousAllBlocksMigrated) {
+ logInfo("No running tasks, all blocks migrated, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ }
+ previousAllBlocksMigrated = allBlocksMigrated
} else {
- logError("No driver to message decommissioning.")
+ logInfo("No running tasks, no block migration configured, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
- if (executor != null) {
- executor.decommission()
+ } else {
+ // If there's a running task it could store blocks.
Review comment:
This happens if a task is scheduled before we are asked to decommission. You can verify this
is covered by taking the logic out and watching the tests fail :) (There's an
ugly thread sleep in the tests to make this possible.)
Since the block migrations are not atomic, I do think we need the check-twice
logic (previousAllBlocksMigrated), unfortunately. Think of this situation:
1) Task launches on executor
2) Executor asked to decommission
3) All blocks currently stored on executor are migrated
4) Task stores a block
5) We check numRunningTasks, see that all blocks are migrated, and exit without
migrating the block stored by the task in step 4.
That said, this is probably a corner case, and arguably not super
important since we're only doing best effort, but for the overhead of one
extra boolean I think it's worth covering.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.