[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-12 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1045549245


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   Sounds better.
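   (Context: "Sounds better" refers to the apply() overload proposed in the 2022-12-08 message below; with it, the handler body shown in the diff above collapses to a single `listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId, taskIndex, task.partitionId))` call.)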






[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-08 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044062231


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   How about adding a new apply() to `SparkListenerSpeculativeTaskSubmitted` to simplify the construction? e.g.,
   ```scala
   object SparkListenerSpeculativeTaskSubmitted {
     def apply(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int)
         : SparkListenerSpeculativeTaskSubmitted = {
       val speculativeTaskSubmittedEvent =
         SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId)
       // add taskIndex field for Executor Dynamic Allocation
       speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
       speculativeTaskSubmittedEvent.updatePartitionId(partitionId)
       speculativeTaskSubmittedEvent
     }
   }
   ```
   so here we can construct the event like `SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId, taskIndex, task.partitionId)`, and I think it'd be less error-prone in case users forget to call the update methods.
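
   (To make the proposal concrete, here is a minimal, self-contained sketch of the whole pattern. The mutable-field event model below is an assumption based on the diff, not the actual Spark class, and the `SparkListenerEvent` plumbing is omitted.)
   ```scala
   // Sketch only: a stand-in for the event class, with the extra fields kept
   // out of the case-class constructor (shape assumed from the diff above).
   case class SparkListenerSpeculativeTaskSubmitted(stageId: Int, stageAttemptId: Int) {
     private var _taskIndex: Int = -1
     private var _partitionId: Int = -1
     def taskIndex: Int = _taskIndex
     def partitionId: Int = _partitionId
     def updateTaskIndex(taskIndex: Int): Unit = _taskIndex = taskIndex
     def updatePartitionId(partitionId: Int): Unit = _partitionId = partitionId
   }

   object SparkListenerSpeculativeTaskSubmitted {
     // The proposed convenience apply(): one expression, no update calls to forget.
     def apply(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int)
         : SparkListenerSpeculativeTaskSubmitted = {
       val event = SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId)
       event.updateTaskIndex(taskIndex)
       event.updatePartitionId(partitionId)
       event
     }
   }

   // Usage: SparkListenerSpeculativeTaskSubmitted(stageId = 1, stageAttemptId = 0,
   //   taskIndex = 7, partitionId = 7)
   ```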








[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-08 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044060202


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   Don't you want to propagate `partitionId` as well, since you've added it to `SpeculativeTaskSubmitted`?
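
   (A minimal, self-contained sketch of what propagating `partitionId` could look like; the `Task` trait and `post` below are stand-ins for Spark's internals, and the three-field event is an assumption, not the PR's final shape.)
   ```scala
   // Stand-ins for Spark internals, for illustration only.
   trait Task[T] { def partitionId: Int }
   case class SpeculativeTaskSubmitted(task: Task[_], taskIndex: Int, partitionId: Int)

   def post(event: Any): Unit = println(event) // stands in for eventProcessLoop.post

   // The event carries partitionId too, so downstream handlers don't have to
   // reach back into the task to recover it.
   def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit =
     post(SpeculativeTaskSubmitted(task, taskIndex, task.partitionId))
   ```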









[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042993398


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -733,7 +735,7 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty && stageAttemptToPendingSpeculativeTasks.isEmpty) {

Review Comment:
   Should it be:
   ```suggestion
        if (stageAttemptToNumTasks.isEmpty && stageAttemptToPendingSpeculativeTasks.isEmpty && stageAttemptToSpeculativeTaskIndices.isEmpty) {
   ```

   `stageAttemptToNumSpeculativeTasks` used to include both pending and running speculative tasks, but now it's split in two.








[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042987205


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-41192): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt

Review Comment:
   ```suggestion
    // Map from each stageAttempt to a set of pending speculative task indexes
   ```






[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042987003


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt

Review Comment:
   ```suggestion
    // Map from each stageAttempt to a set of running speculative task indexes
   ```






[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-05 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1040337253


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   +1 to @mridulm's option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034375202


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager(
       val stageId = speculativeTask.stageId
       val stageAttemptId = speculativeTask.stageAttemptId
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = speculativeTask.taskIndex
       allocationManager.synchronized {
-        stageAttemptToNumSpeculativeTasks(stageAttempt) =
-          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+        stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt,

Review Comment:
   "Unsubmitted" here is confusing since the method name here is 
`onSpeculativeTaskSubmitted`.. how about 
`stageAttemptToPendingSpeculativeTasks`?



##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager(
   }
 }
 if (taskEnd.taskInfo.speculative) {
-  stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach 
{_.remove{taskIndex}}
-  // If the previous task attempt succeeded first and it was the last 
task in a stage,
-  // the stage may have been removed before handing this speculative 
TaskEnd event.
-  if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-  }
+  
stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
 }
 
 taskEnd.reason match {
-  case Success | _: TaskKilled =>
+  case Success =>
+// remove speculative task for task finished.

Review Comment:
   ```suggestion
           // Remove pending speculative task in case the normal task is finished before starting the speculative task
   ```
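
   (A self-contained toy sketch of the pending/running lifecycle this comment describes, with names assumed rather than taken from Spark's actual code; note the success path clears a speculative copy whether it was still pending or already running:)
   ```scala
   import scala.collection.mutable

   case class StageAttempt(stageId: Int, stageAttemptId: Int)

   val pendingSpeculative = mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
   val runningSpeculative = mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

   // TaskSetManager decided a speculative copy is needed: record it as pending.
   def onSpeculativeTaskSubmitted(sa: StageAttempt, taskIndex: Int): Unit =
     pendingSpeculative.getOrElseUpdate(sa, mutable.HashSet.empty) += taskIndex

   // The speculative copy actually launched: move it from pending to running.
   def onSpeculativeTaskStart(sa: StageAttempt, taskIndex: Int): Unit = {
     pendingSpeculative.get(sa).foreach(_.remove(taskIndex))
     runningSpeculative.getOrElseUpdate(sa, mutable.HashSet.empty) += taskIndex
   }

   // An attempt for this task succeeded: drop the speculative copy, pending or
   // running, so it no longer inflates the executor target.
   def onTaskSucceeded(sa: StageAttempt, taskIndex: Int): Unit = {
     pendingSpeculative.get(sa).foreach(_.remove(taskIndex))
     runningSpeculative.get(sa).foreach(_.remove(taskIndex))
   }
   ```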



##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.

Review Comment:
   SPARK-41192?



##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt
+    private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   How about renaming to `stageAttemptToPendingSpeculativeTasks`, and renaming `stageAttemptToSpeculativeTaskIndices` to `stageAttemptToRunningSpeculativeTaskIndices`?


