wangyum commented on code in PR #39011:
URL: https://github.com/apache/spark/pull/39011#discussion_r1732356898


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1046,17 +1048,45 @@ private[spark] class TaskSetManager(
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue 
our tasks */
   override def executorLost(execId: String, host: String, reason: 
ExecutorLossReason): Unit = {
-    // Re-enqueue any tasks that ran on the failed executor if this is a 
shuffle map stage,
-    // and we are not using an external shuffle server which could serve the 
shuffle outputs.
-    // The reason is the next stage wouldn't be able to fetch the data from 
this dead executor
-    // so we would need to rerun these tasks on other executors.
-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&& !isZombie) {
+    // Re-enqueue any tasks with potential shuffle data loss that ran on the 
failed executor
+    // if this is a shuffle map stage, and we are not using an external 
shuffle server which
+    // could serve the shuffle outputs or the executor lost is caused by 
decommission (which
+    // can destroy the whole host). The reason is the next stage wouldn't be 
able to fetch the
+    // data from this dead executor so we would need to rerun these tasks on 
other executors.
+    val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+      (reason.isInstanceOf[ExecutorDecommission] || 
!env.blockManager.externalShuffleServiceEnabled)

Review Comment:
   It seems that this change may cause task re-run many times and can't 
complete for a long running stage if 
    both `spark.decommission.enabled` and `spark.shuffle.service.enabled` are 
enabled.
   
   <img width="1650" alt="image" 
src="https://github.com/user-attachments/assets/342de17f-0515-4a18-b2bb-cf5402a0bb9c";>
   
   <img width="554" alt="image" 
src="https://github.com/user-attachments/assets/eae968f4-8bac-4646-824f-a59d39a5741f";>
   
   Below is the additional log we added:
   ```diff
   ---
    .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 8 ++++++--
    1 file changed, 6 insertions(+), 2 deletions(-)
   
   diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
   index a1a54daf5f8..5846827c832 100644
   --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
   +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
   @@ -1117,6 +1117,7 @@ private[spark] class TaskSetManager(
    
      /** Called by TaskScheduler when an executor is lost so we can re-enqueue 
our tasks */
      override def executorLost(execId: String, host: String, reason: 
ExecutorLossReason): Unit = {
   +    logInfo(s"Executor lost: execId: $execId, host: $host, reason: 
$reason.")
        // Re-enqueue any tasks with potential shuffle data loss that ran on 
the failed executor
        // if this is a shuffle map stage, and we are not using an external 
shuffle server which
        // could serve the shuffle outputs or the executor lost is caused by 
decommission (which
   @@ -1148,8 +1149,11 @@ private[spark] class TaskSetManager(
                //    This shouldn't not happen ideally since TaskSetManager 
handles executor lost first
                //    before DAGScheduler. So the map statues for the 
successful task must be available
                //    at this moment. keep it here in case the handling order 
changes.
   -            locationOpt.exists(_.host != host)
   -
   +            val isShuffleMapOutputLoss = locationOpt.exists(_.host != host)
   +            logInfo(s"Is shuffle map output available: partition id: 
${info.partitionId}, " +
   +              s"tid: ${tid}, locationOpt: ${locationOpt}, " +
   +              s"isShuffleMapOutputLoss: ${isShuffleMapOutputLoss}.")
   +            isShuffleMapOutputLoss
              case _ => false
            }
            // We may have a running task whose partition has been marked as 
successful,
   ```
   ```
   24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Decommission executors: 
1608
   24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Notify executor 1608 to 
decommission.
   24/08/26 16:56:22 INFO BlockManagerMasterEndpoint: Mark BlockManagers 
(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 30502, None)) 
as being decommissioning.
   24/08/26 16:56:22 INFO ExecutorAllocationManager: Executors 1608 removed due 
to idle timeout.
   24/08/26 16:56:23 INFO TaskSetManager: Finished task 4992.1 in stage 7.0 
(TID 16851) in 807662 ms on hdc42-mcc10-01-0710-4001-001.company.com (executor 
1300) (5000/6000)
   24/08/26 16:56:23 INFO TaskSetManager: Finished task 1335.2 in stage 7.0 
(TID 16713) in 903010 ms on hdc42-mcc10-01-0110-7303-009.company.com (executor 
1141) (5001/6000)
   24/08/26 16:56:23 INFO TaskSetManager: Finished task 2290.1 in stage 7.0 
(TID 16573) in 1115189 ms on hdc42-mcc10-01-1110-3305-038.company (executor 
568) (5002/6000)
   24/08/26 16:56:23 INFO TaskSetManager: Finished task 349.1 in stage 7.0 (TID 
16916) in 777120 ms on hdc42-mcc10-01-0110-5803-003.company.com (executor 1345) 
(5003/6000)
   24/08/26 16:56:23 INFO YarnAllocator: Driver requested a total number of 499 
executor(s) for resource profile id: 0.
   24/08/26 16:56:24 INFO YarnClusterScheduler: Executor 1608 on 
hdc42-mcc10-01-0110-7302-007.company.com is decommissioned after 1.3 s.
   24/08/26 16:56:24 INFO TaskSetManager: Executor lost: execId: 1608, host: 
hdc42-mcc10-01-0110-7302-007.company.com, reason: Executor decommission: spark 
scale down.
   24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: 
partition id: 3749, tid: 10302, locationOpt: Some(BlockManagerId(1608, 
hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: 
false.
   24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3749), so 
marking it as still running.
   24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: 
partition id: 763, tid: 16636, locationOpt: Some(BlockManagerId(1608, 
hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: 
false.
   24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 763), so 
marking it as still running.
   24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: 
partition id: 2971, tid: 15433, locationOpt: Some(BlockManagerId(1608, 
hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: 
false.
   24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 2971), so 
marking it as still running.
   24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: 
partition id: 5835, tid: 16587, locationOpt: Some(BlockManagerId(1608, 
hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: 
false.
   24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 5835), so 
marking it as still running.
   24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: 
partition id: 611, tid: 15118, locationOpt: Some(BlockManagerId(1608, 
hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: 
false.
   24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 611), so 
marking it as still running.
   24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: 
partition id: 3750, tid: 10303, locationOpt: Some(BlockManagerId(1608, 
hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: 
false.
   24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3750), so 
marking it as still running.
   24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: 
partition id: 3740, tid: 13610, locationOpt: Some(BlockManagerId(1608, 
hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: 
false.
   24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3740), so 
marking it as still running.
   24/08/26 16:56:24 INFO DAGScheduler: Executor lost: 1608 (epoch 1)
   24/08/26 16:56:24 INFO BlockManagerMasterEndpoint: Trying to remove executor 
1608 from BlockManagerMaster.
   ```
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to