agrawaldevesh commented on a change in pull request #29579:
URL: https://github.com/apache/spark/pull/29579#discussion_r483110363



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -92,8 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   private val executorsPendingLossReason = new HashSet[String]
 
-  // Executors which are being decommissioned
-  protected val executorsPendingDecommission = new HashSet[String]
+  // Executors which are being decommissioned. Maps from executorId to
+  // workerHost(it's defined when the worker is also decommissioned)

Review comment:
    super nit: space after workerHost.
   
   I think workerHost is already an Option and thus already matches the value type of the executorsPendingDecommission map. Thus, we can perhaps drop the parenthetical clause entirely?
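
   To make that point concrete, here is a minimal, self-contained sketch (the field name follows the diff, but the object and everything around it are illustrative, not the actual Spark code): because the map's value type is already Option[String], a Some(host) value by itself says the worker host is also being decommissioned, with no extra flag needed.

```scala
import scala.collection.mutable.HashMap

// Illustrative sketch, not the real CoarseGrainedSchedulerBackend: the map
// value is already an Option[String], so no separate boolean is needed to
// record whether the worker host is going away too.
object DecommissionMapSketch {
  // executorId -> workerHost; Some(host) when the worker host is also being
  // decommissioned, None when only the executor is.
  val executorsPendingDecommission = new HashMap[String, Option[String]]

  def main(args: Array[String]): Unit = {
    executorsPendingDecommission("exec-1") = Some("host-a")
    executorsPendingDecommission("exec-2") = None
    assert(executorsPendingDecommission("exec-1").contains("host-a"))
    assert(executorsPendingDecommission("exec-2").isEmpty)
  }
}
```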

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -394,10 +395,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
+            val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
             val decommissioned = executorsPendingDecommission.remove(executorId)
-            executorsPendingToRemove
-              .remove(executorId).filter(killedByDriver => killedByDriver).map(_ => ExecutorKilled)
-              .getOrElse(if (decommissioned) ExecutorDecommission() else reason)
+            if (killedByDriver) {
+              ExecutorKilled
+            } else if (decommissioned.isDefined) {
+              ExecutorDecommission(decommissioned.get)
+            } else {
+              reason
+            }

Review comment:
    :-) I can read!
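
   For readers skimming the thread, the three-way decision in the diff above could also be sketched as a single pattern match; the types below are simplified stand-ins for illustration, not Spark's real ExecutorLossReason hierarchy.

```scala
// Simplified stand-ins for Spark's loss-reason types; only the shape of the
// decision mirrors the diff, the types themselves are illustrative.
sealed trait LossReason
case object ExecutorKilled extends LossReason
case class ExecutorDecommission(workerHost: Option[String]) extends LossReason
case object WorkerLost extends LossReason

object LossReasonSketch {
  // decommissioned is Some(workerHost) when the executor was pending
  // decommission; workerHost itself is Some(host) if the whole worker
  // host is going away.
  def resolve(
      killedByDriver: Boolean,
      decommissioned: Option[Option[String]],
      fallback: LossReason): LossReason =
    (killedByDriver, decommissioned) match {
      case (true, _)             => ExecutorKilled
      case (_, Some(workerHost)) => ExecutorDecommission(workerHost)
      case _                     => fallback
    }

  def main(args: Array[String]): Unit = {
    assert(resolve(true, None, WorkerLost) == ExecutorKilled)
    assert(resolve(false, Some(Some("h")), WorkerLost) == ExecutorDecommission(Some("h")))
    assert(resolve(false, None, WorkerLost) == WorkerLost)
  }
}
```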

##########
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##########
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
 /**
  * Message providing more detail when an executor is being decommissioned.
 * @param message Human readable reason for why the decommissioning is happening.
- * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
- *                             being decommissioned too. Used to infer if the shuffle data might
- *                             be lost even if the external shuffle service is enabled.
+ * @param workerHost When workerHost is defined. It means the host (aka the `node` or `worker`

Review comment:
       nit: When workerHost is defined, it means ...
   
   (comma instead of full stop)

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -394,10 +395,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
+            val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
             val decommissioned = executorsPendingDecommission.remove(executorId)

Review comment:
    Rename decommissioned to workerHostOpt and perhaps give it an explicit type: Option[Option[String]]. It's no longer a simple boolean.
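
   A hedged sketch of that suggestion (the surrounding class is omitted and the object name is illustrative): the explicit Option[Option[String]] annotation makes the nesting visible at the call site.

```scala
import scala.collection.mutable.HashMap

// Illustrative stand-in for the real field in CoarseGrainedSchedulerBackend.
object WorkerHostOptSketch {
  val executorsPendingDecommission = new HashMap[String, Option[String]]

  def main(args: Array[String]): Unit = {
    executorsPendingDecommission("exec-1") = Some("host-a")
    // Suggested rename with an explicit type: the outer Option says whether
    // the executor was pending decommission at all, the inner one whether
    // its worker host is also being decommissioned.
    val workerHostOpt: Option[Option[String]] =
      executorsPendingDecommission.remove("exec-1")
    assert(workerHostOpt == Some(Some("host-a")))
    // A second remove yields None: nothing pending for that executor anymore.
    assert(executorsPendingDecommission.remove("exec-1").isEmpty)
  }
}
```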

##########
File path: core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
##########
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
           cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
-      case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
+      case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>

Review comment:
    Personally, I would still be okay with workerLost being an Option[String] instead of a Boolean. Obviously, had it been called "workerIsLost" then we would have to rename it. But I am also fine with the new name workerHost. I don't particularly think that the name workerLost must connote a boolean.
   
   This ExecutorUpdated message is a case in point where the "lost" part is 
meaningful because it refers to the "worker that is lost" as opposed to some 
random worker-host. 
   
   But no strong feelings on this and I am happy with the choice workerHost.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
##########
@@ -70,7 +71,7 @@ case class ExecutorProcessLost(
 * This is used by the task scheduler to remove state associated with the executor, but
 * not yet fail any tasks that were running in the executor before the executor is "fully" lost.
 *
- * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too
+ * @param workerHost it's defined when the worker is decommissioned too

Review comment:
       nit: "it's" -> "it is"
   
   Also, should we explicitly bring out the word 'host' here? "It is defined when the worker _host_ is decommissioned too"

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
##########
@@ -165,10 +165,13 @@ private[spark] class StandaloneSchedulerBackend(
   }
 
   override def executorRemoved(
-      fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit = {
+      fullId: String,
+      message: String,
+      exitStatus: Option[Int],
+       workerHost: Option[String]): Unit = {

Review comment:
       nit: Extra space before workerHost




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
