Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/8090#issuecomment-131232391
  
    @squito here is the answer to your question:
    
    ## Short version
    
    When we have a fetch failure, we resubmit both the map stage that wrote the shuffle files and the stage that attempted to read them. In both cases, we reuse the same `Stage` object, meaning the partial accumulator values from previously successful tasks are not cleared. As a result, we end up *double counting* the accumulator values from those successful tasks.
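
    To make the mechanism concrete, here is a tiny self-contained sketch (a made-up `FakeStage` class, not the real `Stage` / `DAGScheduler` code) of what goes wrong when a stage object keeps its partial accumulator total across attempts:
    ```
    // Hypothetical sketch only -- not Spark's actual Stage implementation
    class FakeStage {
      // partial accumulator total; never cleared between stage attempts
      var internalAccum: Long = 0L

      // every (re)submission runs all partitions and adds their updates again,
      // including those from tasks that already succeeded in an earlier attempt
      def runAttempt(taskUpdates: Seq[Long]): Unit = {
        taskUpdates.foreach { u => internalAccum += u }
      }
    }

    val stage = new FakeStage
    stage.runAttempt(Seq(10L, 20L, 30L))  // first attempt: these tasks succeed
    stage.runAttempt(Seq(10L, 20L, 30L))  // resubmission after a fetch failure
    println(stage.internalAccum)          // 120: the successful tasks are counted twice
    ```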
    
    ## Long version
    
    I was able to reproduce this case by simulating a fetch failure. I 
implemented your proposed simplification here and added a few `println`'s for 
debugging: 
https://github.com/apache/spark/compare/master...andrewor14:test-internal-accums
    
    I ran the following in `bin/spark-shell --master local[*,4]`, which means: use as many cores as are available on the machine, and retry failed tasks up to 4 times. This code runs two shuffles and simulates a fetch failure in the first.
    
    ```
    import org.apache.spark._
    import org.apache.spark.shuffle.FetchFailedException

    val data = sc.parallelize(1 to 10, 5).map(identity).groupBy(identity)
    val shuffleHandle = data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle

    // Simulate fetch failures
    val mappedData = data.map { case (i, _) =>
      val taskContext = TaskContext.get
      if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 0) {
        // Cause the post-shuffle stage to fail on its first attempt with a single task failure
        val env = SparkEnv.get
        val bmAddress = env.blockManager.blockManagerId
        val shuffleId = shuffleHandle.shuffleId
        val mapId = 0
        val reduceId = taskContext.partitionId()
        val message = "Simulated fetch failure"
        throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
      } else {
        (i, i)
      }
    }

    mappedData.reduceByKey { _ + _ }.count()
    ```
    Here is the output.
    ```
    === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
      - all partitions: 0, 1, 2, 3, 4
      - partitions to compute: 0, 1, 2, 3, 4
      - internal accum values: 0
    === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
      - all partitions: 0, 1, 2, 3, 4
      - partitions to compute: 0, 1, 2, 3, 4
      - internal accum values: 31488
    15/08/14 13:01:45 WARN TaskSetManager: Lost task 0.0 in stage 1.4 (TID 45, localhost): FetchFailed ...
    === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
      - all partitions: 0, 1, 2, 3, 4
      - partitions to compute: 0, 1, 2, 3, 4
      - internal accum values: 0
    === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
      - all partitions: 0, 1, 2, 3, 4
      - partitions to compute: 0, 1, 2, 3, 4
      - internal accum values: 39360
    15/08/14 13:01:45 WARN TaskSetManager: Lost task 0.0 in stage 1.5 (TID 55, localhost): FetchFailed ...
    === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
      - all partitions: 0, 1, 2, 3, 4
      - partitions to compute: 0, 1, 2, 3, 4
      - internal accum values: 0
    === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
      - all partitions: 0, 1, 2, 3, 4
      - partitions to compute: 0, 1, 2, 3, 4
      - internal accum values: 47232
    15/08/14 13:01:45 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 65, localhost): FetchFailed ...
    ```
    The result is that every time we retry a stage, we end up increasing the accumulator values further. This is because a subset of the tasks *are* in fact successful, and those tasks contribute to the final accumulator value again on every retry, since we retry all partitions when we retry a `ShuffleMapStage`.
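
    As a quick sanity check on the output above: the internal accumulator total for stage 1 grows by the same amount (7872) on each resubmission, which matches the same set of successful tasks re-adding their contribution every time. In plain Scala, just doing arithmetic on the logged values:
    ```
    // "internal accum values" printed for ShuffleMapStage (1) across its resubmissions
    val logged = Seq(31488L, 39360L, 47232L)
    val deltas = logged.sliding(2).map { case Seq(a, b) => b - a }.toList
    println(deltas)                    // List(7872, 7872)
    assert(deltas.forall(_ == 7872L))  // each retry re-adds the same contribution
    ```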
    
    (There is actually a separate bug where we retry the stage an infinite number of times, but that is completely orthogonal to our discussion.)


