Github user andrewor14 commented on the pull request:
https://github.com/apache/spark/pull/8090#issuecomment-131232391
@squito here is the answer to your question:
## Short version
When we hit a fetch failure, we resubmit both the map stage that wrote the
shuffle files and the stage that attempted to read them. In both cases we
reuse the same `Stage` object, so the partial accumulator values from
previously successful tasks are never cleared. As a result, we end up *double
counting* the accumulator values contributed by those tasks.
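To make the double counting concrete, here is a standalone sketch (not Spark internals; all names are made up) of what happens when accumulator state survives across stage attempts and every partition is recomputed on retry:
```
// Standalone sketch, not Spark code: a "stage" whose accumulator state
// survives across attempts, just like the reused `Stage` object above.
object DoubleCountingSketch {
  var recordsRead = 0L  // stands in for an internal accumulator

  // Each successful task reports 100 records; partition 0 fails on attempt 0,
  // which plays the role of the fetch failure.
  def runTask(partition: Int, attempt: Int): Boolean = {
    val failed = partition == 0 && attempt == 0
    if (!failed) recordsRead += 100L
    !failed
  }

  def main(args: Array[String]): Unit = {
    var attempt = 0
    var allSucceeded = false
    while (!allSucceeded) {
      // On retry we recompute *all* 5 partitions without clearing old values.
      allSucceeded = (0 until 5).map(runTask(_, attempt)).forall(identity)
      attempt += 1
    }
    // 5 partitions * 100 records = 500 expected, but the 4 tasks that
    // succeeded on attempt 0 are counted again on attempt 1, so this prints 900.
    println(recordsRead)
  }
}
```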
## Long version
I was able to reproduce this case by simulating a fetch failure. I
implemented your proposed simplification here and added a few `println`'s for
debugging:
https://github.com/apache/spark/compare/master...andrewor14:test-internal-accums
I ran the following in `bin/spark-shell --master local[*,4]`, which uses as
many cores as are available on the machine and allows each task to fail up to
4 times. This code runs two shuffles and simulates a fetch failure in the first one.
```
import org.apache.spark._
import org.apache.spark.shuffle.FetchFailedException

val data = sc.parallelize(1 to 10, 5).map(identity).groupBy(identity)
val shuffleHandle =
  data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle

// Simulate fetch failures
val mappedData = data.map { case (i, _) =>
  val taskContext = TaskContext.get
  if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 0) {
    // Cause the post-shuffle stage to fail on its first attempt with a
    // single task failure
    val env = SparkEnv.get
    val bmAddress = env.blockManager.blockManagerId
    val shuffleId = shuffleHandle.shuffleId
    val mapId = 0
    val reduceId = taskContext.partitionId()
    val message = "Simulated fetch failure"
    throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
  } else {
    (i, i)
  }
}
mappedData.reduceByKey { _ + _ }.count()
```
Here is the output.
```
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values: 31488
15/08/14 13:01:45 WARN TaskSetManager: Lost task 0.0 in stage 1.4 (TID 45, localhost): FetchFailed ...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values: 39360
15/08/14 13:01:45 WARN TaskSetManager: Lost task 0.0 in stage 1.5 (TID 55, localhost): FetchFailed ...
=== STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values: 0
=== STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
- all partitions: 0, 1, 2, 3, 4
- partitions to compute: 0, 1, 2, 3, 4
- internal accum values: 47232
15/08/14 13:01:45 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 65, localhost): FetchFailed ...
```
The result is that every time we retry the stage, the accumulator values keep
growing (here by 7872 on each attempt). This is because a subset of the tasks
*are* in fact successful, and those tasks keep contributing to the final
accumulator value: we recompute all partitions when we retry a
`ShuffleMapStage`, but we never clear the values reported by earlier attempts.

(There is actually a separate bug where we retry the stage an infinite number
of times, but that is completely orthogonal to our discussion.)
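One way to avoid this (sketched below with made-up names; this is not the actual `DAGScheduler` code) would be to re-create or zero out a stage's internal accumulators whenever we resubmit the stage with all of its partitions:
```
// Hypothetical sketch, not the real scheduler: reset a stage's internal
// accumulator value before recomputing all of its partitions, so metrics
// reported by earlier successful attempts are not counted twice.
class StageSketch(val numPartitions: Int) {
  private var internalAccumValue = 0L  // stands in for the internal accumulators

  // Called when a task finishes successfully and reports its metrics.
  def addTaskResult(value: Long): Unit = { internalAccumValue += value }

  def submitMissingTasks(partitionsToCompute: Seq[Int]): Unit = {
    // If every partition is being recomputed, the old partial values would
    // be double counted, so clear them before launching the new attempt.
    if (partitionsToCompute.size == numPartitions) {
      internalAccumValue = 0L
    }
    // ... launch tasks for partitionsToCompute ...
  }

  def currentValue: Long = internalAccumValue
}
```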