agrawaldevesh commented on a change in pull request #29788:
URL: https://github.com/apache/spark/pull/29788#discussion_r491083572
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2368,7 +2368,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case ExecutorLost(execId, reason) =>
val workerHost = reason match {
case ExecutorProcessLost(_, workerHost, _) => workerHost
- case ExecutorDecommission(workerHost) => workerHost
+ case ExecutorDecommission(_, host) => host
Review comment:
See the comment below for `ExecutorDecommission` ... should this be changed to something like:
```
case decom: ExecutorDecommission => decom.workerHost // or decom.host
```
That way you don't need to add the extra `_` for the field you don't care about.
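For context, a rough sketch of how that match arm could read with a type pattern (assuming the other arms stay as in this diff, and the field is named `host` as in this PR; remaining arms omitted):
```
case ExecutorLost(execId, reason) =>
  val workerHost = reason match {
    case ExecutorProcessLost(_, workerHost, _) => workerHost
    // bind the whole instance instead of destructuring every field
    case decom: ExecutorDecommission => decom.host
  }
```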
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
##########
@@ -71,7 +71,8 @@ case class ExecutorProcessLost(
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
*
- * @param workerHost it is defined when the worker is decommissioned too
+ * @param reason the reason why the executor is decommissioned
+ * @param host it is defined when the host where the executor is located is decommissioned too
*/
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
-  extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(reason: String, host: Option[String] = None)
Review comment:
My Scala knowledge is really poor, but I would rather we make this a non-case class if you are planning to do this. Currently, I think the field `reason` is going to be duplicated between the base class `ExecutorLossReason` and `ExecutorDecommission`.
That's also why you end up pattern matching it above with an additional `_` (for the `reason`) argument when you really don't care about the `reason`.
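A minimal sketch of the non-case-class shape I have in mind (assuming `ExecutorLossReason` keeps taking the reason/message string in its constructor, and keeping the `host` field name from this PR):
```
// Store the reason only once, as the base class message, and expose host as a val.
private[spark] class ExecutorDecommission(reason: String, val host: Option[String] = None)
  extends ExecutorLossReason(reason)
```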
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+private[spark] sealed trait ExecutorDecommissionReason {
+ val reason: String = "decommissioned"
Review comment:
I don't think the `reason` field is really needed anywhere besides being used for `toString`. Should we just require overriding `toString` by marking `toString` abstract? I don't think child classes should need to override both `toString` and `reason`: I would prefer we override methods instead of fields.
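A rough sketch of the methods-over-fields shape, reusing the class names from the diff below (just an illustration, not a final design):
```
private[spark] sealed trait ExecutorDecommissionReason {
  // Subclasses describe themselves via toString directly, with no separate `reason` field.
  override def toString: String = "decommissioned"
}

case class DynamicAllocationDecommission() extends ExecutorDecommissionReason {
  override def toString: String = "decommissioned by dynamic allocation"
}
```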
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -970,6 +970,9 @@ private[spark] class TaskSchedulerImpl(
logDebug(s"Executor $executorId on $hostPort lost, but reason not yet
known.")
case ExecutorKilled =>
logInfo(s"Executor $executorId on $hostPort killed by driver.")
+ case ExecutorDecommission(reason, _) =>
+ // use logInfo instead of logError as the loss of decommissioned executor is what we expect
+ logInfo(s"Decommissioned executor $executorId on $hostPort shutdown: $reason")
Review comment:
Instead of 'shutdown', should we say 'is finally lost'? That would be more accurate in this setting.
+1 on this change to avoid log spam.
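i.e. something along these lines (only illustrating the wording change):
```
logInfo(s"Decommissioned executor $executorId on $hostPort is finally lost: $reason")
```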
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -991,7 +991,7 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
val exitCausedByApp: Boolean = reason match {
case exited: ExecutorExited => exited.exitCausedByApp
- case ExecutorKilled | ExecutorDecommission(_) => false
+ case ExecutorKilled | ExecutorDecommission(_, _) => false
Review comment:
I am wondering if we should instead pattern match in a separate arm like:
```
case _: ExecutorDecommission => false
```
to avoid having to change the case arms when we make changes to the structure definitions.
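For example, keeping the existing arms from the diff above, a sketch (assuming the rest of the match is unchanged):
```
case exited: ExecutorExited => exited.exitCausedByApp
case ExecutorKilled => false
// separate arm with a type pattern: no field placeholders to update if the constructor changes
case _: ExecutorDecommission => false
```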
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+private[spark] sealed trait ExecutorDecommissionReason {
Review comment:
@holdenk, can you please provide an example of how having this as a sealed trait would limit the flexibility?
It is marked as private[spark], so the resource-manager-specific scheduler backends should be able to extend it ... no?
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
##########
@@ -191,6 +192,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
private class KubernetesDriverEndpoint extends DriverEndpoint {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case ExecutorDecommissioning(executorId) =>
Review comment:
I didn't fully follow the need for the distinction between the K8s case and the simple executor-triggered case.
I thought K8s only needs the SIGPWR-based path, and indeed ExecutorDecommissioning is only sent in response to a SIGPWR.
So I am missing why we override the handling of `ExecutorDecommissioning` here and the motivation for `K8SDecommission`.
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -88,44 +88,35 @@ private[spark] trait ExecutorAllocationClient {
* Default implementation delegates to kill, scheduler must override
* if it supports graceful decommissioning.
*
- * @param executorsAndDecomInfo identifiers of executors & decom info.
+ * @param executorsAndDecomReason identifiers of executors & decom reason.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
*                                 after these executors have been decommissioned.
- * @param triggeredByExecutor whether the decommission is triggered at executor.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def decommissionExecutors(
- executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
- adjustTargetNumExecutors: Boolean,
- triggeredByExecutor: Boolean): Seq[String] = {
- killExecutors(executorsAndDecomInfo.map(_._1),
+ executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)],
Review comment:
This is how it was earlier -- so we aren't changing the semantics, only the naming :-) And yes, this can happen: different executors on different hosts would have different `ExecutorDecommissionReason`/`Info` values, potentially with different hosts in them.
This is simply a bulk API: instead of making n calls we are folding them into one.
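For example, a sketch of one bulk call covering two executors on different hosts (using the `StandaloneDecommission` reason from this PR; `client`, the executor ids, and the host names are made up for illustration):
```
client.decommissionExecutors(
  Array(
    ("1", StandaloneDecommission(Some("host-a"))),
    ("2", StandaloneDecommission(Some("host-b")))),
  adjustTargetNumExecutors = true)
```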
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -17,7 +17,7 @@
package org.apache.spark
-import org.apache.spark.scheduler.ExecutorDecommissionInfo
+import org.apache.spark.scheduler.ExecutorDecommissionReason
Review comment:
+1, I would argue against unnecessary renaming even if the current name seems a bit "unnatural": it creates unnecessary diff noise.
To me "Info" and "Reason" are both similar: they both convey "additional information".
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+private[spark] sealed trait ExecutorDecommissionReason {
+ val reason: String = "decommissioned"
+ override def toString: String = reason
+}
+
+/**
+ * For the case where decommission is triggered because of executor dynamic allocation
+ */
+case class DynamicAllocationDecommission() extends ExecutorDecommissionReason {
+ override val reason: String = "decommissioned by dynamic allocation"
+}
+
+/**
+ * For the case where decommission is triggered at the executor first.
+ */
+class ExecutorTriggeredDecommission extends ExecutorDecommissionReason
+
+/**
+ * For the Kubernetes workloads
+ */
+case class K8SDecommission() extends ExecutorTriggeredDecommission
+
+/**
+ * For the Standalone workloads.
+ * @param workerHost When workerHost is defined, it means the worker has been decommissioned too.
+ *                   Used to infer if the shuffle data might be lost even if the external shuffle
+ *                   service is enabled.
+ */
+case class StandaloneDecommission(workerHost: Option[String] = None)
+ extends ExecutorDecommissionReason {
+ override val reason: String = if (workerHost.isDefined) {
+ s"Worker ${workerHost.get} decommissioned"
+ } else {
+ "decommissioned"
+ }
+}
+
+/**
Review comment:
Can you move this test-only class somewhere in a test-only package? See TestResourceIDs as an example.