agrawaldevesh commented on a change in pull request #29579:
URL: https://github.com/apache/spark/pull/29579#discussion_r483110363
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -92,8 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
private val executorsPendingLossReason = new HashSet[String]
- // Executors which are being decommissioned
- protected val executorsPendingDecommission = new HashSet[String]
+ // Executors which are being decommissioned. Maps from executorId to
+ // workerHost(it's defined when the worker is also decommissioned)
Review comment:
super nit: space after workerHost.
I think workerHost is already an Option and thus already matches the value
type of the executorsPendingDecommission map. Thus, we can perhaps drop the
parenthetical clause entirely?
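To illustrate the reviewer's point, here is a minimal, self-contained sketch (not the actual Spark source; the helper method and executor ids are hypothetical): with an `Option[String]` value type, the map itself records whether the worker host is known, so the comment's parenthetical clause adds little.

```scala
import scala.collection.mutable.HashMap

// Hypothetical sketch of the decommission bookkeeping, not Spark's code.
object DecommissionMapSketch {
  // Executors which are being decommissioned, mapped to the decommissioned
  // worker's host, if any (None when only the executor is decommissioned).
  val executorsPendingDecommission = new HashMap[String, Option[String]]

  // Illustrative helper: record that an executor is decommissioning.
  def markDecommissioning(executorId: String, workerHost: Option[String]): Unit =
    executorsPendingDecommission(executorId) = workerHost
}
```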
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -394,10 +395,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
+ val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
val decommissioned = executorsPendingDecommission.remove(executorId)
- executorsPendingToRemove
-   .remove(executorId).filter(killedByDriver => killedByDriver).map(_ => ExecutorKilled)
-   .getOrElse(if (decommissioned) ExecutorDecommission() else reason)
+ if (killedByDriver) {
+   ExecutorKilled
+ } else if (decommissioned.isDefined) {
+   ExecutorDecommission(decommissioned.get)
+ } else {
+   reason
+ }
Review comment:
:-) I can read it!
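For readers following along, the precedence encoded by the new if/else chain can be shown with a self-contained sketch. The `Reason` types below are simplified stand-ins for Spark's `ExecutorLossReason` hierarchy, not the real classes: a driver-initiated kill wins over decommissioning, which wins over the originally reported reason.

```scala
// Simplified stand-ins for Spark's ExecutorLossReason hierarchy.
object LossReasonSketch {
  sealed trait Reason
  case object ExecutorKilled extends Reason
  final case class ExecutorDecommission(workerHost: Option[String]) extends Reason
  final case class Other(message: String) extends Reason

  // Mirrors the precedence of the if/else chain in the hunk above.
  def resolve(
      killedByDriver: Boolean,
      decommissioned: Option[Option[String]],
      reason: Reason): Reason = {
    if (killedByDriver) {
      ExecutorKilled
    } else if (decommissioned.isDefined) {
      ExecutorDecommission(decommissioned.get)
    } else {
      reason
    }
  }
}
```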
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##########
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
/**
* Message providing more detail when an executor is being decommissioned.
* @param message Human readable reason for why the decommissioning is happening.
- * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
- *                             being decommissioned too. Used to infer if the shuffle data might
- *                             be lost even if the external shuffle service is enabled.
+ * @param workerHost When workerHost is defined. It means the host (aka the `node` or `worker`
Review comment:
nit: When workerHost is defined, it means ...
(comma instead of full stop)
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -394,10 +395,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
+ val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
val decommissioned = executorsPendingDecommission.remove(executorId)
Review comment:
Rename decommissioned to workerHostOpt and perhaps give it an explicit
type: Option[Option[String]]. It's no longer a simple boolean.
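A self-contained sketch of this renaming suggestion (the map contents and executor id are illustrative, not Spark's actual state): the explicit nested type makes both layers of optionality visible at the call site.

```scala
import scala.collection.mutable.HashMap

// Illustrative data, not Spark's actual bookkeeping.
object RenameSketch {
  val executorsPendingDecommission = new HashMap[String, Option[String]]
  executorsPendingDecommission("exec-1") = Some("host-a")

  // Outer Option: was this executor pending decommission at all?
  // Inner Option: if so, is its worker host also decommissioned?
  val workerHostOpt: Option[Option[String]] =
    executorsPendingDecommission.remove("exec-1")
}
```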
##########
File path:
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
##########
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
- case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
+ case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>
Review comment:
Personally, I would still be okay with workerLost being an
Option[String] instead of a Boolean. Obviously, had it been called
"workerIsLost" then we would have to rename it. But I am also fine with the new
name workerHost as well. I don't particularly think that the name workerLost
must connote a boolean.
This ExecutorUpdated message is a case in point where the "lost" part is
meaningful because it refers to the "worker that is lost" as opposed to some
random worker-host.
But no strong feelings on this and I am happy with the choice workerHost.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
##########
@@ -70,7 +71,7 @@ case class ExecutorProcessLost(
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
*
- * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too
+ * @param workerHost it's defined when the worker is decommissioned too
Review comment:
nit: "it's" -> "it is"
Also, should we explicitly bring out the word 'host' here? "It is defined
when the worker _host_ is decommissioned too"
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
##########
@@ -165,10 +165,13 @@ private[spark] class StandaloneSchedulerBackend(
}
override def executorRemoved(
- fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit = {
+ fullId: String,
+ message: String,
+ exitStatus: Option[Int],
+ workerHost: Option[String]): Unit = {
Review comment:
nit: Extra space before workerHost
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]