agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r468955452
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio
+ /**
+ * Request that the cluster manager decommission the specified executors.
+ *
+ * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+ * after these executors have been decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
+ */
+ override def decommissionExecutors(
+ executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+ val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ // If we don't want to replace the executors we are decommissioning
+ if (adjustTargetNumExecutors) {
Review comment:
Should there be a check for executorsToDecommission.nonEmpty?
Otherwise, the adjustExecutors helper method will request executors again
with no change to the target, which could again put unnecessary strain on
the driver.
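A minimal sketch of the guard suggested above, not the actual Spark code.
BackendSketch, activeExecutors, adjustCalls, and the one-field
ExecutorDecommissionInfo are hypothetical stand-ins introduced only for
illustration. The counter exists purely to show when the adjustment is
skipped.

```scala
// Hypothetical stand-in for Spark's ExecutorDecommissionInfo.
final case class ExecutorDecommissionInfo(message: String)

// Hypothetical stand-in for the scheduler backend; only the guard matters here.
class BackendSketch(activeExecutors: Set[String]) {
  // Counts how often the target adjustment is invoked (illustration only).
  var adjustCalls = 0

  private def adjustExecutors(executorIds: Seq[String]): Unit = {
    adjustCalls += 1
  }

  def decommissionExecutors(
      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
      adjustTargetNumExecutors: Boolean): Seq[String] = {
    // Keep only executors that are still alive.
    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
      activeExecutors.contains(executorId)
    }
    // Skip the adjustment when nothing survived the liveness filter.
    if (adjustTargetNumExecutors && executorsToDecommission.nonEmpty) {
      adjustExecutors(executorsToDecommission.map(_._1).toSeq)
    }
    executorsToDecommission.map(_._1).toSeq
  }
}

object GuardSketch {
  def main(args: Array[String]): Unit = {
    val backend = new BackendSketch(Set("1"))
    // Executor "2" is not active, so no adjustment request is made.
    backend.decommissionExecutors(Array(("2", ExecutorDecommissionInfo("test"))), true)
    println(s"adjust calls after dead executor: ${backend.adjustCalls}")
  }
}
```

With the guard in place, a call in which every executor is already dead
returns an empty result without touching the target at all.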
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio
+ /**
+ * Request that the cluster manager decommission the specified executors.
+ *
+ * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+ * after these executors have been decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
+ */
+ override def decommissionExecutors(
+ executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+ val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ // If we don't want to replace the executors we are decommissioning
+ if (adjustTargetNumExecutors) {
+ adjustExecutors(executorsToDecommission.map(_._1))
+ }
+
+ val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+ doDecommission(executorId, decomInfo)
+ }.map(_._1)
+ decommissioned
+ }
+
+
+ private def doDecommission(executorId: String,
+ decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+ logInfo(s"Asking executor $executorId to decommissioning.")
+ try {
+ scheduler.executorDecommission(executorId, decomInfo)
+ if (driverEndpoint != null) {
+ logInfo("Propagating executor decommission to driver.")
+ driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
Review comment:
You are right, sorry for not spotting that. In that case, I think this is
not just fine but also neat, in that the code paths for decommissioning
are now nicely converged.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio
+ /**
+ * Request that the cluster manager decommission the specified executors.
+ *
+ * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+ * after these executors have been decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
+ */
+ override def decommissionExecutors(
+ executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+ val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ // If we don't want to replace the executors we are decommissioning
+ if (adjustTargetNumExecutors) {
+ adjustExecutors(executorsToDecommission.map(_._1))
+ }
+
+ val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+ doDecommission(executorId, decomInfo)
+ }.map(_._1)
+ decommissioned
Review comment:
nit: This looks like it was added for debugging. Can we omit the temporary
val decommissioned?
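A small sketch of what dropping the temporary val could look like. The
object name, the simplified (String, String) pairs, and the stub
doDecommission are hypothetical and only stand in for the real types in
the PR.

```scala
object ReturnExpressionSketch {
  // Hypothetical stub: pretend only executors with a non-empty id acknowledge.
  def doDecommission(executorId: String, reason: String): Boolean =
    executorId.nonEmpty

  def decommissionAll(executors: Seq[(String, String)]): Seq[String] = {
    // The last expression of a block is its value, so no temporary val is needed.
    executors.filter { case (executorId, reason) =>
      doDecommission(executorId, reason)
    }.map(_._1)
  }
}
```

The behavior is unchanged; the method simply returns the filter/map
expression directly.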
##########
File path:
core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
##########
@@ -242,8 +242,10 @@ class DecommissionWorkerSuite
assert(jobResult === 2)
}
// 6 tasks: 2 from first stage, 2 rerun again from first stage, 2nd stage attempt 1 and 2.
- val tasksSeen = listener.getTasksFinished()
Review comment:
Let's undo this change then. I am rerunning this PR locally to debug.
Thanks for sharing the GHA link; it helps.