[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-14 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1048446891


##
project/MimaExcludes.scala:
##
@@ -122,6 +122,13 @@ object MimaExcludes {
    // [SPARK-41072][SS] Add the error class STREAM_FAILED to StreamingQueryException
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),

+   // [SPARK-41192] add taskIndex in SparkListenerSpeculativeTaskSubmitted Event
+   ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.apply"),

Review Comment:
   Fixed ~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-08 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044148832


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
  }

-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   How about this:
   ```
   case class SparkListenerSpeculativeTaskSubmitted(
       stageId: Int,
       stageAttemptId: Int = 0)
     extends SparkListenerEvent {
     // Note: this is here for backwards-compatibility with older versions of this event which
     // didn't store taskIndex
     private var _taskIndex: Int = -1
     private var _partitionId: Int = -1

     def taskIndex: Int = _taskIndex
     def partitionId: Int = _partitionId

     def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
       this(stageId, stageAttemptId)
       _partitionId = partitionId
       _taskIndex = taskIndex
     }
   }
   ```
   
   We can then construct it with `taskIndex` and `partitionId`:
   ```
   val speculativeTaskSubmittedEvent = new SparkListenerSpeculativeTaskSubmitted(
     task.stageId, task.stageAttemptId, taskIndex, task.partitionId)
   ```
   
   and the default usage stays:
   ```
   val speculativeTaskSubmittedEvent = SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId)
   ```
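   The backwards-compatible event shape suggested above can be sketched as a standalone snippet. `SparkListenerEvent` is stubbed as a plain trait here for self-containment; everything else mirrors the proposal:

   ```scala
   // Stub; in Spark this trait lives in org.apache.spark.scheduler.
   trait SparkListenerEvent

   case class SparkListenerSpeculativeTaskSubmitted(
       stageId: Int,
       stageAttemptId: Int = 0)
     extends SparkListenerEvent {
     // Defaults keep compatibility with listeners replaying old event logs
     // that carry no taskIndex/partitionId.
     private var _taskIndex: Int = -1
     private var _partitionId: Int = -1

     def taskIndex: Int = _taskIndex
     def partitionId: Int = _partitionId

     // Auxiliary constructor used by new call sites that know both values.
     def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
       this(stageId, stageAttemptId)
       _taskIndex = taskIndex
       _partitionId = partitionId
     }
   }

   object Demo extends App {
     // New call site: all four fields (note `new` — apply() only has two params).
     val full = new SparkListenerSpeculativeTaskSubmitted(1, 0, 5, 7)
     // Old call site keeps compiling unchanged.
     val legacy = SparkListenerSpeculativeTaskSubmitted(1)
     println(full.taskIndex)   // 5
     println(legacy.taskIndex) // -1
   }
   ```

   One caveat of this pattern: the private vars are excluded from the case class's generated `equals`/`hashCode`/`toString`, so two events differing only in `taskIndex` still compare equal.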





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1043030519


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
    // Should be 0 when no stages are active.
    private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-   // Number of speculative tasks pending/running in each stageAttempt
-   private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-   // The speculative tasks started in each stageAttempt
+   // Number of speculative tasks running in each stageAttempt

Review Comment:
   Got it, fix soon





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1043028737


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Got it, I'll make a commit to try and fix this soon





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r104358


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   `Is it a lot of change to add support for it ?`
   If you just want to add `partitionId` as a field of the task event, the change will not be too much. The question is why we need `partitionId` in `ExecutorAllocationManager` at all.
   `I would want to keep both of them in sync, and prevent divergence`
   But if you want to keep `partitionId` and `taskIndex` in sync, or use `partitionId` instead of `taskIndex`, the changes will be huge, and I don't think it's appropriate to make those changes here~





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042997408


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   `Is it a lot of change to add support for it ?`
   If you just want to add `partitionId` as a field of the task event, the change will not be too much.
   `I would want to keep both of them in sync, and prevent divergence`





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-07 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042129179


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Since `partitionId` is never used in this PR, is it only added for possible future use?
   If so, I'd prefer to add this in another PR to solve the problem given in [SPARK-37831](https://issues.apache.org/jira/browse/SPARK-37831)~ @mridulm 





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-05 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1040411406


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
    // Should be 0 when no stages are active.
    private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-   // Number of speculative tasks pending/running in each stageAttempt
-   private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-   // The speculative tasks started in each stageAttempt
+   // Number of speculative tasks running in each stageAttempt
+   // TODO(SPARK-41192): We simply need an Int for this.
    private val stageAttemptToSpeculativeTaskIndices =
+     new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

Review Comment:
   Line is longer than 100 characters





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-04 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1039084796


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   Maybe we can refer to @LuciferYang 's suggestion: add field in 
`SparkListenerSpeculativeTaskSubmitted` to minimize api change. @mridulm cc - 
@Ngone51 
   https://github.com/apache/spark/pull/38711#discussion_r1037165963





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-04 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1039084796


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   Maybe we can follow the suggestion proposed by @LuciferYang 





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-01 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037744591


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Fix like this:
   https://user-images.githubusercontent.com/20420642/205205060-071827bf-bba2-47bc-b98d-7c778e00820d.png
   https://user-images.githubusercontent.com/20420642/205205206-fecb2fe3-bbc7-49e0-bf60-af5cc3a09f98.png
   cc - @mridulm @Ngone51 





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-30 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1036003754


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   I constructed a case with shuffle retry and logged the `taskIndex` and `partitionId` recorded in the task events `SparkListenerTaskStart` & `SparkListenerTaskEnd`.
   When the stage is submitted normally (not a retry stage or, more broadly, not a partial task submission), `taskIndex` is equal to `partitionId`:
   https://user-images.githubusercontent.com/20420642/204814905-59625694-2e37-4484-8b10-522af8f0d61e.png
   But when a shuffle stage is resubmitted with partial tasks, `taskIndex` is not necessarily equal to `partitionId`:
   https://user-images.githubusercontent.com/20420642/204815345-eef0eada-62b3-4e7c-b51d-1e9748e2235c.png
   
   So if we use `partitionId` instead of `taskIndex` for `SpeculativeTaskSubmitted`, we won't release pending speculative tasks whose normal task finished before the speculative task started, which misleads the calculation of target executors, because `taskIndex` may not equal `partitionId`.
   
   @mridulm Please check if there is any problem~ cc- @Ngone51 @LuciferYang 
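   The index-to-partition mismatch described here can be illustrated with a tiny self-contained sketch (the partition numbers are made up; the mapping behaviour is the point):

   ```scala
   // Hypothetical partition ids for a 5-partition stage.
   val allPartitions = Seq(0, 1, 2, 3, 4)

   // Fresh submission: one task per partition, created in order,
   // so taskIndex i maps to partitionId i.
   val firstAttempt = allPartitions.zipWithIndex.map { case (pid, idx) => idx -> pid }
   assert(firstAttempt.forall { case (idx, pid) => idx == pid })

   // Partial resubmission: only the partitions whose output was lost are
   // re-run, and task indices restart from 0 within the new TaskSet.
   val missingPartitions = Seq(1, 4)
   val retryAttempt = missingPartitions.zipWithIndex.map { case (pid, idx) => idx -> pid }

   // taskIndex 1 now corresponds to partitionId 4, not 1.
   assert(retryAttempt == Seq(0 -> 1, 1 -> 4))
   ```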





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-30 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1035806206


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   I'll check later whether it makes a difference to use `partitionId` instead of `taskIndex`.
   Btw~ even if we use `partitionId` instead, we can minimize the change to `SpeculativeTaskSubmitted` in `DAGSchedulerEvent`. But the change to the developer API `SparkListenerSpeculativeTaskSubmitted` is unavoidable, since `SparkListenerSpeculativeTaskSubmitted` currently takes only `stageId` and `stageAttemptId` as arguments.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034404863


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager(
          }
        }
        if (taskEnd.taskInfo.speculative) {
-         stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-         // If the previous task attempt succeeded first and it was the last task in a stage,
-         // the stage may have been removed before handing this speculative TaskEnd event.
-         if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-           stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-         }
+         stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
        }

        taskEnd.reason match {
-         case Success | _: TaskKilled =>
+         case Success =>
+           // remove speculative task for task finished.

Review Comment:
   Good idea!



##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager(
      val stageId = speculativeTask.stageId
      val stageAttemptId = speculativeTask.stageAttemptId
      val stageAttempt = StageAttempt(stageId, stageAttemptId)
+     val taskIndex = speculativeTask.taskIndex
      allocationManager.synchronized {
-       stageAttemptToNumSpeculativeTasks(stageAttempt) =
-         stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+       stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt,

Review Comment:
   Got it
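   The bookkeeping agreed on in these two threads can be sketched in isolation. All names below are illustrative stand-ins for the PR's fields, not the actual `ExecutorAllocationManager` code:

   ```scala
   import scala.collection.mutable

   case class StageAttempt(stageId: Int, stageAttemptId: Int)

   object SpeculationBookkeeping {
     // Indices of speculative tasks submitted but not yet started, per stage attempt.
     val pending = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]

     def onSpeculativeTaskSubmitted(sa: StageAttempt, taskIndex: Int): Unit =
       pending.getOrElseUpdate(sa, new mutable.HashSet[Int]) += taskIndex

     // When any attempt of taskIndex succeeds, its speculative copy is no
     // longer needed and should stop counting toward the executor target.
     def onTaskSucceeded(sa: StageAttempt, taskIndex: Int): Unit =
       pending.get(sa).foreach(_.remove(taskIndex))

     def numPending(sa: StageAttempt): Int =
       pending.get(sa).map(_.size).getOrElse(0)
   }
   ```

   With this shape, a speculative task that is still pending when its normal counterpart finishes is dropped immediately, so it no longer inflates the target-executor calculation derived from pending plus running tasks.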





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034403916


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
    // Should be 0 when no stages are active.
    private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-   // Number of speculative tasks pending/running in each stageAttempt
-   private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-   // The speculative tasks started in each stageAttempt
+   // Number of speculative tasks running in each stageAttempt
+   // TODO(SPARK-14492): We simply need an Int for this.
    private val stageAttemptToSpeculativeTaskIndices =
+     new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+   // Number of speculative tasks pending in each stageAttempt
+   private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   Btw, `stageAttemptToPendingSpeculativeTasks` is a good idea to replace `stageAttemptToUnsubmittedSpeculativeTasks`; I'll fix it soon





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034403079


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
    // Should be 0 when no stages are active.
    private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-   // Number of speculative tasks pending/running in each stageAttempt
-   private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-   // The speculative tasks started in each stageAttempt
+   // Number of speculative tasks running in each stageAttempt
+   // TODO(SPARK-14492): We simply need an Int for this.
    private val stageAttemptToSpeculativeTaskIndices =
+     new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+   // Number of speculative tasks pending in each stageAttempt
+   private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   `stageAttemptToSpeculativeTaskIndices` keeps the same meaning as `stageAttemptToTaskIndices`, so I would prefer to keep it~





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034397331


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
    // Should be 0 when no stages are active.
    private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-   // Number of speculative tasks pending/running in each stageAttempt
-   private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-   // The speculative tasks started in each stageAttempt
+   // Number of speculative tasks running in each stageAttempt
+   // TODO(SPARK-14492): We simply need an Int for this.

Review Comment:
   Sorry, typo






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034274241


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -643,10 +643,11 @@ private[spark] class ExecutorAllocationManager(
 // Should be 0 when no stages are active.
 private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
 private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-// Number of speculative tasks pending/running in each stageAttempt
-private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-// The speculative tasks started in each stageAttempt
+// Number of speculative tasks running in each stageAttempt
 private val stageAttemptToSpeculativeTaskIndices =
+  new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

Review Comment:
   I will add a TODO to mark this.






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034273628


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -722,9 +723,8 @@ private[spark] class ExecutorAllocationManager(
 // because the attempt may still have running tasks,
 // even after another attempt for the stage is submitted.
 stageAttemptToNumTasks -= stageAttempt
-stageAttemptToNumSpeculativeTasks -= stageAttempt
+stageAttemptToUnsubmittedSpeculativeTasks -= stageAttempt

Review Comment:
   Got it.






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-28 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034271797


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
* Called by the TaskSetManager when it decides a speculative task is needed.
*/
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   In some scenarios, `task.partitionId` is not the same as `taskIndex`.
   e.g. when a ShuffleMapStage retries, the task set only contains the tasks resubmitted due to fetch failures, so task indices are reassigned from 0 and no longer line up with partition ids.
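A toy sketch of that divergence, using a hypothetical `TaskSpec` (not Spark's actual classes): after a retry that resubmits only the failed partitions, indices are positions in the new task set and stop matching partition ids.

```scala
case class TaskSpec(partitionId: Int)

// First attempt: one task per partition, so taskIndex == partitionId.
val attempt0 = Seq(0, 1, 2, 3).map(TaskSpec.apply)
assert(attempt0.zipWithIndex.forall { case (t, idx) => t.partitionId == idx })

// Retry: only partitions 2 and 3 hit fetch failures and are resubmitted.
// Task indices are assigned by position in the new task set, starting at 0.
val attempt1 = Seq(2, 3).map(TaskSpec.apply)
val indexToPartition =
  attempt1.zipWithIndex.map { case (t, idx) => idx -> t.partitionId }.toMap

assert(indexToPartition(0) == 2) // taskIndex 0 now refers to partitionId 2
assert(indexToPartition(1) == 3)
```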






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-21 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028830007


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
   stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
 // If this is the last pending task, mark the scheduler queue as empty
 if (taskStart.taskInfo.speculative) {
-  stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-    new mutable.HashSet[Int]) += taskIndex
+  stageAttemptToSpeculativeTaskIndices
+    .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]).add(taskIndex)
+  stageAttemptToUnsubmittedSpeculativeTasks

Review Comment:
   I think it's ok here.
   As the PR description explains, `stageAttemptToUnsubmittedSpeculativeTasks.remove(taskIndex)` should be called only when a speculative task starts or when a task finishes (whether it is speculative or not).
   Line 754 will do nothing if this speculative task has already been removed when the task finished, which is expected and ok.
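The lifecycle described above can be sketched with a minimal stand-in for the allocation manager's bookkeeping (names are illustrative, not Spark's actual fields): an unscheduled speculative task leaves the pending set either when its speculative copy starts or when any copy of the task finishes first.

```scala
import scala.collection.mutable

// Indices of speculative tasks that were requested but not yet scheduled.
val unsubmittedSpeculative = mutable.HashSet[Int]()

// A speculative copy was requested for this task index.
def onSpeculativeTaskSubmitted(taskIndex: Int): Unit =
  unsubmittedSpeculative += taskIndex

// The speculative copy actually started running.
def onTaskStart(taskIndex: Int, speculative: Boolean): Unit =
  if (speculative) unsubmittedSpeculative -= taskIndex

// Any copy finished, so a still-pending speculative copy is no longer needed.
def onTaskEnd(taskIndex: Int): Unit =
  unsubmittedSpeculative -= taskIndex

onSpeculativeTaskSubmitted(7)
onTaskEnd(7) // original task won the race: pending speculative copy is dropped
onTaskEnd(7) // removing again is a harmless no-op, as the comment notes
assert(unsubmittedSpeculative.isEmpty)
```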
   






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-21 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028811606


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
   stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
 // If this is the last pending task, mark the scheduler queue as empty
 if (taskStart.taskInfo.speculative) {
-  stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-    new mutable.HashSet[Int]) += taskIndex
+  stageAttemptToSpeculativeTaskIndices
+    .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]).add(taskIndex)
+  stageAttemptToUnsubmittedSpeculativeTasks

Review Comment:
   1. How about `stageAttemptToUnsubmittedSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex))`?
   In this way, nothing will change even if `stageAttemptToUnsubmittedSpeculativeTasks` has not been initialized for this stage attempt.
   2. Since access to `stageAttemptToUnsubmittedSpeculativeTasks` is thread-safe, `stageAttemptToUnsubmittedSpeculativeTasks.contains(stageAttempt)` would be correct anyway
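A quick illustration of why the `Option`-based form is safe when the key was never initialized (plain Scala collections, not Spark code):

```scala
import scala.collection.mutable

val tasks = mutable.HashMap[String, mutable.HashSet[Int]]()

// Key never initialized: .get returns None, so foreach is a safe no-op,
// whereas tasks("stage0-attempt0").remove(1) would throw NoSuchElementException.
tasks.get("stage0-attempt0").foreach(_.remove(1))

// After initialization, the same call removes the index as expected.
tasks.getOrElseUpdate("stage0-attempt0", mutable.HashSet(1, 2))
tasks.get("stage0-attempt0").foreach(_.remove(1))
assert(tasks("stage0-attempt0") == mutable.HashSet(2))
```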






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-21 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028807362


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
   stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
 // If this is the last pending task, mark the scheduler queue as empty
 if (taskStart.taskInfo.speculative) {
-  stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-    new mutable.HashSet[Int]) += taskIndex
+  stageAttemptToSpeculativeTaskIndices

Review Comment:
   `allocationManager.synchronized` will be called to protect this
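A minimal sketch of that locking pattern, with hypothetical names standing in for the real `ExecutorAllocationManager` fields: every mutation of the shared maps happens inside `synchronized` on the manager itself.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the allocation manager; listener callbacks only
// touch the shared map while holding the manager's monitor.
object allocationManager {
  private val stageAttemptToTaskIndices =
    mutable.HashMap[Int, mutable.HashSet[Int]]()

  def onTaskStart(stageAttempt: Int, taskIndex: Int): Unit = synchronized {
    stageAttemptToTaskIndices
      .getOrElseUpdate(stageAttempt, mutable.HashSet[Int]()) += taskIndex
  }

  // Reads also take the lock and return an immutable snapshot.
  def taskIndices(stageAttempt: Int): Set[Int] = synchronized {
    stageAttemptToTaskIndices.get(stageAttempt).map(_.toSet).getOrElse(Set.empty)
  }
}

allocationManager.onTaskStart(0, 3)
assert(allocationManager.taskIndices(0) == Set(3))
```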






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-11-21 Thread GitBox


toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028788976


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -774,17 +776,16 @@ private[spark] class ExecutorAllocationManager(
 removeStageFromResourceProfileIfUnused(stageAttempt)
   }
 }
+

Review Comment:
   Done~


