Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/5964#discussion_r30862004
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1031,15 +1036,31 @@ class DAGScheduler(
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
+            val computeCount = shuffleStage.incComputeCount(smt.partitionId)
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
-              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
+            if (computeCount > 1) {
+              // REVIEWERS: do I need to worry about speculation here, when multiple completion
+              // events are normal?
+
+              // REVIEWERS: is this really only a problem on a ShuffleMapTask?? does it also cause
+              // problems for ResultTask?
+
+              // This can happen when a retry runs a task, but there was a lingering task from an
+              // earlier attempt which also finished. The results might be OK, or they might not.
+              // To be safe, we'll retry the task, and do it in yet another attempt, to avoid more
+              // task output clobbering.
+              logInfo(s"Multiple completion events for task $task. Results may be corrupt," +
+                s" assuming task needs to be rerun.")
+              shuffleStage.removeOutputLoc(task.partitionId)
--- End diff --
for (3) & (4)
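
For readers following along, here is a minimal, self-contained sketch of the kind of
per-partition completion counting the diff above relies on. This is not the PR's actual
ShuffleMapStage code: the class PartitionComputeTracker and the markForRerun/needsRerun
methods are invented for illustration; only the incComputeCount-style counting and the
"count > 1 means the output may be clobbered, so rerun" decision mirror the diff.

    import scala.collection.mutable

    // Hypothetical tracker: counts completion events per partition so a stage can
    // detect the "lingering task from an earlier attempt also finished" case.
    class PartitionComputeTracker(numPartitions: Int) {
      // completion-event count per partition, all starting at zero
      private val computeCounts = Array.fill(numPartitions)(0)
      // partitions whose output we no longer trust and want to recompute
      private val suspectPartitions = mutable.Set.empty[Int]

      /** Record one completion event and return the running count for the partition. */
      def incComputeCount(partitionId: Int): Int = {
        computeCounts(partitionId) += 1
        computeCounts(partitionId)
      }

      /** Mark a partition's output as unreliable so a later attempt recomputes it. */
      def markForRerun(partitionId: Int): Unit = suspectPartitions += partitionId

      def needsRerun(partitionId: Int): Boolean = suspectPartitions.contains(partitionId)
    }

    object PartitionComputeTrackerExample extends App {
      val tracker = new PartitionComputeTracker(numPartitions = 4)

      // First completion for partition 2: count is 1, nothing suspicious.
      println(tracker.incComputeCount(2))   // prints 1

      // A second completion arrives (e.g. a lingering task from an earlier stage
      // attempt). The count exceeds 1, so mark the partition for recomputation,
      // analogous to the diff's removeOutputLoc + rerun path.
      if (tracker.incComputeCount(2) > 1) {
        tracker.markForRerun(2)
      }
      println(tracker.needsRerun(2))        // prints true
    }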