Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/15986
  
    I managed to come up with a standalone end-to-end reproduction of the 
shuffle file leak, allowing me to validate this patch's fix.
    
    Run
    
    ```
    ./bin/spark-shell --master=local-cluster[2,5,1024] --conf spark.task.maxFailures=1 --conf spark.local.dir=/tmp
    ```
    
    to bring up a Spark shell with two executor JVMs. Then, execute the 
following:
    
    ```scala
    sc.parallelize(1 to 10, 10).map { x => Thread.sleep(1000); (x, x) }.groupByKey(10).map {
        case _ => Thread.sleep(120 * 1000); 1
    }.union(sc.parallelize(1 to 10)).count()
    ```
    
    (Note that the `.union()` here is critical for the reproduction; I explain why below.)
    
    The `Thread.sleep()` calls are tuned so that the executor JVMs reach a state where both executors have run shuffle map tasks and both are in the middle of running reduce / result tasks.
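    To confirm that state without losing the REPL prompt, one option (my own variant, not part of the original repro) is to submit the same job from a background thread and poll the status tracker while the reduce tasks are sleeping:

    ```scala
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    // Same RDD chain as above, but the action runs on a background thread so
    // the shell stays usable.
    Future {
      sc.parallelize(1 to 10, 10).map { x => Thread.sleep(1000); (x, x) }.groupByKey(10).map {
        case _ => Thread.sleep(120 * 1000); 1
      }.union(sc.parallelize(1 to 10)).count()
    }

    // Print each currently active stage and how many of its tasks are running.
    // Once the shuffle map stage is no longer active and only the result
    // stage's tasks are running, the executors are in the state described above.
    sc.statusTracker.getActiveStageIds().foreach { id =>
      sc.statusTracker.getStageInfo(id).foreach { info =>
        println(s"stage $id: ${info.numActiveTasks} active tasks")
      }
    }
    ```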
    
    Next, kill one of the executor JVMs abruptly with `kill -9`. The worker JVM will immediately detect the executor's death and send messages to the master, causing that executor's tasks to be marked as failed. Because of `spark.task.maxFailures=1` the job fails immediately, but five zombie tasks will still be running on the executor that we didn't kill.
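    (To find a victim PID you can run `jps` from another terminal, or, if you used the background-thread variant above, shell out from the same REPL. This helper is my own addition and simply assumes `jps` is on the PATH.)

    ```scala
    import scala.sys.process._

    // Print the JVMs whose main class is CoarseGrainedExecutorBackend; the
    // first column of each line is a PID suitable for `kill -9`.
    "jps -l".!!.split("\n")
      .filter(_.contains("CoarseGrainedExecutorBackend"))
      .foreach(println)
    ```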
    
    Wait until those zombie tasks have finished (which will happen within two 
minutes), then run `System.gc()`, then check the non-killed executor's block 
manager directories and observe that shuffle files have been leaked. This is 
due to the leak of the `ShuffleDependency`, which can be validated with `jmap 
-histo`:
    
    ```bash
    $ jmap -histo 72081 | grep ShuffleDependency
    2037:             1             56  org.apache.spark.ShuffleDependency
    ```
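    To make the "check the block manager directories" step concrete, something like the following (my own helper; exact directory names vary per run, and the `/tmp` root follows from `spark.local.dir=/tmp` above) lists the shuffle files still sitting on disk:

    ```scala
    import java.io.File

    // Walk /tmp/blockmgr-*/<subdir>/ and print any shuffle .data/.index files
    // that remain on disk after the job has failed and GC has run.
    new File("/tmp").listFiles()
      .filter(d => d.isDirectory && d.getName.startsWith("blockmgr-"))
      .flatMap(d => Option(d.listFiles()).getOrElse(Array.empty[File]))
      .flatMap(sub => Option(sub.listFiles()).getOrElse(Array.empty[File]))
      .filter(_.getName.contains("shuffle"))
      .foreach(f => println(f.getAbsolutePath))
    ```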
    
    This is because the `TaskSetManager` was leaked:
    
    ```bash
    $ jmap -histo 72081 | grep 'org.apache.spark.scheduler.TaskSetManager$'
    1252:             1            224  org.apache.spark.scheduler.TaskSetManager
    ```
    
    Note that while executor death always seems to leak a `TaskSetManager`, it doesn't always result in a leaked `ShuffleDependency`. The reasons are slightly subtle and I can expand on them later, but in a nutshell: a `Task` whose partition is a `ShuffledRDDPartition` doesn't contain a reference to the parent RDD; the parent RDD and `ShuffleDependency` are kept alive in the scheduler via the parent stage and inter-stage relationships, but there is no direct reference chain from the `Task` itself. On the other hand, some partition types, such as `UnionRDD`'s `UnionPartition`, hold `@transient` references to parent RDD objects, causing the driver-side `Task` to keep the whole RDD and `ShuffleDependency` lineage chain alive. This usually isn't a problem, since `Task`s don't normally get leaked like this and the `@transient` fields prevent over-capturing during serialization, but it exacerbates the `TaskSetManager` leak here.
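    To make that concrete, here is a simplified paraphrase of those two partition classes (field lists trimmed and names suffixed with `Sketch` to make clear these are not the real definitions; see `ShuffledRDD.scala` and `UnionRDD.scala` for the authoritative versions):

    ```scala
    import org.apache.spark.Partition
    import org.apache.spark.rdd.RDD

    // A ShuffledRDD partition carries nothing but an index, so a leaked
    // driver-side Task holding one does not pin the parent RDD or its
    // ShuffleDependency.
    class ShuffledRDDPartitionSketch(val idx: Int) extends Partition {
      override val index: Int = idx
    }

    // A UnionRDD partition keeps a @transient handle on its parent RDD. The
    // field is dropped during serialization (executors never see it), but on
    // the driver it stays populated, so a leaked Task keeps the parent RDD,
    // and through it the whole ShuffleDependency lineage, strongly reachable.
    class UnionPartitionSketch[T](
        idx: Int,
        @transient private val rdd: RDD[T],
        @transient private val parentRddPartitionIndex: Int)
      extends Partition {
      var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
      override val index: Int = idx
    }
    ```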
    
    After applying this PR's changes, you can re-run the same experiment and see that both the `TaskSetManager` and the `ShuffleDependency` are cleaned up properly once the zombie tasks finish and a GC has run to trigger the `ContextCleaner`.

