Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/18150#discussion_r121021307
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1383,19 +1394,43 @@ class DAGScheduler(
     */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      filesLost: Boolean,
-      maybeEpoch: Option[Long] = None) {
+      workerLost: Boolean): Unit = {
+    // if the cluster manager explicitly tells us that the entire worker was lost, then
+    // we know to unregister shuffle output. (Note that "worker" specifically refers to the process
+    // from a Standalone cluster, where the shuffle service lives in the Worker.)
+    val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+    removeExecutorAndUnregisterOutputs(
+      execId = execId,
+      fileLost = fileLost,
+      hostToUnregisterOutputs = None,
+      maybeEpoch = None)
+  }
+
+  private def removeExecutorAndUnregisterOutputs(
+      execId: String,
+      fileLost: Boolean,
+      hostToUnregisterOutputs: Option[String],
+      maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
-
-      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
-        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+      if (fileLost) {
+        hostToUnregisterOutputs match {
+          case Some(host) =>
+            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
+          case None =>
+            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+        }
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleIdToMapStage) {
-          stage.removeOutputsOnExecutor(execId)
+          hostToUnregisterOutputs match {
--- End diff ---
Ah, I remember now: the current code already avoids the "1 -> 2 -> 3" cascading-retries problem because it is _already_ treating any fetch failure as a complete loss of the executor's output via
```
handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
```
Over on the earlier PR there were a number of arguments that "fetch failure does not imply a lost executor", but those are somewhat moot given that today's code, even before the changes proposed here, already handles it that way.
So now I see why the PR description focuses on the multiple executors per
host scenario: this isn't actually changing behavior in the
single-executor-per-host world.
If the host is legitimately down then _most likely_ we'll get fetch failures from all of the executors on that host and will remove all of their outputs, so in some scenarios we'll happen to do the right thing anyway. But I can imagine scenarios on very large clusters where we get unlucky and never observe fetch failures from the complete set of executors being served by that host's shuffle service; this seems especially likely if the shuffle service is serving outputs from many dead executors which OOM'd and restarted during the map phase. So I understand the argument here that it's best to just round up and unregister every output served from the host (see the sketch below).
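To make that concrete, here is a rough sketch (not the PR's verbatim code) of how the fetch-failure path could drive the new helper from this diff; I'm assuming `bmAddress` and `task` are the values in scope in `handleTaskCompletion`'s `FetchFailed` branch:
```scala
// Sketch only: escalate a fetch failure from "this executor's outputs are gone" to
// "every output served from this host is gone", via the helper introduced above.
// bmAddress is the failing block manager's address; task.epoch is the failing task's epoch.
removeExecutorAndUnregisterOutputs(
  execId = bmAddress.executorId,
  fileLost = true,                                 // a fetch failure means files are treated as lost
  hostToUnregisterOutputs = Some(bmAddress.host),  // round up to the whole host
  maybeEpoch = Some(task.epoch))
```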
The only real question is how often we'd regret doing this in practice: how often is a fetch failure actually a transient issue? Given that there are already fetch retry mechanisms inside tasks, I'm guessing the "false positive" scenario is somewhat rare.
Therefore, I'm now persuaded that this is a good change and I have a better
understanding of how this fits into the larger context of fetch failure
handling issues.
Since this has been somewhat controversial, what do you think about compromising and adding a feature flag that lets users opt in to the old behavior (i.e. the flag just disables the promotion of "all outputs on executor lost" to "all outputs on host lost")?