mridulm commented on code in PR #49270:
URL: https://github.com/apache/spark/pull/49270#discussion_r1899391284


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -206,12 +208,16 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
+      val taskPartitionIds = new HashSet[Int]()
+      taskSet.tasks.foreach(task => taskPartitionIds += task.partitionId)
+      runningTaskInfos.put(taskSet.stageId, taskPartitionIds)

Review Comment:
   nit:
   ```suggestion
         runningTaskInfos.put(taskSet.stageId, new HashSet[Int]() ++ 
taskSet.tasks.map(_.partitionId))
   ```



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -2252,6 +2268,46 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assert(scheduler.activeJobs.isEmpty)
   }
 
+  test("SPARK-50648: when job is cancelled during shuffle retry in parent 
stage, " +

Review Comment:
   Nice test!



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -393,6 +399,16 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
         handleShuffleMergeFinalized(shuffleMapStage, 
shuffleMapStage.shuffleDep.shuffleMergeId)
       }
     }
+
+    override private[scheduler] def handleTaskCompletion(event: 
CompletionEvent): Unit = {
+      super.handleTaskCompletion(event)
+      if (runningTaskInfos.contains(event.task.stageId)) {
+        runningTaskInfos(event.task.stageId) -= event.task.partitionId
+        if (runningTaskInfos(event.task.stageId).isEmpty) {
+          runningTaskInfos.remove(event.task.stageId)
+        }
+      }

Review Comment:
   nit: 
   ```suggestion
         runningTaskInfos.get(event.task.stageId).foreach{ partitions =>
           partitions -= event.task.partitionId
           if (partitions.isEmpty) runningTaskInfos.remove(event.task.stageId)
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to