Ngone51 commented on a change in pull request #34834:
URL: https://github.com/apache/spark/pull/34834#discussion_r771442414



##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -2244,6 +2244,80 @@ class TaskSetManagerSuite
     // After 3s have elapsed now the task is marked as speculative task
     assert(sched.speculativeTasks.size == 1)
   }
+
+  test("SPARK-37580 task failed reach max failure threshold should check if 
another attempt " +
+    "succeeded before abort the stage") {
+    sc = new SparkContext("local", "test")
+    // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.6)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), 
("exec3", "host3"))
+    sched.backend = mock(classOf[SchedulerBackend])
+    val taskSet = FakeTask.createTaskSet(3)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+
+    // Offer resources for 3 task to start
+    val tasks = new ArrayBuffer[TaskDescription]()
+    for ((k, v) <- List("exec1" -> "host1", "exec2" -> "host2", "exec3" -> 
"host3")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+      tasks += task
+    }
+    assert(sched.startedTasks.toSet === (0 until 3).toSet)
+
+    def runningTaskForIndex(index: Int): TaskDescription = {
+      tasks.find { task =>
+        task.index == index && !sched.endedTasks.contains(task.taskId)
+      }.getOrElse {
+        throw new RuntimeException(s"couldn't find index $index in " +
+          s"tasks: ${tasks.map { t => t.index -> t.taskId }} with endedTasks:" 
+
+          s" ${sched.endedTasks.keys}")
+      }
+    }
+    clock.advance(1)
+
+    // running task with taskId(index 1) fail 3 times (not enough to abort the 
stage)
+    (0 until 3).foreach { attempt =>
+      val task = runningTaskForIndex(1)
+      logInfo(s"failing task $task")

Review comment:
       We usually don't add logging to tests. If you really want to verify
that the task has failed, could you use an assertion instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to