agrawaldevesh commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r475310046
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##########
@@ -18,11 +18,22 @@
 package org.apache.spark.scheduler
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(
+    message: String,
Review comment:
So far that need hasn't come up :-) But when it does, we can easily add it.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##########
@@ -18,11 +18,22 @@
 package org.apache.spark.scheduler
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(
+    message: String,
+    // Timestamp the decommissioning commenced in millis since epoch of the driver's clock
Review comment:
Yeah, it is used to compute what was formerly known as
`tidToExecutorKillTimeMapping` (search for it in the code on the left). It's
not so much for expiring the decommission state, for which we are using the
cache you suggested in the previous PR.
Good suggestion to add some idea of how it is used; I will add a comment.
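To make the idea concrete, here is a minimal standalone sketch (not the actual PR code: `killIntervalMillis` and `expectedKillTime` are illustrative names) of how a per-task "executor kill time" — the role the old `tidToExecutorKillTimeMapping` played — can be derived on demand from the per-executor decommission start time:

```scala
// Illustrative sketch only. `ExecutorDecommissionState` mirrors the case
// class in this PR; the kill-interval parameter is hypothetical and stands
// in for however long an executor is expected to survive after
// decommissioning commences.
case class ExecutorDecommissionState(
    message: String,
    startTime: Long, // millis since epoch of the driver's clock
    isHostDecommissioned: Boolean)

object DecommissionSketch {
  // The executor's expected kill time is simply the decommission start
  // time plus the configured interval, so no per-task map is needed.
  def expectedKillTime(
      state: ExecutorDecommissionState,
      killIntervalMillis: Long): Long =
    state.startTime + killIntervalMillis
}
```

With this, speculation logic can compare `expectedKillTime` against the clock instead of maintaining a separate task-id keyed mapping.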
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1123,14 +1127,6 @@ private[spark] class TaskSetManager(
   def executorDecommission(execId: String): Unit = {
     recomputeLocality()
-    if (speculationEnabled) {
Review comment:
This was an efficiency improvement: it avoided doing this bookkeeping in
the driver when speculation is not enabled, saving both CPU cycles and memory.
Now the check happens in checkSpeculatableTasks, which is not even called
when speculation is disabled, so the same efficiency improvement falls out
automatically. This is a positive side effect of merging
tidToExecutorKillTimeMapping into executorDecommissionState.
In the meantime I will hunt for a suitable test that adds some coverage
here, or consider adding one.
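The control flow being described can be sketched as follows (an approximation with abbreviated, hypothetical names — not the real TaskSetManager): the speculation guard lives at the call site of `checkSpeculatableTasks`, so any decommission bookkeeping consulted inside it is guarded for free.

```scala
// Illustrative sketch, not Spark's actual scheduler code. The point: the
// scheduler only invokes checkSpeculatableTasks when speculation is
// enabled, so state read inside it needs no explicit guard of its own.
class SpeculationSketch(speculationEnabled: Boolean) {
  var checks = 0 // counts how often decommission state was consulted

  private def checkSpeculatableTasks(): Unit = {
    // Decommission-derived state (e.g. expected kill times) would be read
    // here, with no `if (speculationEnabled)` needed at this level.
    checks += 1
  }

  // Driven periodically by the scheduler; the guard lives here.
  def speculationHeartbeat(): Unit =
    if (speculationEnabled) checkSpeculatableTasks()
}
```

With speculation disabled the heartbeat is a no-op, which is exactly the CPU and memory saving the removed `if (speculationEnabled)` block used to provide.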
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -926,18 +926,21 @@ private[spark] class TaskSchedulerImpl(
       // and some of those can have isHostDecommissioned false. We merge them such that
       // if we heard isHostDecommissioned ever true, then we keep that one since it is
       // most likely coming from the cluster manager and thus authoritative
-      val oldDecomInfo = executorsPendingDecommission.get(executorId)
-      if (!oldDecomInfo.exists(_.isHostDecommissioned)) {
-        executorsPendingDecommission(executorId) = decommissionInfo
+      val oldDecomState = executorsPendingDecommission.get(executorId)
+      if (!oldDecomState.exists(_.isHostDecommissioned)) {
+        executorsPendingDecommission(executorId) = ExecutorDecommissionState(
+          decommissionInfo.message,
+          oldDecomState.map(_.startTime).getOrElse(clock.getTimeMillis()),
Review comment:
Sure, I will tweak the comment above.
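For reference, the merge rule the comment describes can be sketched as a standalone function (an approximation of the diff's semantics, not the exact PR code): keep the earliest `startTime`, and once `isHostDecommissioned` has been seen true, never downgrade it.

```scala
// Self-contained approximation of the merge semantics in the diff above.
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

case class ExecutorDecommissionState(
    message: String,
    startTime: Long, // millis since epoch of the driver's clock
    isHostDecommissioned: Boolean)

object MergeSketch {
  // If a previous state already says the host is decommissioned, keep it:
  // that signal most likely came from the cluster manager and is treated
  // as authoritative. Otherwise record the new message, preserving the
  // original start time when one exists.
  def merge(
      old: Option[ExecutorDecommissionState],
      info: ExecutorDecommissionInfo,
      nowMillis: Long): ExecutorDecommissionState =
    old match {
      case Some(s) if s.isHostDecommissioned => s
      case _ =>
        ExecutorDecommissionState(
          info.message,
          old.map(_.startTime).getOrElse(nowMillis),
          info.isHostDecommissioned)
    }
}
```

This mirrors the `oldDecomState.map(_.startTime).getOrElse(...)` line in the hunk: repeated decommission messages refresh the text but never reset the clock.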
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]