wypoon commented on pull request #28848:
URL: https://github.com/apache/spark/pull/28848#issuecomment-648290364


   @tgravescs thanks for reviewing.
   Our customer was not using `spark.files.fetchFailure.unRegisterOutputOnHost`.
   In the case of a `FetchFailure`, `DAGScheduler#handleTaskCompletion` runs this code:
   ```scala
   if (bmAddress != null) {
     val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
         unRegisterOutputOnHostOnFetchFailure) {
       // We had a fetch failure with the external shuffle service, so we
       // assume all shuffle data on the node is bad.
       Some(bmAddress.host)
     } else {
       // Unregister shuffle data just for one executor (we don't have any
       // reason to believe shuffle data has been lost for the entire host).
       None
     }
     removeExecutorAndUnregisterOutputs(
       execId = bmAddress.executorId,
       fileLost = true,
       hostToUnregisterOutputs = hostToUnregisterOutputs,
       maybeEpoch = Some(task.epoch))
   }
   ```
   This is the only place where `removeExecutorAndUnregisterOutputs` is called with `fileLost=true` in our setup: we are running Spark on YARN (not Spark Standalone) with the external shuffle service, so the call to `removeExecutorAndUnregisterOutputs` from `DAGScheduler#handleExecutorLost` is always made with `fileLost=false`.
   Returning to the `FetchFailure` handling: `removeExecutorAndUnregisterOutputs` is called with `hostToUnregisterOutputs=None`, so it should unregister the outputs of just that executor. However, when the `FetchFailure` arrives in the same epoch after the executor is lost, `removeExecutorAndUnregisterOutputs` has already been called from `handleExecutorLost`, which set `failedEpoch(execId)` for that executor to the current epoch, so this call (before my change) is a no-op. In general, there are multiple such `FetchFailure`s.
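   For context, the no-op comes from the epoch guard at the top of `removeExecutorAndUnregisterOutputs`: the body only runs when `failedEpoch` has no entry for the executor, or the recorded epoch is older than the one passed in. Here is a minimal, self-contained sketch of that guard (the object name `EpochGuardSketch` and the simplified signature are mine; this paraphrases the behavior described above rather than quoting the actual `DAGScheduler` source):
   ```scala
   import scala.collection.mutable

   // Paraphrased sketch of the epoch guard; not the actual DAGScheduler code.
   object EpochGuardSketch {
     // failedEpoch: the last epoch at which each executor was handled as lost.
     private val failedEpoch = mutable.HashMap[String, Long]()

     def removeExecutorAndUnregisterOutputs(
         execId: String,
         fileLost: Boolean,
         epoch: Long): Unit = {
       // The guard: only act if this executor has not yet been handled
       // at (or after) the given epoch.
       if (!failedEpoch.contains(execId) || failedEpoch(execId) < epoch) {
         failedEpoch(execId) = epoch
         println(s"handling lost executor $execId (fileLost=$fileLost, epoch=$epoch)")
         // ... remove the executor and, if fileLost, unregister its map outputs ...
       } else {
         println(s"no-op for $execId at epoch $epoch " +
           s"(already handled at epoch ${failedEpoch(execId)})")
       }
     }
   }
   ```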
   
   I have summarized the bug in the Jira. The `FetchFailure`s after the executor is lost lead to a stage re-attempt, and in the new stage attempt, `FetchFailure`s continue to come from the lost executor's outputs; but by then the epoch is higher, so this time the executor's outputs are unregistered. The net effect is that it takes two stage attempts instead of one to recover from the lost executor.
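   To make the sequence concrete, here is how the sketch above plays out for this scenario (the executor id and the epoch numbers are made up for illustration):
   ```scala
   object TwoAttemptScenario extends App {
     import EpochGuardSketch.removeExecutorAndUnregisterOutputs

     // Epoch 10: the executor is lost. handleExecutorLost calls with fileLost = false
     // (external shuffle service on YARN), which records failedEpoch("exec-1") = 10.
     removeExecutorAndUnregisterOutputs("exec-1", fileLost = false, epoch = 10)

     // Still epoch 10: a FetchFailure against exec-1 calls with fileLost = true,
     // but the guard makes it a no-op, so the map outputs stay registered.
     removeExecutorAndUnregisterOutputs("exec-1", fileLost = true, epoch = 10)

     // Stage re-attempt: the epoch has advanced, so the next FetchFailure finally
     // unregisters the outputs; recovery costs a second stage attempt.
     removeExecutorAndUnregisterOutputs("exec-1", fileLost = true, epoch = 11)
   }
   ```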
   

