Ngone51 commented on a change in pull request #22806: [SPARK-25250][CORE] :
Late zombie task completions handled correctly even before new taskset launched
URL: https://github.com/apache/spark/pull/22806#discussion_r246995261
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -286,6 +286,44 @@ private[spark] class TaskSchedulerImpl(
}
}
+ /**
+ * SPARK-25250: Whenever any Task gets successfully completed, we simply
mark the
+ * corresponding partition id as completed in all attempts for that
particular stage and
+ * additionally, for a Result Stage, we also kill the remaining task
attempts running on the
+ * same partition. As a result, we do not see any Killed tasks due to
+ * TaskCommitDenied Exceptions showing up in the UI. When this method is
called from
+ * DAGScheduler.scala on a task completion event being fired, it is assumed
that the new
+ * TaskSet has already been created and registered. However, a small
possibility does exist
+ * that when this method gets called, possibly the new TaskSet might have
not been added
+ * to taskSetsByStageIdAndAttempt. In such a case, we might still hit the
same issue. However,
+ * the above scenario has not yet been reproduced.
+ */
+ override def completeTasks(partitionId: Int, stageId: Int, killTasks:
Boolean): Unit = {
+ taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm
=>
+ tsm.partitionToIndex.get(partitionId) match {
+ case Some(index) =>
+ tsm.markPartitionAsAlreadyCompleted(index)
Review comment:
Having another look at `completeTasks` & `markPartitionAsAlreadyCompleted`,
given my thoughts below.
First of all, I think we have reached an agreement on killing the running
attempts for the same partition.
Besides,
* if `tsm` is active and task corresponding to `partitionId` here is its
last unfinished task
- the task is not running
So, only mark `successful(index) = true` but do not mark it as
`zombie` would make it never have a chance to
increase `tasksSuccessful` and finish itself later.
- the task is running
`tsm` will hanle the killed task at the end, but also can not finish
itself. Because we won't mark the TaskSet as
`zombie` (unless it's a barrier TaskSet) when we handle failed
task. Also, `tasksSuccessful`.
* if `tsm` is zombie
Mark `successful(index) = true` is meaningless. Because we've already sent
killing siganls to thoes tasks, so I believe those zombie TaskSetManager would
handle them normally at the end. So, even call `maybeFinishTaskSet()` here is
useless.
Furthermore, I'm thinking about whether we need to exclude the `tsm` who
succeed the task firstly and trigger all the stuff subsequently, so that we may
avoid doing some redundant stuff, e.g. `successful(index) = true`. **But, we
do also need to kill tasks for it.**
cc @cloud-fan @pgandhi999 WDYT ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]