Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r228605235
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
                 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
               } while (launchedTaskAtCurrentMaxLocality)
             }
    +
             if (!launchedAnyTask) {
    -          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +            case Some(taskIndex) => // Returns the taskIndex which was unschedulable
    +
    +              // If the taskSet is unschedulable we try to find an existing idle blacklisted
    +              // executor. If we cannot find one, we abort immediately. Else we kill the idle
    +              // executor and kick off an abortTimer which, if we are still unable to schedule
    +              // any task from the taskSet within the timeout, will abort the taskSet.
    +              // Note 1: We keep track of schedulability on a per-taskSet basis rather than on a
    +              // per-task basis.
    +              // Note 2: The taskSet can still be aborted when there is more than one idle
    +              // blacklisted executor and dynamic allocation is on. This can happen when a killed
    +              // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
    +              // pending tasks and doesn't kill executors on idle timeouts, causing the abort
    +              // timer to expire and abort the taskSet.
    +              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
    +                case Some((executorId, _)) =>
    +                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
    +
    +                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
    +                    logInfo(s"Waiting for $timeout ms for completely "
    +                      + s"blacklisted task to be schedulable again before aborting $taskSet.")
    +                    abortTimer.schedule(
    +                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
    +                  }
    +                case _ => // Abort immediately
    +                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
    +                    s" executors can be found to kill. Aborting $taskSet.")
    +                  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
    +              }
    +            case _ => // Do nothing if no tasks are completely blacklisted.
    +          }
    +        } else {
    +          // We want to defer killing any taskSets as long as we have a non-blacklisted executor
    +          // which can be used to schedule a task from any active taskSet. This ensures that the
    +          // job can make progress and, if we encounter a flawed taskSet, it will eventually either
    +          // fail or abort due to being completely blacklisted.
    --- End diff --
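
    A side note on the abortTimer path above: when the timer fires it should re-check
    the expiry map and only then abort, so a taskSet that became schedulable again in
    the meantime is left alone. Here is a minimal sketch of that intent, assuming
    abortTimer is a java.util.Timer and this helper sits inside TaskSchedulerImpl; it
    is not necessarily the exact helper in this PR:

        private def createUnschedulableTaskSetAbortTimer(
            taskSet: TaskSetManager,
            taskIndex: Int): java.util.TimerTask = {
          new java.util.TimerTask() {
            override def run(): Unit = TaskSchedulerImpl.this.synchronized {
              // Abort only if this taskSet is still tracked as unschedulable and its
              // expiry time has actually passed; otherwise a task was scheduled in the
              // meantime and the timer simply cancels itself.
              if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
                  unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) {
                logInfo("Cannot schedule any task because of complete blacklisting. " +
                  s"Wait time for scheduling expired. Aborting $taskSet.")
                taskSet.abortSinceCompletelyBlacklisted(taskIndex)
              } else {
                this.cancel()
              }
            }
          }
        }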
    
    I think you should say here that a job may wait indefinitely if it has effectively 
blacklisted the entire cluster, but other jobs keep coming in and occupying resources 
so the cluster stays busy.  So it's not really accurate to say that it will be aborted 
eventually; we are actually *not* guaranteeing that (if I understood things correctly).
    
    Since it's folded now, lemme reference the prior discussion on this: 
https://github.com/apache/spark/pull/22288#discussion_r226477452
    
    >> Want to make sure I understand this part, and why you aren't only 
clearing the timer for the taskSet you just scheduled a task for. If you have 
multiple taskSets running simultaneously, where one is making progress but the other 
is totally blacklisted, I guess you do not want to kill anything, because that 
would mess with the taskSet that is working correctly? Instead you'll just let 
the totally blacklisted taskSet eventually fail from the timeout? I 
guess that makes sense, because if one taskSet is progressing, it means the 
failing taskSet is probably flawed, not the executors.
    >>
    >> If that's right, it would be good to include something along those lines in 
the comment (personally I don't find a comment about how it's related to the 
timer that useful; that's obvious from the code).
    >
    > dhruve replied:
    >That is correct. It also covers another scenario that @tgravescs originally 
pointed out.
    >
    >Let's say you have multiple taskSets running which are completely 
blacklisted. If you were able to get an executor, you would just clear the 
timer for that specific taskSet. Then, due to resource constraints, if you weren't 
able to obtain another executor within the timeout for the other taskSet, you 
would abort that taskSet even though you could actually wait for it to be 
scheduled on the newly obtained executor.
    >
    > So clearing the timer for all the taskSets ensures that we currently 
aren't in a completely blacklisted state and should try to run to completion. 
However, if the taskSet itself is flawed, it would eventually fail. This could 
result in wasted effort, but we don't have a way to determine that yet, so this 
should be okay.
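
    To make that concrete, here is a rough sketch of the clearing step in the branch
    where a task did get launched (launchedAnyTask is true). This is my illustration
    of the idea being discussed, not necessarily the exact code in this PR:

        // Inside resourceOffers(), in the branch where launchedAnyTask is true:
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          // A task was just launched on a non-blacklisted executor, so we are not in a
          // completely blacklisted state right now. Reset the abort timers for *all*
          // unschedulable taskSets, not just the one that got a task, and let them
          // keep trying to run to completion.
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task " +
            "was recently scheduled.")
          unschedulableTaskSetToExpiryTime.clear()
        }

    Note that this clearing is also what makes the indefinite-wait case possible: as
    long as other jobs keep launching tasks somewhere on the cluster, the expiry times
    keep getting reset and a fully blacklisted taskSet may never hit its abort timeout.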


---
