GitHub user JoshRosen reopened a pull request:

    https://github.com/apache/spark/pull/16070

    [SPARK-18553][CORE][BRANCH-1.6] Fix leak of TaskSetManager following 
executor loss 

    ## What changes were proposed in this pull request?
    
    _This is the branch-1.6 version of #15986; the original description 
follows:_
    
    This patch fixes a critical resource leak in the TaskScheduler which could 
cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor 
with running tasks is permanently lost and the associated stage fails.
    
    This problem was originally identified by analyzing the heap dump of a 
driver belonging to a cluster that had run out of shuffle space. This dump 
contained several `ShuffleDependency` instances that were retained by 
`TaskSetManager`s inside the scheduler but were not otherwise referenced. Each 
of these `TaskSetManager`s was considered a "zombie" but had no running tasks 
and therefore should have been cleaned up. However, these zombie task sets were 
still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map.
    
    Entries are added to the `taskIdToTaskSetManager` map when tasks are 
launched and are removed inside of `TaskScheduler.statusUpdate()`, which is 
invoked by the scheduler backend while processing `StatusUpdate` messages from 
executors. The problem with this design is that a completely dead executor will 
never send a `StatusUpdate`. There is [some 
code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338)
 in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` 
state (which is supposed to correspond to a task failure triggered by total 
executor loss), but this state only seems to be used in Mesos fine-grained 
mode. There doesn't seem to be any code which performs per-task state cleanup 
for tasks that were running on an executor that completely disappears without 
sending any sort of final death message. The `executorLost` and 
[`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) 
methods don't appear to perform any cleanup of the `taskId -> *` mappings, 
causing the leaks observed here.
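    
    To make the lifecycle concrete, here is a reduced model of the bookkeeping 
at issue (a sketch only, not Spark's actual code: the field names mirror 
`TaskSchedulerImpl`'s, but everything except the map operations is elided):
    
```scala
import scala.collection.mutable.HashMap

// Reduced model of the per-task bookkeeping before the fix.
class SchedulerBookkeeping {
  val taskIdToTaskSetManager = new HashMap[Long, AnyRef]
  val taskIdToExecutorId = new HashMap[Long, String]

  // Entries are added when a task is launched...
  def taskLaunched(tid: Long, execId: String, manager: AnyRef): Unit = {
    taskIdToTaskSetManager(tid) = manager
    taskIdToExecutorId(tid) = execId
  }

  // ...and removed only when a StatusUpdate arrives for that task.
  def statusUpdate(tid: Long, finished: Boolean): Unit = {
    if (finished) {
      taskIdToTaskSetManager.remove(tid)
      taskIdToExecutorId.remove(tid)
    }
  }

  // Before the fix: nothing here touches the taskId -> * maps, so an executor
  // that dies without sending a final StatusUpdate leaks its tasks' entries.
  def removeExecutor(execId: String): Unit = ()
}
```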
    
    This patch's fix is to maintain an `executorId -> running task id` mapping 
so that these `taskId -> *` maps can be properly cleaned up following an 
executor loss.
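    
    The following sketch shows the shape of that fix (illustrative only; 
`executorIdToRunningTaskIds` and `cleanupTaskState` are stand-ins for whatever 
the actual patch names them, and the real change lives inside 
`TaskSchedulerImpl`):
    
```scala
import scala.collection.mutable.{HashMap, HashSet}

// Illustrative fix: also track executorId -> running task ids, so that
// removeExecutor can clean up the taskId -> * maps for a dead executor.
class SchedulerBookkeepingFixed {
  val taskIdToTaskSetManager = new HashMap[Long, AnyRef]
  val taskIdToExecutorId = new HashMap[Long, String]
  val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  def taskLaunched(tid: Long, execId: String, manager: AnyRef): Unit = {
    taskIdToTaskSetManager(tid) = manager
    taskIdToExecutorId(tid) = execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, new HashSet[Long]) += tid
  }

  // Shared cleanup path for both normal completion and executor loss.
  private def cleanupTaskState(tid: Long): Unit = {
    taskIdToTaskSetManager.remove(tid)
    taskIdToExecutorId.remove(tid).foreach { execId =>
      executorIdToRunningTaskIds.get(execId).foreach(_ -= tid)
    }
  }

  def statusUpdate(tid: Long, finished: Boolean): Unit = {
    if (finished) cleanupTaskState(tid)
  }

  // After the fix: every task that was running on the lost executor gets its
  // taskId -> * entries removed, even though no StatusUpdate will arrive.
  def removeExecutor(execId: String): Unit = {
    executorIdToRunningTaskIds.remove(execId).foreach(_.foreach(cleanupTaskState))
  }
}
```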
    
    There are some potential corner-case interactions that I'm concerned about 
here, especially some details in [the 
comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523)
 in `removeExecutor`, so I'd appreciate a very careful review of these changes.
    
    ## How was this patch tested?
    
    I added a new unit test to `TaskSchedulerImplSuite`.
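    
    That test drives Spark's real scheduler; as a stand-in, a quick check 
against the reduced model above would look roughly like this (plain assertions, 
not the actual suite code):
    
```scala
// Hypothetical check of the fixed bookkeeping model sketched above; the real
// test in TaskSchedulerImplSuite exercises the actual scheduler.
object ExecutorLossCleanupCheck extends App {
  val s = new SchedulerBookkeepingFixed
  s.taskLaunched(tid = 1L, execId = "exec-1", manager = new Object)
  s.taskLaunched(tid = 2L, execId = "exec-1", manager = new Object)
  // The executor dies without ever sending a final StatusUpdate...
  s.removeExecutor("exec-1")
  // ...yet no taskId -> * entries survive to pin the TaskSetManager.
  assert(s.taskIdToTaskSetManager.isEmpty)
  assert(s.taskIdToExecutorId.isEmpty)
  assert(s.executorIdToRunningTaskIds.isEmpty)
  println("executor-loss cleanup ok")
}
```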
    
    /cc @kayousterhout and @markhamstra, who reviewed #15986.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark fix-leak-following-total-executor-loss-1.6

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16070.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16070
    
----
commit 15c0489cdc5bb81695405240c879fc7cf70fc745
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-11-28T21:17:24Z

    [SPARK-18553][CORE][BRANCH-2.0] Fix leak of TaskSetManager following 
executor loss
    
    This patch fixes a critical resource leak in the TaskScheduler which could 
cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor 
with running tasks is permanently lost and the associated stage fails.
    
    This problem was originally identified by analyzing the heap dump of a 
driver belonging to a cluster that had run out of shuffle space. This dump 
contained several `ShuffleDependency` instances that were retained by 
`TaskSetManager`s inside the scheduler but were not otherwise referenced. Each 
of these `TaskSetManager`s was considered a "zombie" but had no running tasks 
and therefore should have been cleaned up. However, these zombie task sets were 
still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map.
    
    Entries are added to the `taskIdToTaskSetManager` map when tasks are 
launched and are removed inside of `TaskScheduler.statusUpdate()`, which is 
invoked by the scheduler backend while processing `StatusUpdate` messages from 
executors. The problem with this design is that a completely dead executor will 
never send a `StatusUpdate`. There is [some 
code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338)
 in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` 
state (which is supposed to correspond to a task failure triggered by total 
executor loss), but this state only seems to be used in Mesos fine-grained 
mode. There doesn't seem to be any code which performs per-task state cleanup 
for tasks that were running on an executor that completely disappears without 
sending any sort of final death message. The `executorLost` and 
[`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) 
methods don't appear to perform any cleanup of the `taskId -> *` mappings, 
causing the leaks observed here.
    
    This patch's fix is to maintain an `executorId -> running task id` mapping 
so that these `taskId -> *` maps can be properly cleaned up following an 
executor loss.
    
    There are some potential corner-case interactions that I'm concerned about 
here, especially some details in [the 
comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523)
 in `removeExecutor`, so I'd appreciate a very careful review of these changes.
    
    This PR is opened against branch-2.0, where I first observed this problem, 
but will also need to be fixed in master, branch-2.1, and branch-1.6 (which 
I'll do in followup PRs after this fix is reviewed and merged).
    
    I added a new unit test to `TaskSchedulerImplSuite`. You can check out this 
PR as of 25e455e711b978cd331ee0f484f70fde31307634 to see the failing test.
    
    cc kayousterhout, markhamstra, rxin for review.
    
    Author: Josh Rosen <joshro...@databricks.com>
    
    Closes #15986 from JoshRosen/fix-leak-following-total-executor-loss.

commit 776269ffeb7f2324a0a00899658e712621b3ff56
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-11-30T00:59:47Z

    Update TaskSchedulerImpl.scala

----

