Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18150#discussion_r121021307
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1383,19 +1394,43 @@ class DAGScheduler(
        */
       private[scheduler] def handleExecutorLost(
           execId: String,
    -      filesLost: Boolean,
    -      maybeEpoch: Option[Long] = None) {
    +      workerLost: Boolean): Unit = {
    +    // if the cluster manager explicitly tells us that the entire worker was lost, then
    +    // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
    +    // from a Standalone cluster, where the shuffle service lives in the Worker.)
    +    val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
    +    removeExecutorAndUnregisterOutputs(
    +      execId = execId,
    +      fileLost = fileLost,
    +      hostToUnregisterOutputs = None,
    +      maybeEpoch = None)
    +  }
    +
    +  private def removeExecutorAndUnregisterOutputs(
    +      execId: String,
    +      fileLost: Boolean,
    +      hostToUnregisterOutputs: Option[String],
    +      maybeEpoch: Option[Long] = None): Unit = {
         val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
         if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
           failedEpoch(execId) = currentEpoch
           logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
           blockManagerMaster.removeExecutor(execId)
    -
    -      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
    -        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
    +      if (fileLost) {
    +        hostToUnregisterOutputs match {
    +          case Some(host) =>
    +            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
    +          case None =>
    +            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
    +        }
             // TODO: This will be really slow if we keep accumulating shuffle map stages
             for ((shuffleId, stage) <- shuffleIdToMapStage) {
    -          stage.removeOutputsOnExecutor(execId)
    +          hostToUnregisterOutputs match {
    --- End diff ---
    
    Ah, I remember now: the current code already avoids the "1 -> 2 -> 3" cascading-retries issue because it's _already_ treating any fetch failure as a complete loss of that executor's output via
    
    ```
    handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
    ```
    
    Over on the earlier PR, there were a number of arguments that "fetch failure does not imply a lost executor", but those are kind of moot given that we're _already_ handling it that way in today's code, even before the changes proposed here.
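
    To make that concrete, here's a minimal, self-contained sketch of the existing escalation path. The types are stand-ins rather than the real `DAGScheduler` / `TaskEndReason` classes; only the shape of the `FetchFailed` handling is taken from the call quoted above:

    ```
    // Toy stand-ins for the real scheduler types, just to show the escalation.
    case class BlockManagerId(executorId: String, host: String)

    sealed trait TaskEndReason
    case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int) extends TaskEndReason
    case object Success extends TaskEndReason

    // In the real scheduler this unregisters every map output the executor wrote.
    def handleExecutorLost(execId: String, filesLost: Boolean, maybeEpoch: Option[Long]): Unit =
      println(s"unregistering all shuffle output of executor $execId (filesLost=$filesLost)")

    def handleTaskCompletion(reason: TaskEndReason, taskEpoch: Long): Unit = reason match {
      // A single fetch failure is already treated as "this executor's output is gone".
      case FetchFailed(bmAddress, _) if bmAddress != null =>
        handleExecutorLost(bmAddress.executorId, filesLost = true, Some(taskEpoch))
      case _ => ()
    }
    ```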
    
    So now I see why the PR description focuses on the multiple-executors-per-host scenario: this isn't actually changing behavior in the single-executor-per-host world.
    
    If the host is legitimately down, then _most likely_ we'll get fetch failures from all of the executors on that host and will remove all of the outputs, so in some scenarios we'll happen to do the right thing anyway. But I can imagine scenarios on very large clusters where we get unlucky and don't observe fetch failures from the complete set of executors being served from that host's shuffle service; this could be especially likely if the shuffle service is serving outputs from many dead executors which OOM'd and restarted during the map phase. Therefore I can understand the argument here that it's best to just round up to the whole host.
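
    To make that failure mode concrete, here's a toy model (not the real `DAGScheduler` / `MapOutputTracker` API; all names are made up) of what per-executor unregistration misses compared to per-host unregistration:

    ```
    // Toy model: map outputs tracked as (mapId, writing executor, serving host).
    case class MapOutput(mapId: Int, execId: String, host: String)

    // Scenario: host-A has just died. Its shuffle service was serving outputs written by
    // exec-1 and by exec-2 (which OOM'd and restarted earlier, during the map phase).
    val outputs = Seq(
      MapOutput(0, "exec-1", "host-A"),
      MapOutput(1, "exec-2", "host-A"),
      MapOutput(2, "exec-3", "host-B")
    )

    // Current behavior: a reducer reports a fetch failure for exec-1's output, so only
    // exec-1's outputs are unregistered; mapId 1 is still believed to be fetchable from
    // host-A and is only discovered lost by a later round of fetch failures.
    val afterExecutorRemoval = outputs.filterNot(_.execId == "exec-1")

    // "Round up" behavior: unregister everything served from host-A in one pass.
    val afterHostRemoval = outputs.filterNot(_.host == "host-A")

    println(afterExecutorRemoval) // List(MapOutput(1,exec-2,host-A), MapOutput(2,exec-3,host-B))
    println(afterHostRemoval)     // List(MapOutput(2,exec-3,host-B))
    ```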
    
    The only real question here is how often we'd regret doing this in 
practice: how often is a fetch failure actually a transient issue? Given that 
there are already fetch retry mechanisms inside of tasks, I'm guessing that the 
"false positive" scenario is somewhat rare.
    
    Therefore, I'm now persuaded that this is a good change and I have a better 
understanding of how this fits into the larger context of fetch failure 
handling issues.
    
    Since this has been somewhat controversial, what do you think about compromising and adding a feature flag that lets users opt in to the old behavior (i.e. the flag just disables the promotion of "all outputs on executor lost" to "all outputs on host lost")?
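
    Purely as an illustration (the config key below is made up, not an existing Spark setting), the flag could be read off the SparkConf along these lines:

    ```
    import org.apache.spark.SparkConf

    // Hypothetical config key -- not an existing Spark setting.
    val conf = new SparkConf()
    val unregisterOutputsOnHost =
      conf.getBoolean("spark.shuffle.unregisterOutputOnHostOnFetchFailure", true)

    // Sketch of how the fetch-failure path could branch on it:
    //   if (unregisterOutputsOnHost)  unregister all outputs served from the failed host
    //   else                          unregister only the failed executor's outputs (old behavior)
    ```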

