Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1487#discussion_r20117032
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ---
    @@ -180,53 +194,40 @@ private[spark] class MesosSchedulerBackend(
        * tasks are balanced across the cluster.
        */
       override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
    -    val oldClassLoader = setClassLoader()
    -    try {
    -      synchronized {
    -        // Build a big list of the offerable workers, and remember their 
indices so that we can
    -        // figure out which Offer to reply to for each worker
    -        val offerableWorkers = new ArrayBuffer[WorkerOffer]
    -        val offerableIndices = new HashMap[String, Int]
    -
    -        def enoughMemory(o: Offer) = {
    -          val mem = getResource(o.getResourcesList, "mem")
    -          val slaveId = o.getSlaveId.getValue
    -          mem >= sc.executorMemory || 
slaveIdsWithExecutors.contains(slaveId)
    -        }
    -
    -        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) 
{
    -          offerableIndices.put(offer.getSlaveId.getValue, index)
    -          offerableWorkers += new WorkerOffer(
    -            offer.getSlaveId.getValue,
    -            offer.getHostname,
    -            getResource(offer.getResourcesList, "cpus").toInt)
    -        }
    -
    -        // Call into the TaskSchedulerImpl
    -        val taskLists = scheduler.resourceOffers(offerableWorkers)
    -
    -        // Build a list of Mesos tasks for each slave
    -        val mesosTasks = offers.map(o => new JArrayList[MesosTaskInfo]())
    -        for ((taskList, index) <- taskLists.zipWithIndex) {
    -          if (!taskList.isEmpty) {
    -            for (taskDesc <- taskList) {
    -              val slaveId = taskDesc.executorId
    -              val offerNum = offerableIndices(slaveId)
    -              slaveIdsWithExecutors += slaveId
    -              taskIdToSlaveId(taskDesc.taskId) = slaveId
    -              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
    -            }
    -          }
    -        }
    -
    -        // Reply to the offers
    -        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // 
TODO: lower timeout?
    -        for (i <- 0 until offers.size) {
    -          d.launchTasks(Collections.singleton(offers(i).getId), 
mesosTasks(i), filters)
    +    inClassLoader() {
    +      val (acceptedOffers, declinedOffers) = offers.partition(o => {
    +        val mem = getResource(o.getResourcesList, "mem")
    +        val slaveId = o.getSlaveId.getValue
    +        mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
    +      })
    +
    +      val offerableWorkers = acceptedOffers.map(toWorkerOffer)
    +
    +      val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue 
-> o).toMap
    +
    +      val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
    +
    +      // Call into the TaskSchedulerImpl
    +      scheduler.resourceOffers(offerableWorkers)
    +        .filter(!_.isEmpty)
    +        .foreach(_.foreach(taskDesc => {
    +        val slaveId = taskDesc.executorId
    +        slaveIdsWithExecutors += slaveId
    +        taskIdToSlaveId(taskDesc.taskId) = slaveId
    +        mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
    +          .add(createMesosTask(taskDesc, slaveId))
    +      }))
    +
    +      // Reply to the offers
    +      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // 
TODO: lower timeout?
    +
    +      mesosTasks.foreach {
    +        case (slaveId, tasks) => {
    +          
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, 
filters)
             }
    --- End diff --
    
    Style should be
    ```
    mesosTasks.foreach { case (slaveId, tasks) =>
      d.launchTasks(...)
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to