toujours33 commented on code in PR #38711:

@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
    * Called by the TaskSetManager when it decides a speculative task is needed.
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+, taskIndex))

Review Comment:
   I constructed a case with a shuffle retry and logged the `taskIndex` and 
`partitionId` recorded in the task events `SparkListenerTaskStart` &
   When a stage is submitted normally (not a retry stage or, more broadly, not a 
partial task submission), `taskIndex` is equal to `partitionId`:
   [screenshot of the logged task events]
   But when a shuffle stage is resubmitted with only part of its tasks, `taskIndex` is not 
necessarily equal to `partitionId`:
   [screenshot of the logged task events]
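   A minimal sketch of why the two values diverge, assuming a resubmitted stage builds its task set only from the missing partitions, so a task's index counts its position in that set while `partitionId` stays the RDD partition. This is a hypothetical model, not Spark's code; `Task` and `buildTaskSet` are illustrative names.

```scala
object TaskIndexVsPartitionId {
  // Hypothetical model of one task in a task set.
  final case class Task(taskIndex: Int, partitionId: Int)

  // A task set holds one task per partition it must (re)compute;
  // the task index is simply the position within that set.
  def buildTaskSet(partitionsToCompute: Seq[Int]): Seq[Task] =
    partitionsToCompute.zipWithIndex.map { case (pid, idx) => Task(idx, pid) }

  def main(args: Array[String]): Unit = {
    // Normal submission: all 4 partitions, so taskIndex == partitionId.
    val full = buildTaskSet(0 until 4)
    // Partial resubmission: only partitions 1 and 3 are missing.
    val retry = buildTaskSet(Seq(1, 3))
    for (t <- full ++ retry) println(s"taskIndex=${t.taskIndex} partitionId=${t.partitionId}")
  }
}
```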
   So if we use `partitionId` instead of `taskIndex` for 
`SpeculativeTaskSubmitted`, we won't release pending speculative tasks whose 
original task finishes before the speculative task starts, which misleads 
the calculation of target executors, because `taskIndex` may not be equal to 
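   The failure mode can be sketched with a toy bookkeeping class. It assumes, as the comment implies, that pending speculative entries are released by `taskIndex` when the original task ends. `PendingSpeculative` and `pendingAfter` are hypothetical names, not the actual `ExecutorAllocationManager` code. If the speculative request is recorded by `partitionId`, the release misses and a stale entry remains, inflating the executor target.

```scala
import scala.collection.mutable

object PendingSpeculativeSketch {
  // Hypothetical per-stage-attempt set of speculative tasks requested but not started.
  final class PendingSpeculative {
    private val pending = mutable.HashSet.empty[Int]
    def onSpeculativeSubmitted(key: Int): Unit = pending += key
    // Assumed release path: keyed by task index, as carried in task events.
    def onTaskEnd(taskIndex: Int): Unit = pending -= taskIndex
    def count: Int = pending.size
  }

  // Retried stage: task index 0 -> partition 1, task index 1 -> partition 3.
  val indexToPartition: Map[Int, Int] = Map(0 -> 1, 1 -> 3)

  def pendingAfter(submitByPartitionId: Boolean): Int = {
    val p = new PendingSpeculative
    val taskIndex = 1
    val key = if (submitByPartitionId) indexToPartition(taskIndex) else taskIndex
    p.onSpeculativeSubmitted(key)
    // The original task finishes before its speculative copy launches.
    p.onTaskEnd(taskIndex)
    p.count // entries still counted toward the executor target
  }
}
```

With `submitByPartitionId = true`, one stale entry stays pending; keyed by `taskIndex`, none does.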

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.