GitHub user dragos commented on a diff in the pull request:
https://github.com/apache/spark/pull/8671#discussion_r39605193
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
---
@@ -202,55 +207,86 @@ private[spark] class MesosSchedulerBackend(
}
/**
- * Method called by Mesos to offer resources on slaves. We respond by
asking our active task sets
- * for tasks in order of priority. We fill each node with tasks in a
round-robin manner so that
- * tasks are balanced across the cluster.
+ * Return the usable Mesos offers and corresponding WorkerOffers.
+ *
+ * This method declines Mesos offers that don't meet minimum cpu, memory
or attribute
+ * requirements.
+ *
+ * @param d Mesos SchedulerDriver to decline offers
+ * @param offers Mesos offers to be considered
+ * @return a pair of Mesos offers and corresponding WorkerOffer that can
be used by the
+ * fine-grained scheduler.
*/
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- inClassLoader() {
- // Fail-fast on offers we know will be rejected
- val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
- val mem = getResource(o.getResourcesList, "mem")
- val cpus = getResource(o.getResourcesList, "cpus")
- val slaveId = o.getSlaveId.getValue
- val offerAttributes = toAttributeMap(o.getAttributesList)
-
- // check if all constraints are satisfield
- // 1. Attribute constraints
- // 2. Memory requirements
- // 3. CPU requirements - need at least 1 for executor, 1 for task
- val meetsConstraints =
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
- val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
- val meetsCPURequirements = cpus >= (mesosExecutorCores +
scheduler.CPUS_PER_TASK)
-
- val meetsRequirements =
- (meetsConstraints && meetsMemoryRequirements &&
meetsCPURequirements) ||
+ private[spark] def usableWorkerOffers(d: SchedulerDriver,
+ offers: JList[Offer]): (Seq[Protos.Offer], Seq[WorkerOffer]) = {
+ // Fail-fast on offers we know will be rejected
+ val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
+ val mem = getResource(o.getResourcesList, "mem")
+ val cpus = getResource(o.getResourcesList, "cpus")
+ val slaveId = o.getSlaveId.getValue
+ val offerAttributes = toAttributeMap(o.getAttributesList)
+
+ // check if all constraints are satisfield
+ // 1. Attribute constraints
+ // 2. Memory requirements
+ // 3. CPU requirements - need at least 1 for executor, 1 for task
+ val meetsConstraints =
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+ val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+ val meetsCPURequirements = cpus >= (mesosExecutorCores +
scheduler.CPUS_PER_TASK)
+
+ val meetsRequirements =
+ (meetsConstraints && meetsMemoryRequirements &&
meetsCPURequirements) ||
(slaveIdToExecutorInfo.contains(slaveId) && cpus >=
scheduler.CPUS_PER_TASK)
- // add some debug messaging
- val debugstr = if (meetsRequirements) "Accepting" else "Declining"
- val id = o.getId.getValue
- logDebug(s"$debugstr offer: $id with attributes: $offerAttributes
mem: $mem cpu: $cpus")
+ // add some debug messaging
+ val debugstr = if (meetsRequirements) "Accepting" else "Declining"
+ val id = o.getId.getValue
+ logDebug(s"$debugstr offer: $id with attributes: $offerAttributes
mem: $mem cpu: $cpus")
+
+ meetsRequirements
+ }
+
+ // Decline offers we ruled out immediately
+ unUsableOffers.foreach(o => d.declineOffer(o.getId))
+
+ var availableCores = Math.max(0, maxCores - totalCoresAcquired)
- meetsRequirements
+ val workerOffers = (for (o <- usableOffers) yield {
+ val coresInOffer = getResource(o.getResourcesList, "cpus")
+ val cores = if
(slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
+ coresInOffer.toInt
+ } else {
+ // If the Mesos executor has not been started on this slave yet,
set aside a few
+ // cores for the Mesos executor by offering fewer cores to the
Spark executor
+ availableCores -= mesosExecutorCores
+ (coresInOffer - mesosExecutorCores).toInt
}
- // Decline offers we ruled out immediately
- unUsableOffers.foreach(o => d.declineOffer(o.getId))
+ // check that we can still acquire cpus
+ val actualCores = Math.min(availableCores, cores).toInt
--- End diff --
Ok, I'll refactor this condition.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]