agrawaldevesh commented on a change in pull request #28817:
URL: https://github.com/apache/spark/pull/28817#discussion_r439777235
##########
File path:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}
- private def decommissionSelf(): Boolean = {
- logInfo("Decommissioning self w/sync")
- try {
- decommissioned = true
- // Tell master we are are decommissioned so it stops trying to schedule us
- if (driver.nonEmpty) {
- driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+ private var previousAllBlocksMigrated = false
+ private def shutdownIfDone(): Unit = {
+ val numRunningTasks = executor.numRunningTasks
+ logInfo(s"Checking to see if we can shutdown have ${numRunningTasks} running tasks.")
+ if (executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ val allBlocksMigrated = env.blockManager.decommissionManager match {
+ case Some(m) => m.allBlocksMigrated
+ case None => false // We haven't started migrations yet.
+ }
+ if (allBlocksMigrated && previousAllBlocksMigrated) {
+ logInfo("No running tasks, all blocks migrated, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ }
+ previousAllBlocksMigrated = allBlocksMigrated
} else {
- logError("No driver to message decommissioning.")
+ logInfo("No running tasks, no block migration configured, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
- if (executor != null) {
- executor.decommission()
+ } else {
+ // If there's a running task it could store blocks.
Review comment:
I think the interplay between previousAllBlocksMigrated and allBlocksMigrated
is a bit confusing. It's not clear why the previous state has to be considered.
I wonder if the following code can make this "history" aspect a bit clearer:
```
val allBlocksMigrated = !env.conf.get(STORAGE_DECOMMISSION_ENABLED) ||
  env.blockManager.decommissionManager.map(_.allBlocksMigrated).getOrElse(false)
val exitCondition = allBlocksMigrated && numRunningTasks == 0
if (exitCondition) { exitExecutor(...) }
```
Also, should we really be checking numRunningTasks here? What if some race
condition caused tasks to be scheduled onto us after we were marked for
decommissioning?
Finally, should there be a timeout on how long the executor stays alive in the
decommissioned state?
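To make the question concrete, here is a minimal sketch of how the exit decision could carry the "history" flag explicitly and also honor a deadline. It is a self-contained model, not Spark code: `Snapshot`, `Decision`, `ShutdownPolicy.decide`, and `timeoutMs` are all hypothetical names, and the rule that storage-enabled executors need two consecutive "idle and migrated" polls is my reading of the diff above.

```scala
// Hypothetical, self-contained model of the exit decision; none of these
// names exist in Spark.
final case class Snapshot(
    numRunningTasks: Int,
    storageDecommissionEnabled: Boolean,
    allBlocksMigrated: Option[Boolean], // None: migration has not started yet
    elapsedMs: Long)

sealed trait Decision
case object KeepWaiting extends Decision
case object ExitDone extends Decision
case object ExitTimedOut extends Decision

object ShutdownPolicy {
  /** Returns the decision plus the "idle and migrated" flag to carry into the next poll. */
  def decide(prevDone: Boolean, s: Snapshot, timeoutMs: Long): (Decision, Boolean) = {
    val migrated = !s.storageDecommissionEnabled || s.allBlocksMigrated.getOrElse(false)
    val doneNow = migrated && s.numRunningTasks == 0
    // With storage decommissioning on, require two consecutive "done" polls,
    // because a task that just finished may have written new blocks.
    // Without it there is nothing to re-check, so the first idle poll suffices.
    val confirmed = doneNow && (prevDone || !s.storageDecommissionEnabled)
    val decision =
      if (confirmed) ExitDone
      else if (s.elapsedMs >= timeoutMs) ExitTimedOut // assumed safety valve
      else KeepWaiting
    (decision, doneNow)
  }
}
```

The caller would feed the returned flag back in as `prevDone` on the next poll, which keeps the mutable state in one obvious place. Whether a timed-out executor should exit with a different code is left open here.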
##########
File path:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}
- private def decommissionSelf(): Boolean = {
- logInfo("Decommissioning self w/sync")
- try {
- decommissioned = true
- // Tell master we are are decommissioned so it stops trying to schedule us
- if (driver.nonEmpty) {
- driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+ private var previousAllBlocksMigrated = false
+ private def shutdownIfDone(): Unit = {
+ val numRunningTasks = executor.numRunningTasks
+ logInfo(s"Checking to see if we can shutdown have ${numRunningTasks} running tasks.")
+ if (executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ val allBlocksMigrated = env.blockManager.decommissionManager match {
+ case Some(m) => m.allBlocksMigrated
+ case None => false // We haven't started migrations yet.
+ }
+ if (allBlocksMigrated && previousAllBlocksMigrated) {
+ logInfo("No running tasks, all blocks migrated, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ }
+ previousAllBlocksMigrated = allBlocksMigrated
} else {
- logError("No driver to message decommissioning.")
+ logInfo("No running tasks, no block migration configured, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
- if (executor != null) {
- executor.decommission()
+ } else {
+ // If there's a running task it could store blocks.
+ previousAllBlocksMigrated = false
+ }
+ }
+
+ private def decommissionSelf(): Boolean = {
+ if (!decommissioned) {
+ logInfo("Decommissioning self w/sync")
Review comment:
Perhaps we should spell out what 'w/sync' stands for in the log message?
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -2039,8 +2047,11 @@ private[spark] class BlockManager(
* Class to handle block manager decommissioning retries
* It creates a Thread to retry offloading all RDD cache blocks
*/
- private class BlockManagerDecommissionManager(conf: SparkConf) {
+ private[spark] class BlockManagerDecommissionManager(conf: SparkConf) {
@volatile private var stopped = false
+ // Since running tasks can add more blocks this can change.
Review comment:
Just to make sure I understand this: do you mean the tasks that were already
running when decommissioning started on the executor? I think we refuse to
launch new tasks once decommissioning has started, so any new blocks must be
written by tasks that were already running. Did I get this right?
Also, just to confirm I am still following along: I don't see this case
handled in the existing BlockManagerSuite. I believe we are not testing writes
of new blocks while the decom/offload is in progress.
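As a rough illustration of the scenario I have in mind, here is a self-contained model of the bookkeeping, assuming the flag has to flip back to false when a still-running task writes a block after a migration pass. `MigrationTracker` and its methods are hypothetical stand-ins, not the BlockManagerDecommissionManager API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the decommission manager's bookkeeping; not Spark's API.
final class MigrationTracker {
  private val pending = mutable.Set.empty[String]
  private var passCompleted = false

  /** A task that is still running writes a new block. */
  def blockWritten(id: String): Unit = synchronized {
    pending += id
  }

  /** One migration pass: offload everything that is currently pending. */
  def runMigrationPass(): Unit = synchronized {
    pending.clear()
    passCompleted = true
  }

  /** True only if at least one pass has completed and nothing was written since. */
  def allBlocksMigrated: Boolean = synchronized {
    passCompleted && pending.isEmpty
  }
}
```

A test along these lines would write a block, run a pass, assert the flag is true, write another block, and assert the flag is false again until the next pass completes.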
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
##########
@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage
+ case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.
Review comment:
IMHO, the DecommissionSelf naming is a bit ambiguous: who is "self" here,
the sender or the receiver?
This message is now sent from the driver to the executor, so perhaps we
should just repurpose DecommissionExecutor with a check for the executorId?
Not a big deal, but I am trying to reduce the number of message types
introduced by this feature ;)
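Something like the following is what I mean, as a sketch only. The message type is redeclared here in simplified form so the snippet stands alone, and `ExecutorBackendSketch` is a hypothetical stand-in for the executor-side handler rather than the real CoarseGrainedExecutorBackend.

```scala
// Simplified stand-ins for the real message types; only the shape matters here.
sealed trait ClusterMessage
final case class DecommissionExecutor(executorId: String) extends ClusterMessage

// Hypothetical executor-side handler, not the actual CoarseGrainedExecutorBackend.
final class ExecutorBackendSketch(val executorId: String) {
  @volatile private var decommissioned = false

  def isDecommissioned: Boolean = decommissioned

  def receive(msg: ClusterMessage): Unit = msg match {
    case DecommissionExecutor(id) if id == executorId =>
      // The driver addressed this executor, so mark ourselves as decommissioned.
      decommissioned = true
    case DecommissionExecutor(other) =>
      // A message addressed to a different executor is ignored.
      println(s"Ignoring decommission request for executor $other")
  }
}
```

The executorId check makes the intended target explicit in the message itself, at the cost of one extra comparison on the receiving side.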
##########
File path:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}
- private def decommissionSelf(): Boolean = {
- logInfo("Decommissioning self w/sync")
- try {
- decommissioned = true
- // Tell master we are are decommissioned so it stops trying to schedule us
- if (driver.nonEmpty) {
- driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+ private var previousAllBlocksMigrated = false
Review comment:
Should this variable be marked @volatile?
##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -233,6 +233,7 @@ private[spark] class Executor(
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
+ logInfo("Executor asked to decommission. Starting shutdown thread.")
Review comment:
I think this comment looks stale. It should probably be moved to the
CoarseGrainedExecutorBackend. It's also not clear to me what the `decommission`
flag does in the Executor besides just logging.
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1887,7 +1891,7 @@ private[spark] class BlockManager(
* but rather shadows them.
* Requires an Indexed based shuffle resolver.
Review comment:
I think the comment needs to be updated to say what the returned Boolean
indicates.