Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8671#discussion_r39575837
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
 ---
    @@ -202,55 +207,86 @@ private[spark] class MesosSchedulerBackend(
       }
     
       /**
    -   * Method called by Mesos to offer resources on slaves. We respond by 
asking our active task sets
    -   * for tasks in order of priority. We fill each node with tasks in a 
round-robin manner so that
    -   * tasks are balanced across the cluster.
    +   * Return the usable Mesos offers and corresponding WorkerOffers.
    +   *
    +   * This method declines Mesos offers that don't meet minimum cpu, memory 
or attribute
    +   * requirements.
    +   *
    +   * @param d Mesos SchedulerDriver to decline offers
    +   * @param offers Mesos offers to be considered
    +   * @return a pair of Mesos offers and corresponding WorkerOffer that can 
be used by the
    +   *         fine-grained scheduler.
        */
    -  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
    -    inClassLoader() {
    -      // Fail-fast on offers we know will be rejected
    -      val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
    -        val mem = getResource(o.getResourcesList, "mem")
    -        val cpus = getResource(o.getResourcesList, "cpus")
    -        val slaveId = o.getSlaveId.getValue
    -        val offerAttributes = toAttributeMap(o.getAttributesList)
    -
    -        // check if all constraints are satisfield
    -        //  1. Attribute constraints
    -        //  2. Memory requirements
    -        //  3. CPU requirements - need at least 1 for executor, 1 for task
    -        val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
    -        val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
    -        val meetsCPURequirements = cpus >= (mesosExecutorCores + 
scheduler.CPUS_PER_TASK)
    -
    -        val meetsRequirements =
    -          (meetsConstraints && meetsMemoryRequirements && 
meetsCPURequirements) ||
    +  private[spark] def usableWorkerOffers(d: SchedulerDriver,
    +    offers: JList[Offer]): (Seq[Protos.Offer], Seq[WorkerOffer]) = {
    +    // Fail-fast on offers we know will be rejected
    +    val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
    +      val mem = getResource(o.getResourcesList, "mem")
    +      val cpus = getResource(o.getResourcesList, "cpus")
    +      val slaveId = o.getSlaveId.getValue
    +      val offerAttributes = toAttributeMap(o.getAttributesList)
    +
    +      // check if all constraints are satisfield
    +      //  1. Attribute constraints
    +      //  2. Memory requirements
    +      //  3. CPU requirements - need at least 1 for executor, 1 for task
    +      val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
    +      val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
    +      val meetsCPURequirements = cpus >= (mesosExecutorCores + 
scheduler.CPUS_PER_TASK)
    +
    +      val meetsRequirements =
    +        (meetsConstraints && meetsMemoryRequirements && 
meetsCPURequirements) ||
               (slaveIdToExecutorInfo.contains(slaveId) && cpus >= 
scheduler.CPUS_PER_TASK)
     
    -        // add some debug messaging
    -        val debugstr = if (meetsRequirements) "Accepting" else "Declining"
    -        val id = o.getId.getValue
    -        logDebug(s"$debugstr offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
    +      // add some debug messaging
    +      val debugstr = if (meetsRequirements) "Accepting" else "Declining"
    +      val id = o.getId.getValue
    +      logDebug(s"$debugstr offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
    +
    +      meetsRequirements
    +    }
    +
    +    // Decline offers we ruled out immediately
    +    unUsableOffers.foreach(o => d.declineOffer(o.getId))
    +
    +    var availableCores = Math.max(0, maxCores - totalCoresAcquired)
     
    -        meetsRequirements
    +    val workerOffers = (for (o <- usableOffers) yield {
    +      val coresInOffer = getResource(o.getResourcesList, "cpus")
    +      val cores = if 
(slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
    +        coresInOffer.toInt
    +      } else {
    +        // If the Mesos executor has not been started on this slave yet, 
set aside a few
    +        // cores for the Mesos executor by offering fewer cores to the 
Spark executor
    +        availableCores -= mesosExecutorCores
    +        (coresInOffer - mesosExecutorCores).toInt
           }
     
    -      // Decline offers we ruled out immediately
    -      unUsableOffers.foreach(o => d.declineOffer(o.getId))
    +      // check that we can still acquire cpus
    +      val actualCores = Math.min(availableCores, cores).toInt
    --- End diff --
    
    Will availableCores become negative and just start adding actualCores?
    I think it's probably safer to check whether availableCores < cores and, if 
so, just return None.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to