[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm
Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472663605

## File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

@@ -1051,15 +1049,19 @@ private[spark] class TaskSetManager(
     logDebug("Task length threshold for speculation: " + threshold)
     for (tid <- runningTasksSet) {
       var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-      if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
-        // Check whether this task will finish before the exectorKillTime assuming
-        // it will take medianDuration overall. If this task cannot finish within
-        // executorKillInterval, then this task is a candidate for speculation
-        val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
-        val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
-          taskEndTimeBasedOnMedianDuration
-        if (canExceedDeadline) {
-          speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+      if (!speculated && executorDecommissionKillInterval.nonEmpty) {
+        val taskInfo = taskInfos(tid)
+        val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
+        if (decomState.nonEmpty) {
+          // Check whether this task will finish before the exectorKillTime assuming

Review comment:
We don't have `exectorKillTime` anymore after removing `tidToExecutorKillTimeMapping`. Shall we reword the comment a little bit?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
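The deadline check this hunk replaces can be sketched as a standalone function. This is a simplified stand-in, not the PR's code: `isSpeculationCandidate` and its plain `Long` parameters are hypothetical substitutes for `taskInfos(tid).launchTime`, `medianDuration`, the decommission start time returned by `getExecutorDecommissionState`, and the `executorDecommissionKillInterval` config from the diff.

```scala
// Hedged sketch: a task is a speculation candidate when its projected end
// time (launch time plus the median task duration) falls past the point
// where the decommissioning executor is expected to be killed.
def isSpeculationCandidate(
    launchTime: Long,
    medianDuration: Long,
    decomStartTime: Option[Long],   // when decommissioning was triggered
    killInterval: Option[Long]): Boolean = {
  (decomStartTime, killInterval) match {
    case (Some(start), Some(interval)) =>
      // Projected finish time if the task takes medianDuration overall.
      val taskEndTimeBasedOnMedianDuration = launchTime + medianDuration
      // Executor is expected to be killed `interval` ms after the
      // decommission started; a task projected to finish later than that
      // cannot complete in place.
      val expectedKillTime = start + interval
      expectedKillTime < taskEndTimeBasedOnMedianDuration
    case _ => false
  }
}
```

In the real `TaskSetManager`, a `true` result leads to `checkAndSubmitSpeculatableTask(tid, time, 0)`, i.e. the task is speculated immediately with a zero threshold.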
[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm
Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472657515

## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

@@ -18,11 +18,22 @@ package org.apache.spark.scheduler

 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark] case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(
+    message: String,
+    // Timestamp in milliseconds when decommissioning was triggered
+    tsMillis: Long,

Review comment:
How about `startTime`?
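With the suggested rename applied, the new case class would read roughly as follows. This is a sketch of the proposal under review, not the merged code, and the sample `message` value is invented for illustration:

```scala
/**
 * State related to decommissioning that is kept by the TaskSchedulerImpl.
 * Kept distinct from ExecutorDecommissionInfo so the two can evolve
 * independently.
 */
case class ExecutorDecommissionState(
    message: String,
    // Timestamp in milliseconds when decommissioning was triggered,
    // renamed from `tsMillis` per the review suggestion.
    startTime: Long,
    isHostDecommissioned: Boolean)

// Hypothetical usage; the message text is made up.
val state = ExecutorDecommissionState(
  message = "worker is being drained",
  startTime = 1597700000000L,
  isHostDecommissioned = true)
```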
[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm
Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472655476

## File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

@@ -1051,15 +1049,19 @@ private[spark] class TaskSetManager(
     logDebug("Task length threshold for speculation: " + threshold)
     for (tid <- runningTasksSet) {
       var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-      if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
-        // Check whether this task will finish before the exectorKillTime assuming
-        // it will take medianDuration overall. If this task cannot finish within
-        // executorKillInterval, then this task is a candidate for speculation
-        val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
-        val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
-          taskEndTimeBasedOnMedianDuration
-        if (canExceedDeadline) {
-          speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+      if (!speculated && executorDecommissionKillInterval.nonEmpty) {

Review comment:
I see, we don't have `tidToExecutorKillTimeMapping` anymore.
[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm
Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472653962

## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

@@ -18,11 +18,21 @@ package org.apache.spark.scheduler

 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark] case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,

Review comment:
Yes, decoupling +1
[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm
Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r471997617

## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

@@ -18,11 +18,21 @@ package org.apache.spark.scheduler

 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark] case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,

Review comment:
> So I felt that adding the timestamp field to a "message" is not a good idea because it bloats it unnecessarily with information it does not need. It also raises the question what does that timestamp even mean? Timestamp of the executor (which can originate this message) or timestamp of the driver?

The timestamp will be used to calculate `canExceedDeadline` in `TaskSetManager` later, right? And the timestamp means the approximate time the executor starts decommissioning.

> It's not a problem to couple these -- it's just a preference to not couple them so that we can change them as time goes by (evolve them).

Having a copy is redundant, but it is also flexible. I just cannot imagine what kind of evolution would make things hard to handle after we couple them.

Actually, what worries me more is that `ExecutorDecommissionInfo` might not satisfy the requirements of other clusters (if we'd support them in the future). That could be a reason not to couple `ExecutorDecommissionState` with `ExecutorDecommissionInfo`. This also reminds me again that we'd better have an interface for `ExecutorDecommissionInfo`, like `ExecutorLossReason`, so we could have a way to work with other cluster managers rather than Standalone only.
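The `ExecutorLossReason`-style interface suggested above could look roughly like this. It is purely illustrative: no such trait exists in the PR, and every name below (`ExecutorDecommissionReason`, `StandaloneDecommission`, `K8sDecommission`) is hypothetical.

```scala
// Hypothetical sketch: a common trait, mirroring how ExecutorLossReason
// lets each cluster manager supply its own subclass.
sealed trait ExecutorDecommissionReason {
  def message: String
}

// A Standalone-specific reason could carry the host flag from
// ExecutorDecommissionInfo...
case class StandaloneDecommission(message: String, isHostDecommissioned: Boolean)
  extends ExecutorDecommissionReason

// ...while another cluster manager could carry whatever it needs.
case class K8sDecommission(message: String) extends ExecutorDecommissionReason

// Scheduler-side code would then pattern match instead of depending on one
// concrete message class.
def describe(reason: ExecutorDecommissionReason): String = reason match {
  case StandaloneDecommission(msg, host) => s"standalone: $msg (host=$host)"
  case K8sDecommission(msg) => s"k8s: $msg"
}
```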
[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm
Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r471974168

## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

@@ -18,11 +18,21 @@ package org.apache.spark.scheduler

 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark] case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,
+  // Timestamp in milliseconds when decommissioning was triggered
+  tsMillis: Long,
+  isHostDecommissioned: Boolean)

Review comment:
wrong indent
[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm
Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r471974036

## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

@@ -18,11 +18,21 @@ package org.apache.spark.scheduler

 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark] case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,

Review comment:
I'd even prefer `ExecutorDecommissionInfo(message: String, tsMillis: Long, isHostDecommissioned: Boolean)` to make the code simpler... and I don't understand "messages to evolve quite rapidly"... the `message` parameter is a `String`, so it should support any arbitrary string. Why is it a problem?