Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14079#discussion_r72534813
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -236,29 +245,42 @@ private[spark] class TaskSchedulerImpl(
        * given TaskSetManager have completed, so state associated with the TaskSetManager should be
        * cleaned up.
        */
    -  def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
    +  def taskSetFinished(manager: TaskSetManager, success: Boolean): Unit = synchronized {
         taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage =>
           taskSetsForStage -= manager.taskSet.stageAttemptId
           if (taskSetsForStage.isEmpty) {
             taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
           }
         }
         manager.parent.removeSchedulable(manager)
    -    logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s"
    -      .format(manager.taskSet.id, manager.parent.name))
    +    if (success) {
    +      blacklistTracker.foreach(_.taskSetSucceeded(manager.execToFailures))
    +      logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
    +        s" ${manager.parent.name}")
    +    } else {
    +      logInfo(s"Removed TaskSet ${manager.taskSet.id}, since it failed, from pool" +
    +        s" ${manager.parent.name}")
    +    }
       }
     
       private def resourceOfferSingleTaskSet(
           taskSet: TaskSetManager,
           maxLocality: TaskLocality,
    -      shuffledOffers: Seq[WorkerOffer],
    +      shuffledOffers: IndexedSeq[WorkerOffer],
           availableCpus: Array[Int],
           tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
         var launchedTask = false
    +    // nodes and executors that are blacklisted for the entire application have already been
    +    // filtered out by this point
         for (i <- 0 until shuffledOffers.size) {
    -      val execId = shuffledOffers(i).executorId
    -      val host = shuffledOffers(i).host
    -      if (availableCpus(i) >= CPUS_PER_TASK) {
    +      val offer = shuffledOffers(i)
    +      val host = offer.host
    +      val execId = offer.executorId
    +      val offerBlacklisted = blacklistTracker.map { bl =>
    +        taskSet.isNodeBlacklistedForTaskSet(host) ||
    +          taskSet.isExecutorBlacklistedForTaskSet(execId)
    +      }.getOrElse(false)
    +      if (!offerBlacklisted && availableCpus(i) >= CPUS_PER_TASK) {
    --- End diff --
    
    It seems more intuitive to me to move all of the taskset-specific
    blacklisting into the TaskSetManager, since that class generally encompasses
    all of the taskset-specific logic. E.g., taskSet.abortIfCompletelyBlacklisted
    could itself check with the blacklist tracker. That would also let the
    TaskSetManager hide some complexity, like checking whether the executor is
    blacklisted for the task set.
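
    A rough sketch of the idea, using simplified stand-in types rather than the real Spark classes. The helper name isOfferBlacklisted, the constructor parameters, and the usableOffers function are assumptions made up for illustration; only the two isNode/isExecutorBlacklistedForTaskSet checks come from the diff above:

```scala
// Simplified stand-ins for illustration; these are NOT the real Spark classes.
final case class WorkerOffer(executorId: String, host: String, cores: Int)

// Hypothetical tracker type, reduced to a marker for this sketch.
trait BlacklistTracker

class TaskSetManager(
    blacklistTracker: Option[BlacklistTracker],
    blacklistedNodes: Set[String],
    blacklistedExecs: Set[String]) {

  def isNodeBlacklistedForTaskSet(host: String): Boolean =
    blacklistedNodes.contains(host)

  def isExecutorBlacklistedForTaskSet(execId: String): Boolean =
    blacklistedExecs.contains(execId)

  // Hypothetical helper: a single call answers "is this offer off limits
  // for this task set?". With no tracker configured, nothing is blacklisted.
  def isOfferBlacklisted(offer: WorkerOffer): Boolean =
    blacklistTracker.isDefined &&
      (isNodeBlacklistedForTaskSet(offer.host) ||
        isExecutorBlacklistedForTaskSet(offer.executorId))
}

object SchedulerSketch {
  val CPUS_PER_TASK = 1

  // The scheduler side no longer needs to know how the check is done.
  def usableOffers(
      taskSet: TaskSetManager,
      offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] =
    offers.filter(o => !taskSet.isOfferBlacklisted(o) && o.cores >= CPUS_PER_TASK)
}
```

    With something like this, resourceOfferSingleTaskSet would only ask the task set whether an offer is usable, and the node/executor distinction stays inside the TaskSetManager.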
    
    Calling taskSetSucceeded could also move to the TaskSetManager, so that
    "success" doesn't need to be passed into taskSetFinished.
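
    Roughly what that could look like, again with simplified stand-ins rather than the real classes. The hook names onAllTasksSucceeded and onAborted, the Map[String, Int] shape of execToFailures, and the counters are assumptions for illustration only; the diff does not show the real types:

```scala
import scala.collection.mutable

// Simplified stand-in for the tracker; not Spark's real API.
class BlacklistTracker {
  val reported = mutable.ArrayBuffer.empty[Map[String, Int]]
  def taskSetSucceeded(execToFailures: Map[String, Int]): Unit =
    reported += execToFailures
}

class TaskScheduler {
  var finished = 0
  // No `success` flag: the scheduler only cleans up its own state.
  def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
    finished += 1
  }
}

class TaskSetManager(blacklistTracker: Option[BlacklistTracker]) {
  // Executor id -> failure count (assumed shape for this sketch).
  val execToFailures = mutable.Map.empty[String, Int]

  def recordFailure(execId: String): Unit =
    execToFailures(execId) = execToFailures.getOrElse(execId, 0) + 1

  // Hypothetical hook: the manager knows it succeeded, so it tells the
  // tracker itself and then asks the scheduler to clean up.
  def onAllTasksSucceeded(scheduler: TaskScheduler): Unit = {
    blacklistTracker.foreach(_.taskSetSucceeded(execToFailures.toMap))
    scheduler.taskSetFinished(this)
  }

  // Hypothetical hook for the failure path: no tracker update, same cleanup.
  def onAborted(scheduler: TaskScheduler): Unit =
    scheduler.taskSetFinished(this)
}
```

    The scheduler then keeps the original one-argument taskSetFinished, and the success/failure distinction lives where it is already known.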

