mridulm commented on a change in pull request #33872:
URL: https://github.com/apache/spark/pull/33872#discussion_r699797929
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -870,7 +870,15 @@ private[spark] class TaskSchedulerImpl(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]): Unit = synchronized {
- taskSetManager.handleSuccessfulTask(tid, taskResult)
+ if (taskIdToTaskSetManager.contains(tid)) {
+ taskSetManager.handleSuccessfulTask(tid, taskResult)
+ } else {
+ logError(
+ ("Ignoring update with state finished for TID %s because its task set
is gone (this is " +
Review comment:
nit: s"Ignoring update with state finished for TID $tid because ...
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -870,7 +870,15 @@ private[spark] class TaskSchedulerImpl(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]): Unit = synchronized {
- taskSetManager.handleSuccessfulTask(tid, taskResult)
+ if (taskIdToTaskSetManager.contains(tid)) {
+ taskSetManager.handleSuccessfulTask(tid, taskResult)
+ } else {
+ logError(
Review comment:
Make this a `logWarning`, or even a `logInfo`? This is not something
users can mitigate, and it cannot cause issues once we have handled it this way.
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
##########
@@ -1995,6 +2000,61 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
assert(!normalTSM.runningTasksSet.contains(taskId))
}
+ test("SPARK-36575: Executor lost cause task hang") {
+ val taskScheduler = setupScheduler()
+
+ val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
+ override protected val getTaskResultExecutor: ExecutorService =
+ ThreadUtils.newDaemonFixedThreadPool(1, "task-result-getter")
+ def taskResultExecutor() : ExecutorService = getTaskResultExecutor
+ }
+ taskScheduler.taskResultGetter = resultGetter
+
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1))
+ val task1 = new ShuffleMapTask(1, 0, null, new Partition {
+ override def index: Int = 0
+ }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
+
+ val task2 = new ShuffleMapTask(1, 0, null, new Partition {
+ override def index: Int = 0
+ }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
+
+ val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+
+ taskScheduler.submitTasks(taskSet)
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(2 === taskDescriptions.length)
+
+ val ser = sc.env.serializer.newInstance()
+ val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(),
Array.empty)
+ val resultBytes = ser.serialize(directResult)
+
+ // make getTaskResultExecutor busy
+ import scala.language.reflectiveCalls
+ resultGetter.taskResultExecutor().submit( new Runnable {
+ override def run(): Unit = Thread.sleep(100)
+ })
+
+ // task1 finished
+ taskScheduler.statusUpdate(
+ tid = taskDescriptions(0).taskId,
+ state = TaskState.FINISHED,
+ serializedData = resultBytes
+ )
+
+ // mark executor heartbeat timed out
+ taskScheduler.executorLost(taskDescriptions(0).executorId,
ExecutorProcessLost("Executor " +
+ "heartbeat timed out"))
+
+ // Wait a while until all events are processed
+ Thread.sleep(100)
Review comment:
Using `Thread.sleep` calls here will result in flaky tests.
Instead, mock the relevant methods and use latches to control the behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]