agrawaldevesh commented on a change in pull request #29579:
URL: https://github.com/apache/spark/pull/29579#discussion_r479917376
##########
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##########
@@ -909,9 +909,9 @@ private[deploy] class Master(
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None,
- // workerLost is being set to true here to let the driver know that
the host (aka. worker)
+ // worker host is being set here to let the driver know that the
host (aka. worker)
Review comment:
nit: can you reword the comment to be more accurate now :-)
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler(
*/
private[scheduler] def handleExecutorLost(
execId: String,
- workerLost: Boolean): Unit = {
+ hostOpt: Option[String]): Unit = {
Review comment:
Can you change this method's comment also if you decide to go with
hostOpt instead of workerLost (perhaps you ought to consider my
comment on making workerLost itself be an Option[String]). The comment still
refers to "standalone worker"
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -489,17 +491,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
decomInfo: ExecutorDecommissionInfo): Boolean = {
logInfo(s"Asking executor $executorId to decommissioning.")
- try {
- scheduler.executorDecommission(executorId, decomInfo)
- if (driverEndpoint != null) {
- logInfo("Propagating executor decommission to driver.")
- driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
- }
- } catch {
- case e: Exception =>
- logError(s"Unexpected error during decommissioning ${e.toString}", e)
- return false
- }
+ scheduler.executorDecommission(executorId, decomInfo)
Review comment:
Agreed !
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl(
synchronized {
// Don't bother noting decommissioning for executors that we don't know
about
if (executorIdToHost.contains(executorId)) {
- val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
- val newDecomState = if (oldDecomStateOpt.isEmpty) {
- // This is the first time we are hearing of decommissioning this
executor,
- // so create a brand new state.
- ExecutorDecommissionState(
- clock.getTimeMillis(),
- decommissionInfo.isHostDecommissioned)
- } else {
- val oldDecomState = oldDecomStateOpt.get
- if (!oldDecomState.isHostDecommissioned &&
decommissionInfo.isHostDecommissioned) {
- // Only the cluster manager is allowed to send decommission
messages with
- // isHostDecommissioned set. So the new decommissionInfo is from
the cluster
- // manager and is thus authoritative. Flip isHostDecommissioned to
true but keep the old
- // decommission start time.
- ExecutorDecommissionState(
- oldDecomState.startTime,
- isHostDecommissioned = true)
- } else {
- oldDecomState
- }
- }
- executorsPendingDecommission(executorId) = newDecomState
+ executorsPendingDecommission(executorId) =
+ ExecutorDecommissionState(clock.getTimeMillis(),
decommissionInfo.hostOpt)
Review comment:
Agreed ! This cleanup helps.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
##########
@@ -69,5 +69,8 @@ case class ExecutorProcessLost(
*
* This is used by the task scheduler to remove state associated with the
executor, but
* not yet fail any tasks that were running in the executor before the
executor is "fully" lost.
+ *
+ * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is
decommissioned too
*/
-private [spark] object ExecutorDecommission extends
ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(var hostOpt: Option[String] =
None)
Review comment:
I am not a fan of this change of making the hostOpt be a var instead of
a val. I think you only need this for line 932 in TaskSchedulerImpl. I am sure
you would be able to accommodate that use case in a different way.
The reason I don't like it is because other ExecutorLossReason's are
"messages" (for example ExecutorProcessLost) and these messages tend to be
immutable. I think it's a bit hacky to have ExecutorDecommission masquerading
as a message but then make it be mutable.
Even ExecutorDecommission is a message that the TaskSchedulerImpl enqueues
into the event loop of the DAGScheduler.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -489,17 +491,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
decomInfo: ExecutorDecommissionInfo): Boolean = {
logInfo(s"Asking executor $executorId to decommissioning.")
- try {
- scheduler.executorDecommission(executorId, decomInfo)
- if (driverEndpoint != null) {
- logInfo("Propagating executor decommission to driver.")
- driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
- }
- } catch {
- case e: Exception =>
- logError(s"Unexpected error during decommissioning ${e.toString}", e)
- return false
- }
+ scheduler.executorDecommission(executorId, decomInfo)
// Send decommission message to the executor (it could have originated on
the executor
Review comment:
nit: While you are here, can you please also close the parenthesis here
:-)
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2366,11 +2366,12 @@ private[scheduler] class
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId, reason) =>
- val workerLost = reason match {
- case ExecutorProcessLost(_, true, _) => true
- case _ => false
+ val hostOpt = reason match {
+ case ExecutorProcessLost(_, host, _) => host
+ case ExecutorDecommission(host) => host
Review comment:
nit: should this be ExecutorDecommission(hostOpt) ... to keep parity
with the original name.
##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -188,7 +188,7 @@ private[deploy] object DeployMessages {
}
case class ExecutorUpdated(id: Int, state: ExecutorState, message:
Option[String],
- exitStatus: Option[Int], workerLost: Boolean)
+ exitStatus: Option[Int], hostOpt: Option[String])
Review comment:
We can also leave the name as workerLost and just make it be an
Optional[String] ? In the spirit of minimal code change ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]