sleep1661 commented on a change in pull request #33872:
URL: https://github.com/apache/spark/pull/33872#discussion_r700725521
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
##########
@@ -1995,6 +2000,61 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!normalTSM.runningTasksSet.contains(taskId))
}
+ test("SPARK-36575: Executor lost cause task hang") {
+ val taskScheduler = setupScheduler()
+
+ val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
+ override protected val getTaskResultExecutor: ExecutorService =
+ ThreadUtils.newDaemonFixedThreadPool(1, "task-result-getter")
+ def taskResultExecutor() : ExecutorService = getTaskResultExecutor
+ }
+ taskScheduler.taskResultGetter = resultGetter
+
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1))
+ val task1 = new ShuffleMapTask(1, 0, null, new Partition {
+ override def index: Int = 0
+ }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
+
+ val task2 = new ShuffleMapTask(1, 0, null, new Partition {
+ override def index: Int = 0
+ }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
+
+ val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+
+ taskScheduler.submitTasks(taskSet)
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(2 === taskDescriptions.length)
+
+ val ser = sc.env.serializer.newInstance()
+ val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(), Array.empty)
+ val resultBytes = ser.serialize(directResult)
+
+ // make getTaskResultExecutor busy
+ import scala.language.reflectiveCalls
+ resultGetter.taskResultExecutor().submit( new Runnable {
+ override def run(): Unit = Thread.sleep(100)
+ })
+
+ // task1 finished
+ taskScheduler.statusUpdate(
+ tid = taskDescriptions(0).taskId,
+ state = TaskState.FINISHED,
+ serializedData = resultBytes
+ )
+
+ // mark executor heartbeat timed out
+ taskScheduler.executorLost(taskDescriptions(0).executorId, ExecutorProcessLost("Executor " +
+ "heartbeat timed out"))
+
+ // Wait a while until all events are processed
+ Thread.sleep(100)
Review comment:
Your question made me realize that some details were indeed missing, so I
reconstructed the whole sequence from the logs. It turns out that
`TaskSetManager.executorLost` was executed twice, and the second call ran
`tasksSuccessful -= 1`, which left `tasksSuccessful` permanently less than
`numTasks`.
`TaskSetManager.executorLost` is executed twice because
`HeartbeatReceiver.expireDeadHosts` first calls `TaskSchedulerImpl.executorLost`
and then calls `sc.killAndReplaceExecutor`, which leads to the second call.
This issue occurred in version 2.3.3. The logic of
`HeartbeatReceiver.expireDeadHosts` has since been changed on the master branch,
so I am not sure whether the stage hang still exists in newer versions.
BTW, I updated the JIRA description and added more detailed logs.
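To make the counter drift concrete, here is a minimal toy model of the failure mode described above. It is a sketch under assumptions, not Spark code: `ToyTaskSet`, `guardRepeatedLoss`, and `run` are hypothetical names, and the real `TaskSetManager` bookkeeping is more involved. It only shows how an executor-lost handler that is not idempotent can decrement `tasksSuccessful` twice while resubmitting the task once, so the counter never reaches `numTasks` again.

```scala
import scala.collection.mutable

// Toy model only: hypothetical classes, NOT Spark's TaskSetManager.
object DoubleExecutorLostDemo {

  final class ToyTaskSet(val numTasks: Int, guardRepeatedLoss: Boolean) {
    var tasksSuccessful = 0
    // task index -> executor that last completed it (kept after the executor is lost)
    private val executorOf = mutable.Map.empty[Int, String]
    // task indices that still need a successful run
    private val pending = mutable.Set.empty[Int] ++ (0 until numTasks)
    private val lostExecutors = mutable.Set.empty[String]

    def taskSucceeded(index: Int, executorId: String): Unit = {
      // Only a task that was actually pending counts as a new success.
      if (pending.remove(index)) {
        executorOf(index) = executorId
        tasksSuccessful += 1
      }
    }

    def executorLost(executorId: String): Unit = {
      val firstReport = lostExecutors.add(executorId)
      // Without the guard, every report rolls the counter back again.
      if (firstReport || !guardRepeatedLoss) {
        for ((index, exec) <- executorOf.toSeq if exec == executorId) {
          tasksSuccessful -= 1
          pending += index // a Set, so the task is resubmitted only once
        }
      }
    }

    def pendingTasks: Set[Int] = pending.toSet
    def isFinished: Boolean = tasksSuccessful == numTasks
  }

  // Two tasks succeed, executor0 is reported lost twice, then task 0 is rerun.
  def run(guard: Boolean): ToyTaskSet = {
    val ts = new ToyTaskSet(numTasks = 2, guardRepeatedLoss = guard)
    ts.taskSucceeded(0, "executor0")
    ts.taskSucceeded(1, "executor1")
    ts.executorLost("executor0") // first report (heartbeat timeout)
    ts.executorLost("executor0") // second report (e.g. after the kill request)
    ts.taskSucceeded(0, "executor1") // rerun of the resubmitted task
    ts
  }

  def main(args: Array[String]): Unit = {
    val naive = run(guard = false)
    println(s"naive:   tasksSuccessful=${naive.tasksSuccessful}, finished=${naive.isFinished}")
    val guarded = run(guard = true)
    println(s"guarded: tasksSuccessful=${guarded.tasksSuccessful}, finished=${guarded.isFinished}")
  }
}
```

In the unguarded run nothing is left pending, yet `tasksSuccessful` stays below `numTasks`, which matches the hang symptom. Whether the actual fix should guard at this level or elsewhere is a separate question that this sketch does not settle.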
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.