Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r103566031
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
    @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
     
    -private[spark] object CoarseGrainedSchedulerBackend {
    +private[spark] object CoarseGrainedSchedulerBackend extends Logging {
       val ENDPOINT_NAME = "CoarseGrainedScheduler"
    +
    +  // Abort the TaskSetManager for the given task, swallowing any
    +  // exception thrown by the abort callback itself
    +  private def abortTaskSetManager(
    +      scheduler: TaskSchedulerImpl,
    +      taskId: Long,
    +      msg: => String,
    +      exception: Option[Throwable] = None): Unit = scheduler.synchronized {
    +    scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
    +      try {
    +        taskSetMgr.abort(msg, exception)
    +      } catch {
    +        case e: Exception => logError("Exception in error callback", e)
    +      }
    +    }
    +  }
    +
    +  private def isZombieTaskSetManager(
    +      scheduler: TaskSchedulerImpl,
    +      taskId: Long): Boolean = scheduler.synchronized {
    +    scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
    +  }
    +
    +  private def getTaskSetManager(
    +      scheduler: TaskSchedulerImpl,
    +      taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
    +    scheduler.taskIdToTaskSetManager.get(taskId)
    +  }
    +
    +  private[scheduler] def prepareSerializedTask(
    --- End diff ---
    
    The other scheduler backends should not be importing from 
CoarseGrainedSchedulerBackend -- if there's a shared function like this that 
they all need to use, it should be placed in an obviously shared place 
(otherwise it's too hard to track down where things are used).
    
    Also, I don't think this is quite the right shared function to have -- 
since all of the backends also need the shared functionality of iterating 
through all of the tasks and deciding which ones to abort.  Why not move this 
functionality to TaskSchedulerImpl.scala?  That's a more natural place to 
handle aborting task sets.  Specifically, I'd propose adding something like 
`makeOffersAndSerializeTasks` to `TaskSchedulerImpl` that is not synchronized, 
and handles calling `resourceOffers` and then doing the serialization (and any 
necessary aborting).  That also allows you to eliminate `getTaskSetManager`.
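    
    A rough sketch of the shape I have in mind, using stand-in types since the 
real `TaskSchedulerImpl`, `WorkerOffer`, and `TaskDescription` aren't 
reproduced here (everything except the `resourceOffers` and 
`makeOffersAndSerializeTasks` names is a placeholder):
    
    ```scala
    import java.nio.ByteBuffer
    import scala.util.control.NonFatal

    // Stand-ins for the real Spark types, just to make the shape concrete.
    case class WorkerOffer(executorId: String, host: String, cores: Int)
    case class TaskDescription(taskId: Long, name: String, payload: AnyRef)

    class TaskSchedulerSketch {
      // Task sets aborted because one of their tasks failed to serialize.
      var aborted: List[Long] = Nil

      // The existing synchronized entry point (stubbed out here).
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[TaskDescription] =
        synchronized {
          offers.zipWithIndex.map { case (o, i) =>
            TaskDescription(i.toLong, s"task-$i", o.host)
          }
        }

      private def serialize(task: TaskDescription): ByteBuffer =
        task.payload match {
          case s: String => ByteBuffer.wrap(s.getBytes("UTF-8"))
          case other =>
            throw new java.io.NotSerializableException(other.toString)
        }

      private def abortTaskSet(taskId: Long, msg: String): Unit =
        synchronized { aborted ::= taskId }

      // Proposed: offer resources under the lock, serialize outside it,
      // and abort the task set of any task that fails to serialize.
      def makeOffersAndSerializeTasks(
          offers: Seq[WorkerOffer]): Seq[(TaskDescription, ByteBuffer)] = {
        resourceOffers(offers).flatMap { task =>
          try {
            Some((task, serialize(task)))
          } catch {
            case NonFatal(e) =>
              abortTaskSet(task.taskId,
                s"Failed to serialize task ${task.taskId}: $e")
              None
          }
        }
      }
    }
    ```
    
    That keeps all of the offer/serialize/abort iteration in one place, so the 
individual backends only consume the already-serialized tasks.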

