sleep1661 commented on a change in pull request #34578:
URL: https://github.com/apache/spark/pull/34578#discussion_r767154891
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
##########
@@ -1995,6 +1999,94 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
assert(!normalTSM.runningTasksSet.contains(taskId))
}
+ test("SPARK-37300: TaskSchedulerImpl should ignore task finished" +
+ " event if its task was finished state") {
+ val taskScheduler = setupScheduler()
+
+ val latch = new CountDownLatch(2)
+ val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
+ override protected val getTaskResultExecutor: ExecutorService =
+ new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue[Runnable],
+ ThreadUtils.namedThreadFactory("task-result-getter")) {
+ override def execute(command: Runnable): Unit = {
+ super.execute(new Runnable {
+ override def run(): Unit = {
+ command.run()
+ latch.countDown()
+ }
+ })
+ }
+ }
+ def taskResultExecutor() : ExecutorService = getTaskResultExecutor
+ }
+ taskScheduler.taskResultGetter = resultGetter
+
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1))
+ val task1 = new ShuffleMapTask(1, 0, null, new Partition {
+ override def index: Int = 0
+ }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
+
+ val task2 = new ShuffleMapTask(1, 0, null, new Partition {
+ override def index: Int = 1
+ }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
+
+ val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+
+ taskScheduler.submitTasks(taskSet)
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(2 === taskDescriptions.length)
+
+ val ser = sc.env.serializer.newInstance()
+ val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(),
Array.empty)
+ val resultBytes = ser.serialize(directResult)
+
+ val busyTask = new Runnable {
+ val lock : Object = new Object
+ var running : AtomicBoolean = new AtomicBoolean(false)
+ override def run(): Unit = {
+ lock.synchronized {
+ running.set(true)
+ lock.wait()
+ }
+ }
+ def markTaskDone: Unit = {
+ lock.synchronized {
+ lock.notify()
+ }
+ }
+ }
+ // make getTaskResultExecutor busy
+ resultGetter.taskResultExecutor().submit(busyTask)
+
+ // task1 finished
+ val tid = taskDescriptions(0).taskId
+ taskScheduler.statusUpdate(
+ tid = tid,
+ state = TaskState.FINISHED,
+ serializedData = resultBytes
+ )
+
+ // mark executor heartbeat timed out
+ taskScheduler.executorLost(taskDescriptions(0).executorId,
ExecutorProcessLost("Executor " +
+ "heartbeat timed out"))
+
+ // Wait busyTask begin running
+ eventually(timeout(500.milliseconds)) {
Review comment:
> Usually, we set a longer timeout for `eventually` to avoid flaky tests; 500 ms seems a bit short to me. It might be worth switching to a longer timeout, since the Jenkins tests haven't passed.

How about 30 seconds?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]