agrawaldevesh commented on a change in pull request #29579:
URL: https://github.com/apache/spark/pull/29579#discussion_r479868581
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -489,17 +491,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
decomInfo: ExecutorDecommissionInfo): Boolean = {
logInfo(s"Asking executor $executorId to decommissioning.")
- try {
- scheduler.executorDecommission(executorId, decomInfo)
- if (driverEndpoint != null) {
- logInfo("Propagating executor decommission to driver.")
- driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
- }
- } catch {
- case e: Exception =>
- logError(s"Unexpected error during decommissioning ${e.toString}", e)
- return false
- }
+ scheduler.executorDecommission(executorId, decomInfo)
Review comment:
Just trying to follow: why was the driver endpoint message dropped?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -390,16 +390,18 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
case Some(executorInfo) =>
// This must be synchronized because variables mutated
// in this block are read when requesting executors
- val killed = CoarseGrainedSchedulerBackend.this.synchronized {
+ val lossReason = CoarseGrainedSchedulerBackend.this.synchronized {
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
- executorsPendingDecommission -= executorId
- executorsPendingToRemove.remove(executorId).getOrElse(false)
+ val decommissioned =
executorsPendingDecommission.remove(executorId)
+ executorsPendingToRemove
+ .remove(executorId).filter(killedByDriver =>
killedByDriver).map(_ => ExecutorKilled)
Review comment:
This Scala statement is quite a mouthful. Would an `if` condition make it
simpler?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl(
synchronized {
// Don't bother noting decommissioning for executors that we don't know
about
if (executorIdToHost.contains(executorId)) {
- val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
- val newDecomState = if (oldDecomStateOpt.isEmpty) {
- // This is the first time we are hearing of decommissioning this
executor,
- // so create a brand new state.
- ExecutorDecommissionState(
- clock.getTimeMillis(),
- decommissionInfo.isHostDecommissioned)
- } else {
- val oldDecomState = oldDecomStateOpt.get
- if (!oldDecomState.isHostDecommissioned &&
decommissionInfo.isHostDecommissioned) {
- // Only the cluster manager is allowed to send decommission
messages with
- // isHostDecommissioned set. So the new decommissionInfo is from
the cluster
- // manager and is thus authoritative. Flip isHostDecommissioned to
true but keep the old
- // decommission start time.
- ExecutorDecommissionState(
- oldDecomState.startTime,
- isHostDecommissioned = true)
- } else {
- oldDecomState
- }
- }
- executorsPendingDecommission(executorId) = newDecomState
+ executorsPendingDecommission(executorId) =
+ ExecutorDecommissionState(clock.getTimeMillis(),
decommissionInfo.hostOpt)
Review comment:
Just to make sure I am understanding: are you removing the handling of
multiple decommission messages? We do not have good coverage on that, but why remove it?
##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -188,7 +188,7 @@ private[deploy] object DeployMessages {
}
case class ExecutorUpdated(id: Int, state: ExecutorState, message:
Option[String],
- exitStatus: Option[Int], workerLost: Boolean)
+ exitStatus: Option[Int], hostOpt: Option[String])
Review comment:
I think we need a better name than `hostOpt`. How about just `hostname`?
The type already conveys that this is optional.
##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -188,7 +188,7 @@ private[deploy] object DeployMessages {
}
case class ExecutorUpdated(id: Int, state: ExecutorState, message:
Option[String],
- exitStatus: Option[Int], workerLost: Boolean)
+ exitStatus: Option[Int], hostOpt: Option[String])
Review comment:
How about `hostLost`?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -952,28 +920,19 @@ private[spark] class TaskSchedulerImpl(
override def getExecutorDecommissionState(executorId: String)
: Option[ExecutorDecommissionState] = synchronized {
- executorsPendingDecommission
- .get(executorId)
- .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
+ executorsPendingDecommission.get(executorId)
}
- override def executorLost(executorId: String, givenReason:
ExecutorLossReason): Unit = {
+ override def executorLost(executorId: String, reason: ExecutorLossReason):
Unit = {
var failedExecutor: Option[String] = None
- val reason = givenReason match {
- // Handle executor process loss due to decommissioning
- case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
- val executorDecommissionState =
getExecutorDecommissionState(executorId)
- ExecutorProcessLost(
- message,
- // Also mark the worker lost if we know that the host was
decommissioned
- origWorkerLost ||
executorDecommissionState.exists(_.isHostDecommissioned),
- // Executor loss is certainly not caused by app if we knew that this
executor is being
- // decommissioned
- causedByApp = executorDecommissionState.isEmpty && origCausedByApp)
- case e => e
- }
synchronized {
+ reason match {
+ case e @ ExecutorDecommission(_) =>
+ e.hostOpt =
getExecutorDecommissionState(executorId).map(_.hostOpt).get
Review comment:
Is `hostOpt` a `var`, such that it can be mutated like this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]