agrawaldevesh commented on a change in pull request #29579:
URL: https://github.com/apache/spark/pull/29579#discussion_r479917376



##########
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##########
@@ -909,9 +909,9 @@ private[deploy] class Master(
         exec.application.driver.send(ExecutorUpdated(
           exec.id, ExecutorState.DECOMMISSIONED,
           Some("worker decommissioned"), None,
-          // workerLost is being set to true here to let the driver know that the host (aka. worker)
+          // worker host is being set here to let the driver know that the host (aka. worker)

Review comment:
       nit: can you reword the comment to be more accurate now? :-)
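   For example (just a suggestion, assuming the intent is that the driver should treat the whole worker as decommissioned; adjust the wording as you like):
   
       // Setting the worker host here lets the driver know that the whole host (aka. worker)
       // is being decommissioned, not just this single executor.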

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler(
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      workerLost: Boolean): Unit = {
+      hostOpt: Option[String]): Unit = {

Review comment:
       Can you also update this method's comment if you decide to go with hostOpt instead of workerLost (perhaps you ought to consider my comment on making workerLost itself an Option[String])? The comment still refers to "standalone worker".
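   Roughly the shape I have in mind, as a sketch only (the doc wording here is a placeholder, not lifted from the actual code):
   
       /**
        * Handles the loss of an executor. If hostOpt is defined, the whole host is considered
        * lost as well (for example because it was decommissioned), not just this one executor.
        */
       private[scheduler] def handleExecutorLost(
           execId: String,
           hostOpt: Option[String]): Unit = {
         // existing body unchanged
       }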

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -489,17 +491,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       decomInfo: ExecutorDecommissionInfo): Boolean = {
 
     logInfo(s"Asking executor $executorId to decommissioning.")
-    try {
-      scheduler.executorDecommission(executorId, decomInfo)
-      if (driverEndpoint != null) {
-        logInfo("Propagating executor decommission to driver.")
-        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
-      }
-    } catch {
-      case e: Exception =>
-        logError(s"Unexpected error during decommissioning ${e.toString}", e)
-        return false
-    }
+    scheduler.executorDecommission(executorId, decomInfo)

Review comment:
       Agreed!

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl(
     synchronized {
       // Don't bother noting decommissioning for executors that we don't know about
       if (executorIdToHost.contains(executorId)) {
-        val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
-        val newDecomState = if (oldDecomStateOpt.isEmpty) {
-          // This is the first time we are hearing of decommissioning this executor,
-          // so create a brand new state.
-          ExecutorDecommissionState(
-            clock.getTimeMillis(),
-            decommissionInfo.isHostDecommissioned)
-        } else {
-          val oldDecomState = oldDecomStateOpt.get
-          if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) {
-            // Only the cluster manager is allowed to send decommission messages with
-            // isHostDecommissioned set. So the new decommissionInfo is from the cluster
-            // manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old
-            // decommission start time.
-            ExecutorDecommissionState(
-              oldDecomState.startTime,
-              isHostDecommissioned = true)
-          } else {
-            oldDecomState
-          }
-        }
-        executorsPendingDecommission(executorId) = newDecomState
+        executorsPendingDecommission(executorId) =
+          ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.hostOpt)

Review comment:
       Agreed! This cleanup helps.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
##########
@@ -69,5 +69,8 @@ case class ExecutorProcessLost(
  *
  * This is used by the task scheduler to remove state associated with the executor, but
  * not yet fail any tasks that were running in the executor before the executor is "fully" lost.
+ *
+ * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too
  */
-private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(var hostOpt: Option[String] = None)

Review comment:
       I am not a fan of making hostOpt a var instead of a val. I think you only need this for line 932 in TaskSchedulerImpl, and I am sure you can accommodate that use case in a different way.
   
   The reason I don't like it is that the other ExecutorLossReasons are "messages" (for example ExecutorProcessLost), and these messages tend to be immutable. It feels a bit hacky to have ExecutorDecommission masquerade as a message but then make it mutable.
   
   ExecutorDecommission too is a message that the TaskSchedulerImpl enqueues into the event loop of the DAGScheduler.
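   Concretely, the shape I would suggest, as a sketch only (assuming the case class still extends ExecutorLossReason with the same message as before):
   
       // Immutable, like the other ExecutorLossReason messages.
       private[spark] case class ExecutorDecommission(hostOpt: Option[String] = None)
         extends ExecutorLossReason("Executor decommission.")
   
   The call site that currently mutates hostOpt could instead construct a fresh ExecutorDecommission with the host already filled in at that point.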

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -489,17 +491,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       decomInfo: ExecutorDecommissionInfo): Boolean = {
 
     logInfo(s"Asking executor $executorId to decommissioning.")
-    try {
-      scheduler.executorDecommission(executorId, decomInfo)
-      if (driverEndpoint != null) {
-        logInfo("Propagating executor decommission to driver.")
-        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
-      }
-    } catch {
-      case e: Exception =>
-        logError(s"Unexpected error during decommissioning ${e.toString}", e)
-        return false
-    }
+    scheduler.executorDecommission(executorId, decomInfo)
     // Send decommission message to the executor (it could have originated on the executor

Review comment:
       nit: While you are here, can you please also close the parenthesis here? :-)

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
       dagScheduler.handleExecutorAdded(execId, host)
 
     case ExecutorLost(execId, reason) =>
-      val workerLost = reason match {
-        case ExecutorProcessLost(_, true, _) => true
-        case _ => false
+      val hostOpt = reason match {
+        case ExecutorProcessLost(_, host, _) => host
+        case ExecutorDecommission(host) => host

Review comment:
       nit: should this be ExecutorDecommission(hostOpt) ... to keep parity with the original name?
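   i.e. just renaming the pattern binder in that arm, something like:
   
       case ExecutorDecommission(hostOpt) => hostOpt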
   

##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -188,7 +188,7 @@ private[deploy] object DeployMessages {
   }
 
   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int], workerLost: Boolean)
+    exitStatus: Option[Int], hostOpt: Option[String])

Review comment:
       We could also leave the name as workerLost and just make it an Option[String], in the spirit of a minimal code change?
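   i.e. keep the field name and only change its type, along these lines (a sketch, not checked against the rest of the code):
   
       case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
         exitStatus: Option[Int], workerLost: Option[String])
   
   where workerLost would presumably carry the worker's host when the whole worker is gone, and be None otherwise.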



