GitHub user JoshRosen opened a pull request:
https://github.com/apache/spark/pull/16045
[SPARK-18553][CORE] Fix leak of TaskSetManager following executor loss
## What changes were proposed in this pull request?
_This is the master branch version of #15986; the original description
follows:_
This patch fixes a critical resource leak in the TaskScheduler which could
cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor
with running tasks is permanently lost and the associated stage fails.
This problem was originally identified by analyzing the heap dump of a
driver belonging to a cluster that had run out of shuffle space. This dump
contained several `ShuffleDependency` instances that were retained by
`TaskSetManager`s inside the scheduler but were not otherwise referenced. Each
of these `TaskSetManager`s was considered a "zombie" but had no running tasks
and therefore should have been cleaned up. However, these zombie task sets were
still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map.
Entries are added to the `taskIdToTaskSetManager` map when tasks are
launched and are removed inside of `TaskScheduler.statusUpdate()`, which is
invoked by the scheduler backend while processing `StatusUpdate` messages from
executors. The problem with this design is that a completely dead executor will
never send a `StatusUpdate`. There is [some
code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338)
in `statusUpdate` which handles tasks that exit with the `TaskState.LOST`
state (which is supposed to correspond to a task failure triggered by total
executor loss), but this state only seems to be used in Mesos fine-grained
mode. There doesn't seem to be any code which performs per-task state cleanup
for tasks that were running on an executor that completely disappears without
sending any sort of final death message. The `executorLost` and
[`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527)
methods don't appear to perform any cleanup of the `taskId -> *` mappings,
causing the leaks observed here.
This patch's fix is to maintain an `executorId -> running task id` mapping
so that these `taskId -> *` maps can be properly cleaned up following an
executor loss.
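To sketch the shape of this change (a simplified, illustrative model only; the object and method names below, such as `ExecutorLossCleanupSketch` and `recordLaunchedTask`, are placeholders and may not match the actual patch):

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Illustrative sketch of the fix, not the patch itself: track which task ids
// are running on each executor so that an executor-loss event can purge the
// per-task bookkeeping maps.
object ExecutorLossCleanupSketch {
  val taskIdToTaskSetManager = new HashMap[Long, AnyRef]
  val taskIdToExecutorId = new HashMap[Long, String]
  // The new mapping proposed by this patch:
  val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  // Called when a task is launched on an executor.
  def recordLaunchedTask(taskId: Long, execId: String, manager: AnyRef): Unit = {
    taskIdToTaskSetManager(taskId) = manager
    taskIdToExecutorId(taskId) = execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, new HashSet[Long]) += taskId
  }

  // Normal cleanup path, driven by StatusUpdate messages from live executors.
  def statusUpdate(taskId: Long): Unit = {
    taskIdToTaskSetManager.remove(taskId)
    taskIdToExecutorId.remove(taskId).foreach { execId =>
      executorIdToRunningTaskIds.get(execId).foreach(_ -= taskId)
    }
  }

  // Executor-loss path: with the new mapping we can drop every taskId -> *
  // entry for tasks that were still running on the dead executor, which is
  // exactly the cleanup that was previously missing.
  def removeExecutor(execId: String): Unit = {
    executorIdToRunningTaskIds.remove(execId).foreach { runningTaskIds =>
      runningTaskIds.foreach { taskId =>
        taskIdToTaskSetManager.remove(taskId)
        taskIdToExecutorId.remove(taskId)
      }
    }
  }
}
```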
There are some potential corner-case interactions that I'm concerned about
here, especially some details in [the
comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523)
in `removeExecutor`, so I'd appreciate a very careful review of these changes.
## How was this patch tested?
I added a new unit test to `TaskSchedulerImplSuite`.
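Roughly, the scenario the test exercises looks like the outline below, written against the illustrative sketch above rather than the real `TaskSchedulerImpl` (the actual test drives the scheduler through its real APIs):

```scala
// Illustrative outline of the tested scenario, reusing ExecutorLossCleanupSketch.
object ExecutorLossScenario extends App {
  import ExecutorLossCleanupSketch._

  // 1. "Launch" two tasks on executor-1.
  recordLaunchedTask(taskId = 0L, execId = "executor-1", manager = new Object)
  recordLaunchedTask(taskId = 1L, execId = "executor-1", manager = new Object)
  assert(taskIdToTaskSetManager.size == 2)

  // 2. Simulate the executor dying without ever sending a StatusUpdate.
  removeExecutor("executor-1")

  // 3. All per-task bookkeeping should be gone, so nothing retains the
  //    TaskSetManager (and, transitively, its ShuffleDependency and RDDs).
  assert(taskIdToTaskSetManager.isEmpty)
  assert(taskIdToExecutorId.isEmpty)
  assert(executorIdToRunningTaskIds.isEmpty)
}
```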
/cc @kayousterhout and @markhamstra, who reviewed #15986.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark fix-leak-following-total-executor-loss-master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/16045.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #16045
----
commit 96897631dfb5efc9cbf94044ea4eb0a8764ae78c
Author: Josh Rosen <[email protected]>
Date: 2016-11-28T21:51:15Z
Port of #15986 to master branch.
----