sleep1661 commented on a change in pull request #33872:
URL: https://github.com/apache/spark/pull/33872#discussion_r699938609



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -870,7 +870,15 @@ private[spark] class TaskSchedulerImpl(
       taskSetManager: TaskSetManager,
       tid: Long,
       taskResult: DirectTaskResult[_]): Unit = synchronized {
-    taskSetManager.handleSuccessfulTask(tid, taskResult)
+    if (taskIdToTaskSetManager.contains(tid)) {
+      taskSetManager.handleSuccessfulTask(tid, taskResult)
+    } else {
+      logError(
+        ("Ignoring update with state finished for TID %s because its task set 
is gone (this is " +

Review comment:
       Ok, I will adjust that.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -870,7 +870,15 @@ private[spark] class TaskSchedulerImpl(
       taskSetManager: TaskSetManager,
       tid: Long,
       taskResult: DirectTaskResult[_]): Unit = synchronized {
-    taskSetManager.handleSuccessfulTask(tid, taskResult)
+    if (taskIdToTaskSetManager.contains(tid)) {
+      taskSetManager.handleSuccessfulTask(tid, taskResult)
+    } else {
+      logError(

Review comment:
       `logInfo` will be fine.
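   For illustration, here is a minimal standalone sketch of what the adjusted guard could look like with the level downgraded to `logInfo`. The stand-in types and the `println` are hypothetical and only there so the snippet compiles on its own; the real change lives in `TaskSchedulerImpl.handleSuccessfulTask` and uses Spark's `Logging`:

```scala
import scala.collection.mutable

// Sketch only: hypothetical stand-in types so the snippet is self-contained; the
// real code uses Spark's TaskSetManager, DirectTaskResult and the Logging trait.
object GuardSketch {
  final case class DirectTaskResult(value: Int)
  final class TaskSetManager {
    def handleSuccessfulTask(tid: Long, result: DirectTaskResult): Unit = ()
  }

  private val taskIdToTaskSetManager = mutable.HashMap.empty[Long, TaskSetManager]

  def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult): Unit = synchronized {
    if (taskIdToTaskSetManager.contains(tid)) {
      taskSetManager.handleSuccessfulTask(tid, taskResult)
    } else {
      // Info, not error: a late FINISHED update for a task whose task set has
      // already been removed (e.g. after an executor loss) is expected behaviour.
      println(s"INFO Ignoring update with state finished for TID $tid because its " +
        "task set is gone")
    }
  }
}
```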

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
##########
@@ -1995,6 +2000,61 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!normalTSM.runningTasksSet.contains(taskId))
   }
 
+  test("SPARK-36575: Executor lost cause task hang") {
+    val taskScheduler = setupScheduler()
+
+    val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
+      override protected val getTaskResultExecutor: ExecutorService =
+        ThreadUtils.newDaemonFixedThreadPool(1, "task-result-getter")
+      def taskResultExecutor() : ExecutorService = getTaskResultExecutor
+    }
+    taskScheduler.taskResultGetter = resultGetter
+
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1))
+    val task1 = new ShuffleMapTask(1, 0, null, new Partition {
+      override def index: Int = 0
+    }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
+
+    val task2 = new ShuffleMapTask(1, 0, null, new Partition {
+      override def index: Int = 0
+    }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
+
+    val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+
+    taskScheduler.submitTasks(taskSet)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(2 === taskDescriptions.length)
+
+    val ser = sc.env.serializer.newInstance()
+    val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(), Array.empty)
+    val resultBytes = ser.serialize(directResult)
+
+    // make getTaskResultExecutor busy
+    import scala.language.reflectiveCalls
+    resultGetter.taskResultExecutor().submit( new Runnable {
+      override def run(): Unit = Thread.sleep(100)
+    })
+
+    // task1 finished
+    taskScheduler.statusUpdate(
+      tid = taskDescriptions(0).taskId,
+      state = TaskState.FINISHED,
+      serializedData = resultBytes
+    )
+
+    // mark executor heartbeat timed out
+    taskScheduler.executorLost(taskDescriptions(0).executorId, ExecutorProcessLost("Executor " +
+      "heartbeat timed out"))
+
+    // Wait a while until all events are processed
+    Thread.sleep(100)

Review comment:
       I have optimized it; please take a look when you have time.
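   (For context: one common way to avoid a fixed `Thread.sleep(100)` in tests like this is ScalaTest's `eventually`. The fragment below is only an illustrative guess at what the test could poll for, not necessarily what the updated PR does; `taskScheduler` is the instance built earlier in the test.)

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Poll until the scheduler has observed the executor loss instead of sleeping
// for a fixed amount of time.
eventually(timeout(10.seconds), interval(100.milliseconds)) {
  assert(!taskScheduler.runningTasksByExecutors.contains("executor0"))
}
```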

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
##########
@@ -1995,6 +2000,61 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!normalTSM.runningTasksSet.contains(taskId))
   }
 
+  test("SPARK-36575: Executor lost cause task hang") {
+    val taskScheduler = setupScheduler()
+
+    val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
+      override protected val getTaskResultExecutor: ExecutorService =
+        ThreadUtils.newDaemonFixedThreadPool(1, "task-result-getter")
+      def taskResultExecutor() : ExecutorService = getTaskResultExecutor
+    }
+    taskScheduler.taskResultGetter = resultGetter
+
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1))
+    val task1 = new ShuffleMapTask(1, 0, null, new Partition {
+      override def index: Int = 0
+    }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
+
+    val task2 = new ShuffleMapTask(1, 0, null, new Partition {
+      override def index: Int = 0
+    }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
+
+    val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+
+    taskScheduler.submitTasks(taskSet)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(2 === taskDescriptions.length)
+
+    val ser = sc.env.serializer.newInstance()
+    val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(), Array.empty)
+    val resultBytes = ser.serialize(directResult)
+
+    // make getTaskResultExecutor busy
+    import scala.language.reflectiveCalls
+    resultGetter.taskResultExecutor().submit( new Runnable {
+      override def run(): Unit = Thread.sleep(100)
+    })
+
+    // task1 finished
+    taskScheduler.statusUpdate(
+      tid = taskDescriptions(0).taskId,
+      state = TaskState.FINISHED,
+      serializedData = resultBytes
+    )
+
+    // mark executor heartbeat timed out
+    taskScheduler.executorLost(taskDescriptions(0).executorId, ExecutorProcessLost("Executor " +
+      "heartbeat timed out"))
+
+    // Wait a while until all events are processed
+    Thread.sleep(100)

Review comment:
       I have optimized it; please take a look when you have time.
   
   > I don't understand how the issue here leads to the stage hang. From what I see now, the task is considered a failed task first (due to executor loss) and re-enqueued to the pending list, then it's considered a successful task later (due to the statusUpdate event). This does cause some inconsistent status but doesn't seem to cause the stage hang. IIRC, the stage can also complete even if the pending list isn't empty.
   > 
   > I'm probably missing something here. Could you elaborate more on how it causes the stage hang?
   
   The issue leaves `TaskSetManager.successful` in a wrong state: the partition index is marked `true` even though the task was re-enqueued after the executor loss. Since `TaskSetManager.dequeueTaskFromList` only dequeues indices that are not yet successful (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L313), that partition index will never be scheduled again, which is what hangs the stage.
   I have also updated the unit test to assert this: `assert(false == taskSetManager.successful(taskDescriptions(0).index))`.
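   To make the failure mode concrete, here is a small self-contained sketch (not the real `TaskSetManager`, just a paraphrase of the skip in `dequeueTaskFromList`) showing why a pending partition whose index has been wrongly marked successful is never handed out again:

```scala
import scala.collection.mutable

// Paraphrased sketch: indices already marked successful are dropped from the
// pending list without being returned, so a wrongly marked partition is never
// rescheduled and the stage hangs waiting for it.
object DequeueSketch {
  // Partition 0 was re-enqueued after the executor loss, but the late
  // handleSuccessfulTask call flipped successful(0) to true.
  val successful: Array[Boolean] = Array(true, false)
  val pendingTasks: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer(0)

  def dequeueTaskFromList(list: mutable.ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      list.remove(indexOffset)
      // Only indices that are not yet successful are eligible to be scheduled.
      if (!successful(index)) {
        return Some(index)
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    // Partition 0 is pending but gets skipped, so it is never offered again.
    assert(dequeueTaskFromList(pendingTasks).isEmpty)
    println("partition 0 was never rescheduled")
  }
}
```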



