[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm

2020-08-18 Thread GitBox


Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472663605



##
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##
@@ -1051,15 +1049,19 @@ private[spark] class TaskSetManager(
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
         var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
-          // Check whether this task will finish before the exectorKillTime assuming
-          // it will take medianDuration overall. If this task cannot finish within
-          // executorKillInterval, then this task is a candidate for speculation
-          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
-          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
-            taskEndTimeBasedOnMedianDuration
-          if (canExceedDeadline) {
-            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+        if (!speculated && executorDecommissionKillInterval.nonEmpty) {
+          val taskInfo = taskInfos(tid)
+          val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
+          if (decomState.nonEmpty) {
+            // Check whether this task will finish before the exectorKillTime assuming

Review comment:
   We don't have `exectorKillTime` anymore after removing `tidToExecutorKillTimeMapping`. Shall we reword the comment a little bit?
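
   [Editor's note] A minimal, hypothetical sketch of the deadline check the reworded comment would describe. The parameter names (`decomStartTime`, `killInterval`) are assumptions for illustration; this is not the actual Spark code from the PR.

```scala
// Hypothetical sketch of the speculation deadline check discussed above.
// All inputs are assumed; names loosely follow the diff, not the merged code.
object SpeculationDeadline {
  def canExceedDeadline(
      launchTime: Long,      // when the task started, in ms
      medianDuration: Long,  // median duration of finished tasks, in ms
      decomStartTime: Long,  // when the executor began decommissioning, in ms
      killInterval: Long     // configured grace period before the kill, in ms
  ): Boolean = {
    // Estimate when the task would finish if it takes the median duration.
    val estimatedEndTime = launchTime + medianDuration
    // The executor is expected to die `killInterval` ms after decommissioning
    // started; the task is a speculation candidate if it cannot beat that.
    decomStartTime + killInterval < estimatedEndTime
  }
}
```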





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm

2020-08-18 Thread GitBox


Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472657515



##
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##
@@ -18,11 +18,22 @@
 package org.apache.spark.scheduler
 
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(
+    message: String,
+    // Timestamp in milliseconds when decommissioning was triggered
+    tsMillis: Long,

Review comment:
   How about `startTime`?








[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm

2020-08-18 Thread GitBox


Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472655476



##
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##
@@ -1051,15 +1049,19 @@ private[spark] class TaskSetManager(
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
         var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
-          // Check whether this task will finish before the exectorKillTime assuming
-          // it will take medianDuration overall. If this task cannot finish within
-          // executorKillInterval, then this task is a candidate for speculation
-          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
-          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
-            taskEndTimeBasedOnMedianDuration
-          if (canExceedDeadline) {
-            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+        if (!speculated && executorDecommissionKillInterval.nonEmpty) {

Review comment:
   I see, we don't have `tidToExecutorKillTimeMapping` anymore.








[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm

2020-08-18 Thread GitBox


Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r472653962



##
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##
@@ -18,11 +18,21 @@
 package org.apache.spark.scheduler
 
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,

Review comment:
   Yes, decoupling +1








[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm

2020-08-18 Thread GitBox


Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r471997617



##
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##
@@ -18,11 +18,21 @@
 package org.apache.spark.scheduler
 
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,

Review comment:
   > So I felt that adding the timestamp field to a "message" is not a good idea because it bloats it unnecessarily with information it does not need. It also raises the question of what that timestamp even means: the timestamp of the executor (which can originate this message) or the timestamp of the driver?
   
   
   The timestamp will be used to calculate `canExceedDeadline` in `TaskSetManager` later, right? And the timestamp means the approximate time at which the executor starts to decommission.
   
   
   > It's not a problem to couple these -- it's just a preference to not couple them so that we can change them as time goes by (evolve them). Having a copy is redundant, but it is also flexible.
   
   I just can't imagine what kind of evolution would make things hard to handle after we couple them. Actually, what worries me more is that `ExecutorDecommissionInfo` might not satisfy the requirements of other clusters (if we support them in the future). That could be a reason not to couple `ExecutorDecommissionState` with `ExecutorDecommissionInfo`. This also reminds me again that we'd better have an interface for `ExecutorDecommissionInfo` like `ExecutorLossReason`, so we'd have a way to work with other cluster managers rather than Standalone only.
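
   [Editor's note] A hypothetical sketch of the interface idea mentioned above: a base trait analogous to `ExecutorLossReason`, with a Standalone-specific implementation, so other cluster managers could later supply their own variants. All names below are illustrative assumptions, not the merged Spark code.

```scala
// Sketch only: a base trait so each cluster manager can carry its own
// decommission details, mirroring the ExecutorLossReason pattern.
trait ExecutorDecommissionReason {
  // Human-readable reason for the decommissioning.
  def message: String
}

// Hypothetical Standalone-specific variant carrying the host flag from the PR.
case class StandaloneDecommissionReason(
    message: String,
    isHostDecommissioned: Boolean) extends ExecutorDecommissionReason
```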








[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm

2020-08-18 Thread GitBox


Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r471974168



##
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##
@@ -18,11 +18,21 @@
 package org.apache.spark.scheduler
 
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,
+                                     // Timestamp in milliseconds when decommissioning was triggered
+                                     tsMillis: Long,
+                                     isHostDecommissioned: Boolean)

Review comment:
   wrong indent








[GitHub] [spark] Ngone51 commented on a change in pull request #29452: [SPARK-32643][CORE] Consolidate state decommissioning in the TaskSchedulerImpl realm

2020-08-18 Thread GitBox


Ngone51 commented on a change in pull request #29452:
URL: https://github.com/apache/spark/pull/29452#discussion_r471974036



##
File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
##
@@ -18,11 +18,21 @@
 package org.apache.spark.scheduler
 
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(message: String,

Review comment:
   I'd even prefer `ExecutorDecommissionInfo(message: String, tsMillis: Long, isHostDecommissioned: Boolean)` to make the code simpler... and I don't understand "messages to evolve quite rapidly". The `message` parameter is a `String`; it should support any arbitrary string. Why is that a problem?
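
   [Editor's note] For concreteness, a sketch of the single-class alternative suggested above, with the timestamp folded into `ExecutorDecommissionInfo` instead of a separate `ExecutorDecommissionState`. This is the reviewer's proposal as a standalone snippet, not the code that was ultimately merged.

```scala
// Hypothetical merged shape proposed in the comment above: one case class
// carrying the message, the decommission start timestamp, and the host flag.
case class ExecutorDecommissionInfo(
    message: String,
    // Timestamp in milliseconds when decommissioning was triggered
    tsMillis: Long,
    isHostDecommissioned: Boolean)
```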




