JoshRosen commented on code in PR #39011:
URL: https://github.com/apache/spark/pull/39011#discussion_r1177257697


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1046,17 +1048,45 @@ private[spark] class TaskSetManager(
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
   override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
-    // and we are not using an external shuffle server which could serve the shuffle outputs.
-    // The reason is the next stage wouldn't be able to fetch the data from this dead executor
-    // so we would need to rerun these tasks on other executors.
-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
+    // Re-enqueue any tasks with potential shuffle data loss that ran on the failed executor
+    // if this is a shuffle map stage, and we are not using an external shuffle server which
+    // could serve the shuffle outputs or the executor lost is caused by decommission (which
+    // can destroy the whole host). The reason is the next stage wouldn't be able to fetch the
+    // data from this dead executor so we would need to rerun these tasks on other executors.
+    val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+      (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)

Review Comment:
   @Ngone51 @mridulm  I have a question about the logic here:
   
   Executor decommissioning does not necessarily imply worker decommissioning. 
If an external shuffle service is used and the executor is decommissioned 
without the worker also being decommissioned then the shuffle files will 
continue to be available at the original host.
   
   Prior to this PR, I don't think this `executorLost` method would have scheduled re-runs in that case, because the `&& !env.blockManager.externalShuffleServiceEnabled` condition would evaluate to `false` when the ESS was used, causing us to skip all of the resubmission logic here.
       
   With this PR, though, I think these changes might actually _cause_ unnecessary task re-submission in that case: the `(reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)` condition would evaluate to `true`, while `locationOpt.exists(_.host != host)` would evaluate to `false`, because the original outputs remain available on the host and no migration is needed.
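   To make the scenario concrete, here is a small standalone model of the condition (a sketch with illustrative parameter names, not the actual `TaskSetManager` code):
   
   ```scala
   // Hypothetical standalone model of the PR's condition; the parameter names
   // mirror TaskSetManager's fields but this is illustrative, not the real code.
   object ResubmitCheck {
     // Mirrors: isShuffleMapTasks && (reason.isInstanceOf[ExecutorDecommission] ||
     //   !env.blockManager.externalShuffleServiceEnabled)
     def maybeShuffleMapOutputLoss(
         isShuffleMapTasks: Boolean,
         isDecommission: Boolean,
         essEnabled: Boolean): Boolean =
       isShuffleMapTasks && (isDecommission || !essEnabled)
   
     def main(args: Array[String]): Unit = {
       // ESS enabled + executor-only decommission: the new condition is true,
       // so the resubmission path is entered even though the ESS still serves
       // the shuffle files from the original host.
       assert(maybeShuffleMapOutputLoss(
         isShuffleMapTasks = true, isDecommission = true, essEnabled = true))
       // ESS enabled, ordinary (non-decommission) executor loss: false, as
       // before the PR, so no resubmission is attempted.
       assert(!maybeShuffleMapOutputLoss(
         isShuffleMapTasks = true, isDecommission = false, essEnabled = true))
     }
   }
   ```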
   
   I think this could be addressed by checking whether 
[`ExecutorDecommission.workerHost`](https://github.com/apache/spark/blob/b2f9aab8a2cabf04ac3267de0670bdd403831390/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala#L83)
 is defined, i.e. to do
   
   ```scala
   val workerIsDecommissioned = reason match {
     case e: ExecutorDecommission if e.workerHost.isDefined => true
     case _ => false
   }
   
   val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
     (workerIsDecommissioned || !env.blockManager.externalShuffleServiceEnabled)
   ```
   
   That said, my argument above doesn't hold for Spark-on-Kubernetes because it 
never sets `workerHost`: 
   
   
https://github.com/apache/spark/blob/a2a5299cb2d252e85d3b05dbabf7a6db3f488cb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L336-L341
   
   Given this, I am wondering whether this PR's change might represent a 
regression when Dynamic Allocation is used alongside an external shuffle 
service in YARN.
   
   WDYT? Am I interpreting the code correctly or have I overlooked something 
here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

