Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/15986#discussion_r89713138
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -335,38 +337,38 @@ private[spark] class TaskSchedulerImpl(
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
- if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
- // We lost this entire executor, so remember that it's gone
- val execId = taskIdToExecutorId(tid)
-
- if (executorIdToTaskCount.contains(execId)) {
- reason = Some(
- SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
- removeExecutor(execId, reason.get)
- failedExecutor = Some(execId)
- }
- }
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
- if (TaskState.isFinished(state)) {
- taskIdToTaskSetManager.remove(tid)
- taskIdToExecutorId.remove(tid).foreach { execId =>
- if (executorIdToTaskCount.contains(execId)) {
- executorIdToTaskCount(execId) -= 1
- }
+ if (state == TaskState.LOST) {
--- End diff ---
I think this is technically okay, but the rationale is a little tricky:
If we receive a status update for a task with `TaskState.LOST` and the task
corresponds to a task set which exists (i.e. we're taking this `if` branch),
then there are two cases to consider:
1. the task is running on an executor that we know about (i.e.
`executorIdToRunningTaskIds.contains(execId) == true`), so we mark the executor
as failed in the `removeExecutor()` call, which has the side-effect of calling
`cleanupTaskState` for all tasks running on that executor (including this task).
2. we don't know about the executor now, but we must have known about it
earlier, since otherwise we wouldn't have scheduled a task there. Therefore,
the executor was already removed, and that earlier `removeExecutor()` call
should already have called `cleanupTaskState()`. We shouldn't have to worry about
races within `TaskSchedulerImpl` because all of the methods which interact with
these data structures are synchronized.
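To make the two cases concrete, here is a hypothetical, heavily simplified model of the bookkeeping. The field and method names (`taskIdToExecutorId`, `executorIdToRunningTaskIds`, `cleanupTaskState`, `removeExecutor`) mirror `TaskSchedulerImpl`, but this is a sketch of the argument, not the real implementation:

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the bookkeeping discussed above; the
// names mirror TaskSchedulerImpl but this is not Spark's actual code.
class ToyScheduler {
  val taskIdToExecutorId = mutable.Map.empty[Long, String]
  val executorIdToRunningTaskIds = mutable.Map.empty[String, mutable.Set[Long]]

  def registerTask(tid: Long, execId: String): Unit = synchronized {
    taskIdToExecutorId(tid) = execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, mutable.Set.empty) += tid
  }

  // Both steps are conditional removals, so repeated calls are harmless.
  def cleanupTaskState(tid: Long): Unit = {
    taskIdToExecutorId.remove(tid).foreach { execId =>
      executorIdToRunningTaskIds.get(execId).foreach(_ -= tid)
    }
  }

  // Case 1: the executor is still known, so removing it cleans up the state
  // for every task that was running on it, including the LOST task itself.
  def removeExecutor(execId: String): Unit = {
    executorIdToRunningTaskIds.remove(execId).foreach { tids =>
      tids.toSeq.foreach(cleanupTaskState)
    }
  }

  def handleLostTask(tid: Long): Unit = synchronized {
    taskIdToExecutorId.get(tid) match {
      case Some(execId) if executorIdToRunningTaskIds.contains(execId) =>
        removeExecutor(execId) // case 1
      case _ =>
        // Case 2: the executor was removed earlier, and that earlier
        // removeExecutor() call already cleaned up this task's state.
    }
  }
}
```

In this toy model, a LOST status update for one task tears down the state for every task on that executor in one pass, which is the side effect relied on in case 1.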
That said, `cleanupTaskState()` is idempotent and there's not really any
harm in putting in an extra call here just to make it absolutely clear that
it's guaranteed to be cleaned up in all cases. I'll look into performing this
restructuring.
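The idempotency point can be illustrated with a toy snippet (again, not Spark's code): because each step is a conditional removal keyed on the task ID, a second call observes empty `Option`s and leaves the maps untouched.

```scala
import scala.collection.mutable

// Toy illustration (not Spark's code) of why an extra cleanupTaskState()-style
// call is safe: both steps are conditional removals, so a repeat invocation
// sees nothing left to remove and changes no state.
object CleanupIdempotencyDemo {
  val taskIdToExecutorId = mutable.Map(7L -> "exec-1")
  val executorIdToRunningTaskIds = mutable.Map("exec-1" -> mutable.Set(7L))

  def cleanupTaskState(tid: Long): Unit = {
    // remove() returns None the second time around, so the body is skipped
    taskIdToExecutorId.remove(tid).foreach { execId =>
      executorIdToRunningTaskIds.get(execId).foreach(_ -= tid)
    }
  }
}
```

This is why an extra defensive call at the status-update site costs nothing beyond a map lookup, while making the cleanup guarantee obvious to readers.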