Github user JoshRosen commented on the issue:
https://github.com/apache/spark/pull/15986
I managed to come up with a standalone end-to-end reproduction of the
shuffle file leak, allowing me to validate this patch's fix.
Run
```
./bin/spark-shell --master=local-cluster[2,5,1024] \
  --conf spark.task.maxFailures=1 \
  --conf spark.local.dir=/tmp
```
to bring up a Spark shell with two executor JVMs. Then, execute the
following:
```scala
sc.parallelize(1 to 10, 10).map { x =>
  Thread.sleep(1000); (x, x)
}.groupByKey(10).map { case _ =>
  Thread.sleep(120 * 1000); 1
}.union(sc.parallelize(1 to 10)).count()
```
(Note that the `.union()` here is critical to the reproduction; I explain why below.)
The `Thread.sleep()` calls were strategically chosen so that we'll get the
executor JVMs into a state where both executors have run shuffle map tasks and
both are in the middle of running reduce / result tasks.
Next, kill one of the executor JVMs abruptly with `kill -9`. The worker JVM
will immediately detect its executor JVM's death and will send messages to the
master, causing that executor's tasks to be marked as failed. Because of
`spark.task.maxFailures=1`, the job will fail immediately, but there will still be
five zombie tasks running on the executor that we didn't kill.
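One way to locate an executor JVM to kill for this step (just an illustrative approach; any way of finding the PID works):
```bash
# Standalone / local-cluster executors run as CoarseGrainedExecutorBackend processes.
# jps (shipped with the JDK) lists JVMs by main class; pick one PID and kill it.
jps | grep CoarseGrainedExecutorBackend
kill -9 <pid-of-one-executor>
```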
Wait until those zombie tasks have finished (which will happen within two
minutes), run `System.gc()` in the shell, and then check the surviving executor's
block manager directories: you'll see that shuffle files have been leaked. This is
due to a leaked `ShuffleDependency` on the driver, which can be validated with
`jmap -histo` against the driver JVM:
```bash
$ jmap -histo 72081 | grep ShuffleDependency
2037:             1             56  org.apache.spark.ShuffleDependency
```
This is because the `TaskSetManager` was leaked:
```bash
$ jmap -histo 72081 | grep 'org.apache.spark.scheduler.TaskSetManager$'
1252:             1            224  org.apache.spark.scheduler.TaskSetManager
```
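To actually see the leaked files on disk, you can list the shuffle outputs under the local dir configured above (this assumes `spark.local.dir=/tmp` as in the launch command; each executor writes its shuffle files under a `blockmgr-<UUID>` directory there):
```bash
# Leaked shuffle data/index files under the surviving executor's block manager dirs:
find /tmp/blockmgr-* -name 'shuffle_*' -ls
```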
Note that while executor death seems to always leak a `TaskSetManager`, this
doesn't always result in a leaked `ShuffleDependency`; the reasons are slightly
subtle and I can expand on them later, but in a nutshell: a `Task` whose partition
is a `ShuffledRDDPartition` won't actually contain a reference to the parent RDD;
the parent RDD and `ShuffleDependency` are kept alive in the scheduler via the
parent stage and via inter-stage relationships, but there is no direct reference
chain from the `Task` itself. On the other hand, some partition types, such as
`UnionPartition`, hold transient references to parent RDD objects, causing the
driver-side `Task` to keep the whole RDD and `ShuffleDependency` lineage chain
alive. This usually isn't a problem, since `Task`s typically don't get leaked like
this and the `@transient` fields prevent over-capturing during serialization, but
it exacerbates the `TaskSetManager` leaks here.
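To make that concrete, here's a simplified sketch of the two partition shapes (paraphrased for illustration, not the exact Spark source; the `*Sketch` names are mine):
```scala
import org.apache.spark.rdd.RDD

// A shuffled RDD's partition records only its index, so a driver-side Task holding
// onto it does not transitively pin the parent RDD or its ShuffleDependency.
class ShuffledRDDPartitionSketch(val index: Int) extends Serializable

// A union RDD's partition keeps a @transient handle on the parent RDD. The field is
// dropped during serialization (executors never see it), but on the driver the live
// object still pins the parent RDD -- and hence its ShuffleDependency and the rest
// of the lineage -- for as long as the Task referencing this partition is reachable.
class UnionPartitionSketch[T](
    val index: Int,
    @transient private val parentRdd: RDD[T],
    val parentRddIndex: Int)
  extends Serializable
```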
After applying this PR's changes, you can re-run the same experiment and see that
both the `TaskSetManager` and the `ShuffleDependency` are cleaned up properly once
the zombie tasks finish and a GC has run to trigger the `ContextCleaner`.
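Concretely (the PID is a placeholder for the driver in your own run), the earlier checks should now come back clean:
```bash
# After running System.gc() in the spark-shell, the driver histogram should show no
# lingering ShuffleDependency / TaskSetManager instances for the failed job:
jmap -histo <driver-pid> | grep -E 'ShuffleDependency|scheduler\.TaskSetManager$'

# ...and the shuffle_* files under /tmp/blockmgr-* should have been deleted:
find /tmp/blockmgr-* -name 'shuffle_*'
```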