Ngone51 commented on a change in pull request #22806: [SPARK-25250][CORE] : Late zombie task completions handled correctly even before new taskset launched
URL: https://github.com/apache/spark/pull/22806#discussion_r246995261
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -286,6 +286,44 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  /**
+   * SPARK-25250: Whenever any Task gets successfully completed, we simply mark the
+   * corresponding partition id as completed in all attempts for that particular stage and
+   * additionally, for a Result Stage, we also kill the remaining task attempts running on the
+   * same partition. As a result, we do not see any Killed tasks due to
+   * TaskCommitDenied Exceptions showing up in the UI. When this method is called from
+   * DAGScheduler.scala on a task completion event being fired, it is assumed that the new
+   * TaskSet has already been created and registered. However, a small possibility does exist
+   * that when this method gets called, possibly the new TaskSet might have not been added
+   * to taskSetsByStageIdAndAttempt. In such a case, we might still hit the same issue. However,
+   * the above scenario has not yet been reproduced.
+   */
+  override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.partitionToIndex.get(partitionId) match {
+        case Some(index) =>
+          tsm.markPartitionAsAlreadyCompleted(index)
 
 Review comment:
   I had another look at `completeTasks` & `markPartitionAsAlreadyCompleted`; my thoughts are below.
   
   First of all, I think we have reached an agreement on killing the running attempts of the same partition for the `tsm`, whether it is a zombie or not.
   
   Besides,
   
   * if `tsm` is active and the task corresponding to `partitionId` here is its last unfinished task
         - the task is not running
           Only marking `successful(index) = true`, without also marking the `tsm` as `zombie`, means it never gets a chance to increase `tasksSuccessful` and finish itself later.
         - the task is running
           `tsm` will handle the killed task at the end, but it still cannot finish itself, because we won't mark the TaskSet as `zombie` (unless it's a barrier TaskSet) when we handle a failed task. Also, `tasksSuccessful` is not increased.
   
   * if `tsm` is zombie
     Marking `successful(index) = true` is meaningless. We've already sent kill signals to those tasks, so I believe those zombie TaskSetManagers would handle them normally at the end. So even calling `maybeFinishTaskSet()` here is useless.
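
To make the active-`tsm` case concrete, here is a minimal toy sketch. It is not Spark's `TaskSetManager`: `ToyTaskSetManager` and `ToyDemo` are hypothetical names, and the method bodies are assumptions based on the behaviour described in this comment (a flag-only `markPartitionAsAlreadyCompleted`, and a `maybeFinishTaskSet` that only fires for a zombie task set with nothing running).

```scala
// Toy model only. Field and method names echo the discussion, but the
// bodies are simplified assumptions, not the real Spark implementation.
final class ToyTaskSetManager(val numTasks: Int) {
  val successful: Array[Boolean] = Array.fill(numTasks)(false)
  var tasksSuccessful: Int = 0
  var isZombie: Boolean = false
  var runningTasks: Int = 0
  var finished: Boolean = false

  // Normal success path: flag, counter and zombie state move together.
  def handleSuccessfulTask(index: Int): Unit = {
    if (!successful(index)) {
      successful(index) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numTasks) isZombie = true
    }
    maybeFinishTaskSet()
  }

  // Assumed behaviour under review: only the flag is flipped.
  def markPartitionAsAlreadyCompleted(index: Int): Unit = {
    successful(index) = true
    maybeFinishTaskSet()
  }

  // Assumed rule: a task set finishes only if it is a zombie and idle.
  def maybeFinishTaskSet(): Unit = {
    if (isZombie && runningTasks == 0) finished = true
  }
}

object ToyDemo {
  // Last unfinished partition is completed by another attempt (flag only).
  def flagOnly(): ToyTaskSetManager = {
    val tsm = new ToyTaskSetManager(numTasks = 2)
    tsm.handleSuccessfulTask(0)
    tsm.markPartitionAsAlreadyCompleted(1)
    tsm
  }

  // Same scenario, but the last partition goes through the normal path.
  def counted(): ToyTaskSetManager = {
    val tsm = new ToyTaskSetManager(numTasks = 2)
    tsm.handleSuccessfulTask(0)
    tsm.handleSuccessfulTask(1)
    tsm
  }
}
```

In `ToyDemo.flagOnly()` every entry of `successful` is true, yet `tasksSuccessful` stays at 1, so the toy manager never becomes a zombie and never finishes. In `ToyDemo.counted()` it does. That is the gap I am worried about for an active `tsm`.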
   
   Furthermore, I'm wondering whether we need to exclude the `tsm` whose task succeeded first and triggered all of this, so that we can avoid redundant work, e.g. `successful(index) = true`.  **But we do still need to kill tasks for it.**
   
   cc @cloud-fan @pgandhi999  WDYT ?

