Ngone51 commented on a change in pull request #33872:
URL: https://github.com/apache/spark/pull/33872#discussion_r701033071
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
##########
@@ -1995,6 +2000,61 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!normalTSM.runningTasksSet.contains(taskId))
   }
+  test("SPARK-36575: Executor lost cause task hang") {
+    val taskScheduler = setupScheduler()
+
+    val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
+      override protected val getTaskResultExecutor: ExecutorService =
+        ThreadUtils.newDaemonFixedThreadPool(1, "task-result-getter")
+      def taskResultExecutor(): ExecutorService = getTaskResultExecutor
+    }
+    taskScheduler.taskResultGetter = resultGetter
+
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1))
+    val task1 = new ShuffleMapTask(1, 0, null, new Partition {
+      override def index: Int = 0
+    }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
+
+    val task2 = new ShuffleMapTask(1, 0, null, new Partition {
+      override def index: Int = 0
+    }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
+
+    val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+
+    taskScheduler.submitTasks(taskSet)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(2 === taskDescriptions.length)
+
+    val ser = sc.env.serializer.newInstance()
+    val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(),
+      Array.empty)
+    val resultBytes = ser.serialize(directResult)
+
+    // make getTaskResultExecutor busy
+    import scala.language.reflectiveCalls
+    resultGetter.taskResultExecutor().submit(new Runnable {
+      override def run(): Unit = Thread.sleep(100)
+    })
+
+    // task1 finished
+    taskScheduler.statusUpdate(
+      tid = taskDescriptions(0).taskId,
+      state = TaskState.FINISHED,
+      serializedData = resultBytes
+    )
+
+    // mark executor heartbeat timed out
+    taskScheduler.executorLost(taskDescriptions(0).executorId, ExecutorProcessLost("Executor " +
+      "heartbeat timed out"))
+
+    // Wait a while until all events are processed
+    Thread.sleep(100)
Review comment:
Ok, I checked the logs in JIRA against the 2.3 code. Now I can guess how
`TaskSetManager.executorLost` gets called twice: the first call comes from
`TaskSchedulerImpl.executorLost` in `HeartbeatReceiver`, and the second could be
triggered by `onDisconnect -> removeExecutor -> executorLost` due to
`sc.killAndReplaceExecutor`. The second call can only take effect if a
`resourceOffer` request happens before it, so that the `executorId` gets added
back to `executorIdToHost`. But, as you can see here,
https://github.com/apache/spark/blob/75cc3b2da9ee0b51ecf0f13169f2b634e36a60c4/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L936-L949
`tasksSuccessful -= 1` only happens when `successful(index)` is `true`, and after
the first `executorLost`, `successful(index)` is already `false`. So I think
`tasksSuccessful -= 1` wouldn't happen on the second call.
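To make that concrete, here is a minimal, self-contained sketch (hypothetical names, not the actual `TaskSetManager` code) of the guard described above: the rollback of `tasksSuccessful` only fires while `successful(index)` is still `true`, so a second `executorLost` for the same executor is a no-op.

```scala
// Hypothetical sketch of the rollback guard; NOT the real TaskSetManager.
object ExecutorLostSketch {
  case class TaskInfoSketch(index: Int, executorId: String, running: Boolean)

  // One shuffle-map task that already finished successfully on executor0.
  val taskInfos = Map(0L -> TaskInfoSketch(index = 0, executorId = "executor0", running = false))
  val successful = Array(true)
  var tasksSuccessful = 1

  def executorLost(execId: String): Unit = {
    for ((_, info) <- taskInfos if info.executorId == execId) {
      val index = info.index
      // Guarded by successful(index), so the rollback can fire at most once.
      if (successful(index) && !info.running) {
        successful(index) = false
        tasksSuccessful -= 1
      }
    }
  }

  def main(args: Array[String]): Unit = {
    executorLost("executor0")
    println(tasksSuccessful) // 0 -- rolled back by the first call
    executorLost("executor0")
    println(tasksSuccessful) // still 0 -- the second call changes nothing
  }
}
```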
I hope you can provide the complete logs (or at least more of them) if possible
so that we can figure out what the real issue is here. For now, I still can't
see how the stage can hang.
Besides, in master, `TaskSchedulerImpl.executorLost` is no longer called
directly in `HeartbeatReceiver`; instead, it's delegated to the
`CoarseGrainedSchedulerBackend`. And since `sc.killAndReplaceExecutor` is also
delegated to the `CoarseGrainedSchedulerBackend` in the end, the executor data
is kept in sync by `CoarseGrainedSchedulerBackend`. So I think calling
`executorLost` twice won't happen in master.
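For illustration only, here is a tiny hypothetical sketch (not the real `CoarseGrainedSchedulerBackend`) of that single-owner idea: once the executor entry is removed from the owner's map by the first removal path, the second path finds nothing to remove, so the downstream `executorLost` is forwarded at most once.

```scala
// Hypothetical single-owner sketch; not the actual CoarseGrainedSchedulerBackend.
object SingleOwnerSketch {
  private var executorDataMap = Map("executor0" -> "host0")

  // Both the heartbeat-timeout path and killAndReplaceExecutor would funnel into here.
  def removeExecutor(execId: String, reason: String): Unit =
    executorDataMap.get(execId) match {
      case Some(_) =>
        executorDataMap -= execId
        println(s"executorLost($execId, $reason)") // forwarded to the scheduler once
      case None =>
        println(s"Ignoring duplicate removal of $execId")
    }

  def main(args: Array[String]): Unit = {
    removeExecutor("executor0", "Executor heartbeat timed out")
    removeExecutor("executor0", "Executor killed and replaced")
  }
}
```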