toujours33 commented on code in PR #38711:

@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
    * Called by the TaskSetManager when it decides a speculative task is needed.
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+, taskIndex))

Review Comment:
   I constructed a case with a shuffle retry and logged the `taskIndex` and 
`partitionId` recorded in task events `SparkListenerTaskStart` & 
   When a stage is submitted normally (not a retry stage or, more broadly, not a 
partial task submission), `taskIndex` is equal to `partitionId`:
   [image]
   But when a shuffle stage is resubmitted with only part of its tasks, 
`taskIndex` is not necessarily equal to `partitionId`:
   [image]
   So if we use `partitionId` instead of `taskIndex` for 
`SpeculativeTaskSubmitted`, we won't release pending speculative tasks whose 
normal task finished before the speculative task started, and this will skew 
the calculation of target executors, because `taskIndex` may not be equal to 
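
   The mismatch described here can be sketched in a few lines of plain Scala. This is only an illustration under stated assumptions, not Spark code: `SketchTask`, `indexTasks`, and `releasePending` are hypothetical names, and the sketch assumes that pending speculative tasks are tracked in a set keyed by task index, as this comment implies.

```scala
// Hypothetical stand-in for a task; NOT Spark's real Task class.
final case class SketchTask(partitionId: Int)

object TaskIndexSketch {
  // Pair each task with its position in the task set (its "taskIndex").
  // A partial resubmission only contains tasks for the missing partitions.
  def indexTasks(partitionsToCompute: Seq[Int]): Seq[(Int, SketchTask)] =
    partitionsToCompute.map(p => SketchTask(p)).zipWithIndex.map {
      case (task, index) => (index, task)
    }

  // Assumption: pending speculative tasks are stored by taskIndex.
  // Removing with a key that is not in the set leaves it unchanged.
  def releasePending(pending: Set[Int], key: Int): Set[Int] = pending - key

  def main(args: Array[String]): Unit = {
    // Normal submission: every partition is computed, so index == partitionId.
    val full = indexTasks(0 until 4)
    println(full.forall { case (i, t) => i == t.partitionId }) // true

    // Partial resubmission: only partitions 1 and 3 are recomputed.
    val partial = indexTasks(Seq(1, 3))
    println(partial) // List((0,SketchTask(1)), (1,SketchTask(3)))

    // The task at index 1 (partition 3) was marked as speculatable.
    val pending = Set(1)
    println(releasePending(pending, 3)) // Set(1): wrong key, nothing released
    println(releasePending(pending, 1)) // Set(): released by taskIndex
  }
}
```

   In the partial case, releasing by `partitionId` (3) leaves the entry in place, while releasing by `taskIndex` (1) clears it.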

