Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/12258#discussion_r65776097
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -343,17 +343,31 @@ private[spark] class TaskSchedulerImpl(
}
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
+ var executorId: String = null
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { execId =>
+ executorId = execId
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
}
}
if (state == TaskState.FINISHED) {
- taskSet.removeRunningTask(tid)
- taskResultGetter.enqueueSuccessfulTask(taskSet, tid,
serializedData)
+ // In some case, executor has already removed by driver for
heartbeats timeout, but
+ // at sometime, before executor killed by cluster, the task
of running on this
+ // executor is finished and return task success state to
driver. However, this kinds
+ // of task should be ignored, because the task on this
executor is already re-queued
+ // by driver. For more details, can check in SPARK-14485.
+ if (executorId.ne(null) &&
!executorIdToTaskCount.contains(executorId)) {
+ taskSet.removeRunningTask(tid)
+ logWarning(
--- End diff --
nit: warning is a bit overkill here, since we already know we don't want
this executor. info or even debug might be more appropriate.
Also, I personally prefer interpolation for these messages.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]