[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-12 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52793626
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

1) It's not any more of an issue than it was before.  I didn't add any 

2) Total memory accrual is O(slaves on which this driver ever launches an executor). This is bounded by the number of slaves in the cluster. The largest known Mesos cluster is ~50k slaves. If a Spark job somehow spans the entire cluster (highly unlikely), this object will grow to ~50k * sizeof(slaveID) ~= 1.5MB. I think we're fine.
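
A back-of-the-envelope check of that estimate (a sketch; the ~30 bytes per entry is an assumed cost for a slave-ID string plus map overhead, not a measurement):

```
// Worst case: one map entry per slave in the largest known Mesos cluster.
val numSlaves = 50000
val bytesPerEntry = 30 // assumption: slave ID string + per-entry map overhead
val totalBytes = numSlaves.toLong * bytesPerEntry
println(f"${totalBytes / 1e6}%.1f MB") // prints "1.5 MB"
```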





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52503772
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -89,13 +82,11 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
 
-  private val pendingRemovedSlaveIds = new HashSet[String]
--- End diff --

nice





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52504852
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala ---
@@ -37,6 +41,223 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     with MockitoSugar
     with BeforeAndAfter {
 
+  var sparkConf: SparkConf = _
+  var driver: SchedulerDriver = _
+  var taskScheduler: TaskSchedulerImpl = _
+  var backend: CoarseMesosSchedulerBackend = _
+  var externalShuffleClient: MesosExternalShuffleClient = _
+  var driverEndpoint: RpcEndpointRef = _
--- End diff --

please make all of these private
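
For reference, the suggested change is just the same fields with the `private` modifier (a sketch of the result, not the merged code):

```
private var sparkConf: SparkConf = _
private var driver: SchedulerDriver = _
private var taskScheduler: TaskSchedulerImpl = _
private var backend: CoarseMesosSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _
```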





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52504373
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -410,40 +507,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
--- End diff --

style:
```
private def executorTerminated(
    d: SchedulerDriver,
    slaveId: String,
    taskId: String,
    reason: String): Unit = {
}
```





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/10993





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52503444
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -113,27 +107,31 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val rejectOfferDurationForUnmetConstraints =
     getRejectOfferDurationForUnmetConstraints(sc)
 
-  // A client for talking to the external shuffle service, if it is a
+  // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
-      Some(new MesosExternalShuffleClient(
-        SparkTransportConf.fromSparkConf(conf, "shuffle"),
-        securityManager,
-        securityManager.isAuthenticationEnabled(),
-        securityManager.isSaslEncryptionEnabled()))
+      Some(getShuffleClient())
     } else {
       None
     }
   }
 
+  protected def getShuffleClient(): MesosExternalShuffleClient = {
--- End diff --

is this exposed so we can test it? If so, we should add a comment saying so.
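
The usual motivation for a `protected` factory like this is test injection; a sketch of that pattern (illustrative wiring, not the suite's actual code; the constructor arguments are assumed):

```
// In a test: subclass the backend and swap in a mock shuffle client, so the
// backend's real code paths run against a controllable client.
val shuffleClient = mock[MesosExternalShuffleClient]
val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "mesos://localhost:5050", securityManager) {
  override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
}
```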





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52503364
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -73,17 +73,13 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
 
   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[Int, Int]
+  val coresByTaskId = new HashMap[String, Int]
   var totalCoresAcquired = 0
 
-  val slaveIdsWithExecutors = new HashSet[String]
-
-  // Maping from slave Id to hostname
-  private val slaveIdToHost = new HashMap[String, String]
-
-  val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
-  // How many times tasks on each slave failed
-  val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
+  // SlaveID -> Slave
+  // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
+  // we need to maintain e.g. failure state and connection state.
+  private val slaves = new HashMap[String, Slave]
--- End diff --

elsewhere in Spark we would call this class `SlaveInfo` instead of just 
`Slave`, so we don't confuse it with the Mesos Slave
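
For context, the value type held by `slaves` is roughly a mutable record per slave (a sketch under the reviewer's suggested `SlaveInfo` name; the field set is inferred from the failure/connection state mentioned in the comment, not the PR's exact definition):

```
import scala.collection.mutable

// Per-slave bookkeeping. Entries live for the whole job so failure counts and
// shuffle-service connection state survive executor churn on the slave.
private class SlaveInfo(val hostname: String) {
  val taskIDs = new mutable.HashSet[String]() // Mesos task IDs currently on this slave
  var taskFailures = 0                        // how many tasks have failed here
  var shuffleRegistered = false               // registered with the external shuffle service?
}
```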





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182528576
  
LGTM, merging into master. @mgummelt feel free to address the remainder of the comments in a follow-up patch.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52504536
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

is this still an issue (though not related to this patch)? If so, can one of you file a JIRA about it?





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182589806
  
Thanks for merging.  Will this go into 1.6.1, or not until 2.0?





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182592130
  
This is a big new feature. It will not go into a maintenance release 
(1.6.1).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52504157
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -260,113 +257,209 @@ private[spark] class CoarseMesosSchedulerBackend(
         offers.asScala.map(_.getId).foreach(d.declineOffer)
         return
       }
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  /**
+    * Launches executors on accepted offers, and declines unused offers. Executors are launched
+    * round-robin on offers.
+    *
+    * @param d SchedulerDriver
+    * @param offers Mesos offers that match attribute constraints
+    */
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    val tasks = buildMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  /**
+    * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
+    * per-task memory and IO, tasks are round-robin assigned to offers.
+    *
+    * @param offers Mesos offers that match attribute constraints
+    * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+    */
+  private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = mutable.Map(offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
+
+    var launchTasks = true
+
+    // TODO(mgummelt): combine offers for a single slave
+    //
+    // round-robin create executors on the available offers
+    while (launchTasks) {
+      launchTasks = false
+
+      for (offer <- offers) {
        val slaveId = offer.getSlaveId.getValue
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        val id = offer.getId.getValue
-        if (meetsConstraints) {
-          if (taskIdToSlaveId.size < executorLimit &&
-              totalCoresAcquired < 
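
The hunk above (truncated by the mail digest) is the heart of the change; a minimal standalone sketch of its round-robin idea follows, with simplified types and a single `cpusPerTask` requirement standing in for the real CPU/memory accounting and executor limits:

```
import scala.collection.mutable

// Illustrative stand-ins for Mesos offers and tasks.
case class Offer(id: String, var freeCpus: Int)
case class Task(offerId: String)

// Each pass places one task on every offer that still has room, looping until
// nothing fits. This spreads executors across offers (and thus slaves) instead
// of packing them all onto the first offer that matches.
def roundRobin(offers: Seq[Offer], cpusPerTask: Int): Map[String, List[Task]] = {
  val tasks = mutable.Map[String, List[Task]]().withDefaultValue(Nil)
  var launchedAny = true
  while (launchedAny) {
    launchedAny = false
    for (offer <- offers if offer.freeCpus >= cpusPerTask) {
      offer.freeCpus -= cpusPerTask
      tasks(offer.id) = Task(offer.id) :: tasks(offer.id)
      launchedAny = true
    }
  }
  tasks.toMap
}
```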

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182645311
  
@mgummelt looks like this caused a flaky test:

https://spark-tests.appspot.com/tests/org.apache.spark.scheduler.cluster.mesos.CoarseMesosSchedulerBackendSuite/mesos%20kills%20an%20executor%20when%20told

Do you have the bandwidth to fix it quickly? If not I'll just revert this 
patch for now and we can resubmit it later.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread Astralidea
Github user Astralidea commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182673482
  
@mgummelt Great work! I think this feature will allow more people to use Mesos.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182673851
  
I haven't found the problem, but here's a PR to remove the test in the interim: #11164

It's a strange test to be flaky; it's very simple.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182671529
  
looking into it





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182674567
  
Ah, I see the issue.  There's a thread causing a race.

I won't be able to fix until tomorrow, though.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52352874
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -260,113 +257,208 @@ private[spark] class CoarseMesosSchedulerBackend(
         offers.asScala.map(_.getId).foreach(d.declineOffer)
         return
       }
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  /**
+    * Launches executors on accepted offers, and declines unused offers. Executors are launched
+    * round-robin on offers.
+    *
+    * @param d SchedulerDriver
+    * @param offers Mesos offers that match attribute constraints
+    */
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    val tasks = buildMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  /**
+    * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
+    * per-task memory and IO, tasks are round-robin assigned to offers.
+    *
+    * @param offers Mesos offers that match attribute constraints
+    * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+    */
+  private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = mutable.Map(offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
+
+    var launchTasks = true
+
+    // TODO(mgummelt): combine offers for a single slave
+    //
+    // round-robin create executors on the available offers
+    while (launchTasks) {
+      launchTasks = false
+
+      for (offer <- offers) {
        val slaveId = offer.getSlaveId.getValue
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        val id = offer.getId.getValue
-        if (meetsConstraints) {
-          if (taskIdToSlaveId.size < executorLimit &&
-              totalCoresAcquired < 

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182003212
  
**[Test build #50985 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50985/consoleFull)**
 for PR 10993 at commit 
[`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-181999051
  
retest this please





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182059314
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182059321
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50985/
Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182065042
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182065047
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50984/
Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182064048
  
**[Test build #50984 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50984/consoleFull)**
 for PR 10993 at commit 
[`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182058487
  
**[Test build #50985 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50985/consoleFull)**
 for PR 10993 at commit 
[`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-182001548
  
**[Test build #50984 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50984/consoleFull)**
 for PR 10993 at commit 
[`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-09 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52422595
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -260,113 +257,208 @@ private[spark] class CoarseMesosSchedulerBackend(
         offers.asScala.map(_.getId).foreach(d.declineOffer)
         return
       }
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  /**
+    * Launches executors on accepted offers, and declines unused offers. Executors are launched
+    * round-robin on offers.
+    *
+    * @param d SchedulerDriver
+    * @param offers Mesos offers that match attribute constraints
+    */
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    val tasks = buildMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  /**
+    * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
+    * per-task memory and IO, tasks are round-robin assigned to offers.
+    *
+    * @param offers Mesos offers that match attribute constraints
+    * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+    */
+  private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = mutable.Map(offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
+
+    var launchTasks = true
+
+    // TODO(mgummelt): combine offers for a single slave
+    //
+    // round-robin create executors on the available offers
+    while (launchTasks) {
+      launchTasks = false
+
+      for (offer <- offers) {
        val slaveId = offer.getSlaveId.getValue
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        val id = offer.getId.getValue
-        if (meetsConstraints) {
-          if (taskIdToSlaveId.size < executorLimit &&
-              totalCoresAcquired < 

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-08 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52188015
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -260,113 +257,208 @@ private[spark] class CoarseMesosSchedulerBackend(
         offers.asScala.map(_.getId).foreach(d.declineOffer)
         return
       }
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  /**
+    * Launches executors on accepted offers, and declines unused offers. Executors are launched
+    * round-robin on offers.
+    *
+    * @param d SchedulerDriver
+    * @param offers Mesos offers that match attribute constraints
+    */
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    val tasks = buildMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  /**
+    * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
+    * per-task memory and IO, tasks are round-robin assigned to offers.
+    *
+    * @param offers Mesos offers that match attribute constraints
+    * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+    */
+  private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = mutable.Map(offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
+
+    var launchTasks = true
+
+    // TODO(mgummelt): combine offers for a single slave
+    //
+    // round-robin create executors on the available offers
+    while (launchTasks) {
+      launchTasks = false
+
+      for (offer <- offers) {
        val slaveId = offer.getSlaveId.getValue
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        val id = offer.getId.getValue
-        if (meetsConstraints) {
-          if (taskIdToSlaveId.size < executorLimit &&
-              totalCoresAcquired < 

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-08 Thread tnachen
Github user tnachen commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-181444389
  
Just one comment, overall LGTM





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-08 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52188465
  
--- Diff: docs/configuration.md ---
@@ -825,13 +825,18 @@ Apart from these, the following properties are also available, and may be useful
 
 
   spark.executor.cores
-  1 in YARN mode, all the available cores on the worker in standalone mode.
   
-    The number of cores to use on each executor. For YARN and standalone mode only.
+    1 in YARN mode, all the available cores on the worker in
+    standalone and Mesos coarse-grained modes.
--- End diff --

We don't have multiple coarse-grained modes, right? Just the one mode?
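
As a usage note for the behavior being documented here (an illustrative configuration; the values and master URL are arbitrary, not from the PR):

```
import org.apache.spark.{SparkConf, SparkContext}

// In Mesos coarse-grained mode, capping executor size lets one slave host
// several executors: on a 16-core slave these settings allow up to four
// 4-core executors instead of a single executor consuming all cores.
val conf = new SparkConf()
  .setMaster("mesos://zk://mesos-master:2181/mesos") // hypothetical master URL
  .setAppName("multi-executor-example")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "4g")
  .set("spark.cores.max", "32")
val sc = new SparkContext(conf)
```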





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-06 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180752078
  
LGTM! Great work, @mgummelt!





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180510322
  
**[Test build #50833 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50833/consoleFull)**
 for PR 10993 at commit 
[`7e3f39d`](https://github.com/apache/spark/commit/7e3f39d92b3f2159b5b2682ab7bdbc0954cc3adb).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180505172
  
Yeah @mgummelt we have lots of those. Welcome to Spark :)





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52059914
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

Done





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180508893
  
@andrewor14 Glad to be here! Flaky tests or no.

I think all concerns have been addressed except for dynamic allocation 
testing, which seems to be broken entirely: SPARK-12583

@dragos Any other comments?





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180505004
  
retest this please





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180511868
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180560007
  
**[Test build #50833 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50833/consoleFull)**
 for PR 10993 at commit 
[`7e3f39d`](https://github.com/apache/spark/commit/7e3f39d92b3f2159b5b2682ab7bdbc0954cc3adb).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180511871
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50831/
Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180560552
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180560557
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50833/
Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-05 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51989924
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: 
SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local 
view on
-   * what tasks are running and remove the terminated slave from the list 
of pending
-   * slave IDs that we might have asked to be killed. It also notifies the 
driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local 
view on
+   * what tasks are running. It also notifies the driver that an executor 
was removed.
*/
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, 
reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+ slaveId: String,
+ taskId: String,
+ reason: String): Unit = {
 stateLock.synchronized {
-  if (slaveIdsWithExecutors.contains(slaveId)) {
-val slaveIdToTaskId = taskIdToSlaveId.inverse()
-if (slaveIdToTaskId.containsKey(slaveId)) {
-  val taskId: Int = slaveIdToTaskId.get(slaveId)
-  taskIdToSlaveId.remove(taskId)
-  removeExecutor(sparkExecutorId(slaveId, taskId.toString), 
SlaveLost(reason))
-}
-// TODO: This assumes one Spark executor per Mesos slave,
-// which may no longer be true after SPARK-5095
-pendingRemovedSlaveIds -= slaveId
-slaveIdsWithExecutors -= slaveId
-  }
+  removeExecutor(taskId, SlaveLost(reason))
+  slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

I missed this fact; I agree it makes sense to keep slave instances without tasks. Thanks for explaining it! Would you mind adding a line about it in the definition of `slaves`?





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180051489
  
**[Test build #50770 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50770/consoleFull)** for PR 10993 at commit [`b587f8f`](https://github.com/apache/spark/commit/b587f8fb98edaaedc9e4cfc93dee5cc79ce2e362).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180076930
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180076934
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50764/
Test PASSed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180088967
  
**[Test build #50770 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50770/consoleFull)** for PR 10993 at commit [`b587f8f`](https://github.com/apache/spark/commit/b587f8fb98edaaedc9e4cfc93dee5cc79ce2e362).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180089190
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180089197
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50770/
Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180076446
  
**[Test build #50764 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50764/consoleFull)** for PR 10993 at commit [`4857e57`](https://github.com/apache/spark/commit/4857e570489f61f207e62246e6fb1ceeff943095).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180103124
  
Looks like it failed a network test.  Flaky test?





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-179843161
  
@Astralidea it will deploy more than one executor on the same slave if 
there are enough resources and `spark.cores.max` wasn't reached yet. It's just 
that it will first spawn executors on each eligible slave in the current set of 
offers, and continue iterating until one of the stop conditions is hit (not 
enough resources or the `spark.cores.max` limit).
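
For readers skimming the thread, here is a minimal, self-contained sketch of that round-robin pass. It uses stand-in types and names (`Offer`, `cpusPerExecutor`, `maxCores`); it is not the PR's actual code, just the scheduling idea under simplified assumptions:

```
import scala.collection.mutable

case class Offer(id: String, var cpus: Int)

// Sweep over all offers repeatedly, placing at most one executor per offer
// per pass, until no offer can host another executor or maxCores is reached.
def roundRobin(offers: Seq[Offer], cpusPerExecutor: Int, maxCores: Int): Map[String, Int] = {
  val launched = mutable.Map[String, Int]().withDefaultValue(0)
  var totalCores = 0
  var placedOne = true
  while (placedOne) {
    placedOne = false
    for (offer <- offers
         if offer.cpus >= cpusPerExecutor && totalCores + cpusPerExecutor <= maxCores) {
      offer.cpus -= cpusPerExecutor  // consume cores from this offer
      launched(offer.id) += 1        // one more executor lands on this offer
      totalCores += cpusPerExecutor
      placedOne = true
    }
  }
  launched.toMap
}
```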





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51923243
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,113 +240,207 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]): Unit = {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  /**
+* Launches executors on accepted offers, and declines unused offers. 
Executors are launched
+* round-robin on offers.
+*
+* @param d SchedulerDriver
+* @param offers Mesos offers that match attribute constraints
+*/
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]): Unit = {
+val tasks = getMesosTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  /**
+* Returns a map from OfferIDs to the tasks to launch on those offers.  
In order to maximize
+* per-task memory and IO, tasks are round-robin assigned to offers.
+*
+* @param offers Mesos offers that match attribute constraints
+* @return A map from OfferID to a list of Mesos tasks to launch on 
that offer
+*/
+  private def getMesosTasks(offers: Buffer[Offer]): Map[OfferID, 
List[MesosTaskInfo]] = {
--- End diff --

Sure.  Changed to `build`





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51923784
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: 
SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local 
view on
-   * what tasks are running and remove the terminated slave from the list 
of pending
-   * slave IDs that we might have asked to be killed. It also notifies the 
driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local 
view on
+   * what tasks are running. It also notifies the driver that an executor 
was removed.
*/
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, 
reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+ slaveId: String,
+ taskId: String,
+ reason: String): Unit = {
 stateLock.synchronized {
-  if (slaveIdsWithExecutors.contains(slaveId)) {
-val slaveIdToTaskId = taskIdToSlaveId.inverse()
-if (slaveIdToTaskId.containsKey(slaveId)) {
-  val taskId: Int = slaveIdToTaskId.get(slaveId)
-  taskIdToSlaveId.remove(taskId)
-  removeExecutor(sparkExecutorId(slaveId, taskId.toString), 
SlaveLost(reason))
-}
-// TODO: This assumes one Spark executor per Mesos slave,
-// which may no longer be true after SPARK-5095
-pendingRemovedSlaveIds -= slaveId
-slaveIdsWithExecutors -= slaveId
-  }
+  removeExecutor(taskId, SlaveLost(reason))
+  slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

We want to keep the `taskFailures` and `shuffleRegistered` even when there 
are no `taskIDs`, so I don't think this will work
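
For context, a rough sketch of the per-slave record being discussed; the field names follow this thread, but the exact shape is an assumption, not a copy of the PR's class:

```
import scala.collection.mutable

// taskIDs shrinks as tasks terminate, while taskFailures (used to blacklist
// flaky slaves) and shuffleRegistered (so we register with the shuffle
// service only once) must survive for the driver's lifetime -- which is why
// a Slave record with an empty taskIDs set is still meaningful.
class Slave(val hostname: String) {
  val taskIDs = new mutable.HashSet[String]()
  var taskFailures = 0
  var shuffleRegistered = false
}
```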





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51924965
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: 
SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local 
view on
-   * what tasks are running and remove the terminated slave from the list 
of pending
-   * slave IDs that we might have asked to be killed. It also notifies the 
driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local 
view on
+   * what tasks are running. It also notifies the driver that an executor 
was removed.
*/
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, 
reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+ slaveId: String,
+ taskId: String,
+ reason: String): Unit = {
 stateLock.synchronized {
-  if (slaveIdsWithExecutors.contains(slaveId)) {
-val slaveIdToTaskId = taskIdToSlaveId.inverse()
-if (slaveIdToTaskId.containsKey(slaveId)) {
-  val taskId: Int = slaveIdToTaskId.get(slaveId)
-  taskIdToSlaveId.remove(taskId)
-  removeExecutor(sparkExecutorId(slaveId, taskId.toString), 
SlaveLost(reason))
-}
-// TODO: This assumes one Spark executor per Mesos slave,
-// which may no longer be true after SPARK-5095
-pendingRemovedSlaveIds -= slaveId
-slaveIdsWithExecutors -= slaveId
-  }
+  removeExecutor(taskId, SlaveLost(reason))
+  slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

A bigger issue is with streaming jobs, which may run for many days. If there's state we never clean up, this will cause an OOM on the driver. This kind of leak has happened before.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180021501
  
Resolved merge conflicts with #10319 and fixed the method naming issue.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51927502
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: 
SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local 
view on
-   * what tasks are running and remove the terminated slave from the list 
of pending
-   * slave IDs that we might have asked to be killed. It also notifies the 
driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local 
view on
+   * what tasks are running. It also notifies the driver that an executor 
was removed.
*/
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, 
reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+ slaveId: String,
+ taskId: String,
+ reason: String): Unit = {
 stateLock.synchronized {
-  if (slaveIdsWithExecutors.contains(slaveId)) {
-val slaveIdToTaskId = taskIdToSlaveId.inverse()
-if (slaveIdToTaskId.containsKey(slaveId)) {
-  val taskId: Int = slaveIdToTaskId.get(slaveId)
-  taskIdToSlaveId.remove(taskId)
-  removeExecutor(sparkExecutorId(slaveId, taskId.toString), 
SlaveLost(reason))
-}
-// TODO: This assumes one Spark executor per Mesos slave,
-// which may no longer be true after SPARK-5095
-pendingRemovedSlaveIds -= slaveId
-slaveIdsWithExecutors -= slaveId
-  }
+  removeExecutor(taskId, SlaveLost(reason))
+  slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

At the very least, this isn't a regression, because the previous SlaveID HashMaps were not cleaned up either.

But even if we wanted to change that in this PR, how would we maintain the current behaviour, where the driver blacklists slaves and only registers with a shuffle service once? We need the `taskFailures` and `shuffleRegistered` state for the lifetime of the driver.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180023827
  
@Astralidea You can't guarantee that receivers run on different nodes even with coarse-grained Spark as it exists today. Having one executor on a slave does not guarantee that only one Spark task will run on that slave.

I have some new config vars in mind that would solve this problem, as well as other scheduling problems, though:

- `spark.mesos.executor.max_memory`
- `spark.mesos.memory.min_per_core`
- `spark.mesos.memory.max_per_core`
- `spark.mesos.cores.max_per_node`

I think these 4 new config vars would capture any constraints a user has. For example, you could guarantee one receiver per node by setting `spark.mesos.cores.max_per_node` equal to `spark.task.cpus` (a sketch follows below).

But this is a discussion that should be moved to JIRA.
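
To make the proposal concrete, a minimal sketch of how those settings might be applied if they landed. The four `spark.mesos.*` keys above are hypothetical and do not exist in Spark; `spark.task.cpus` is a real setting (default 1):

```
import org.apache.spark.SparkConf

// Hypothetical configuration: cap each node at one task's worth of cores,
// so at most one task (e.g. a receiver) can run per node. Note that
// "spark.mesos.cores.max_per_node" is the proposed key, not an existing one.
val conf = new SparkConf()
  .set("spark.task.cpus", "1")
  .set("spark.mesos.cores.max_per_node", "1")
```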






[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180026167
  
**[Test build #50764 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50764/consoleFull)** for PR 10993 at commit [`4857e57`](https://github.com/apache/spark/commit/4857e570489f61f207e62246e6fb1ceeff943095).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread drcrallen
Github user drcrallen commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180024934
  
@mgummelt I had added limits for memory per core in https://github.com/apache/spark/pull/10232, in response to https://issues.apache.org/jira/browse/SPARK-12248, but totally forgot to update the Spark PR.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51934657
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180043255
  
**[Test build #50769 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50769/consoleFull)** for PR 10993 at commit [`88e6322`](https://github.com/apache/spark/commit/88e63229e2f5fbc2ebf5119afffe3bd338d644a1).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180045237
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50769/
Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180045220
  
**[Test build #50769 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50769/consoleFull)** for PR 10993 at commit [`88e6322`](https://github.com/apache/spark/commit/88e63229e2f5fbc2ebf5119afffe3bd338d644a1).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-180045231
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-03 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51704761
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: 
SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local 
view on
-   * what tasks are running and remove the terminated slave from the list 
of pending
-   * slave IDs that we might have asked to be killed. It also notifies the 
driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local 
view on
+   * what tasks are running. It also notifies the driver that an executor 
was removed.
*/
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, 
reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+ slaveId: String,
+ taskId: String,
+ reason: String): Unit = {
 stateLock.synchronized {
-  if (slaveIdsWithExecutors.contains(slaveId)) {
-val slaveIdToTaskId = taskIdToSlaveId.inverse()
-if (slaveIdToTaskId.containsKey(slaveId)) {
-  val taskId: Int = slaveIdToTaskId.get(slaveId)
-  taskIdToSlaveId.remove(taskId)
-  removeExecutor(sparkExecutorId(slaveId, taskId.toString), 
SlaveLost(reason))
-}
-// TODO: This assumes one Spark executor per Mesos slave,
-// which may no longer be true after SPARK-5095
-pendingRemovedSlaveIds -= slaveId
-slaveIdsWithExecutors -= slaveId
-  }
+  removeExecutor(taskId, SlaveLost(reason))
+  slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

You're right that memory-wise it's not a big loss. But I prefer clean code. For instance, what's the meaning of keeping a `Slave` record for a host that doesn't run any tasks? Can this become a source of confusion down the road? Since it seems that it's not that complex to clean up, I'd go for it now.

```
def removeTask(slaveId: String, taskId: String): Unit = {
  val slave = slaves(slaveId)
  slave.taskIDs.remove(taskId)
  if (slave.taskIDs.isEmpty) {
    slaves.remove(slaveId)
  }
}
```

In fact, this method may go inside `Slave`, which could properly encapsulate `taskIDs`. It may even be an inner class, so it can update `slaves`. Unless I'm missing something, we're talking about two lines of code.
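
A hedged sketch of that encapsulation, with `Slave` as an inner class so it can reach the enclosing backend's `slaves` map. The names here are illustrative, not the PR's code:

```
import scala.collection.mutable

class Backend {
  private val slaves = new mutable.HashMap[String, Slave]()

  // Inner class: owns its task IDs and removes its own entry from the
  // enclosing `slaves` map once the last task is gone.
  class Slave(val slaveId: String) {
    private val taskIDs = new mutable.HashSet[String]()

    def addTask(taskId: String): Unit = taskIDs.add(taskId)

    def removeTask(taskId: String): Unit = {
      taskIDs.remove(taskId)
      if (taskIDs.isEmpty) {
        slaves.remove(slaveId)
      }
    }
  }
}
```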





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-03 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51703515
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-03 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-179266568
  
I see the same behavior with master. I think this is a regression introduced when Akka was removed and communication switched to Netty.

Here's what happens: the connection between the driver and each shuffle server sits idle and is governed by the general `spark.network.timeout`, which defaults to 120s. That's exactly what the logs show: the application disconnects exactly 2 minutes after registration.

We'd need a TCP connection without a timeout, or heartbeats exchanged between the two. I'll file a JIRA ticket.
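
In the meantime, a sketch of a stopgap, assuming a longer idle timeout is acceptable for the job; `spark.network.timeout` is the real setting named above, and the proper fix remains heartbeats:

```
import org.apache.spark.SparkConf

// Stopgap only: raise the idle timeout so the driver <-> shuffle-service
// connection survives quiet periods longer than the 120s default.
val conf = new SparkConf()
  .set("spark.network.timeout", "3600s")
```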





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-03 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-179250730
  
@Astralidea this PR implements round-robin on the received offers. That 
means it will try to schedule executors on all slaves in the current set of 
offers, before deploying a second executor on a given slave.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-03 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-179294468
  
It seems this was already reported as [SPARK-12583](https://issues.apache.org/jira/browse/SPARK-12583); I somehow missed it...





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-03 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-179251806
  
I'm having trouble running this with dynamic allocation. Did you test it in that scenario?

I'm seeing disconnects from the driver, leading to 

```
16/02/03 15:03:29 WARN TaskSetManager: Lost task 3.2 in stage 4.0 (TID 4015, 10.0.1.205): java.io.FileNotFoundException: /tmp/blockmgr-f008b463-1d87-406b-b879-bae73c915907/27/shuffle_2_3_0.data.607ce66e-b528-4fc8-97e2-5028fc7b8e99 (No such file or directory)
```

In the Shuffle Service logs I see

```
16/02/03 14:58:32 DEBUG MesosExternalShuffleBlockHandler: Received 
registration request from app 1521e408-d8fe-416d-898b-3801e73a8293-0119 (remote 
address /10.0.1.47:52808).
16/02/03 14:58:34 INFO ExternalShuffleBlockResolver: Registered executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=4} with 
ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-248a584a-89b7-461a-8d8d-3363bd0f1a1b],
 subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 14:58:34 WARN MesosExternalShuffleBlockHandler: Unknown 
/10.0.1.208:42483 disconnected.
16/02/03 14:58:43 INFO ExternalShuffleBlockResolver: Registered executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=2} with 
ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-d9865194-5c38-46ae-bce7-de5605cbb4f6],
 subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 14:58:43 WARN MesosExternalShuffleBlockHandler: Unknown 
/10.0.1.208:42498 disconnected.
16/02/03 14:58:43 INFO ExternalShuffleBlockResolver: Registered executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=0} with 
ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-b8350cfd-fa2e-4a29-92c2-a88f1bec17ca],
 subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 14:58:43 WARN MesosExternalShuffleBlockHandler: Unknown 
/10.0.1.208:42499 disconnected.
16/02/03 14:59:20 WARN MesosExternalShuffleBlockHandler: Unknown 
/10.0.1.208:42509 disconnected.
16/02/03 14:59:20 WARN MesosExternalShuffleBlockHandler: Unknown 
/10.0.1.205:35465 disconnected.
16/02/03 14:59:20 WARN MesosExternalShuffleBlockHandler: Unknown 
/10.0.1.205:35462 disconnected.
16/02/03 15:00:09 INFO ExternalShuffleBlockResolver: Registered executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=7} with 
ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-19a734ac-496a-4b7d-b304-acf16f4b5a78],
 subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 15:00:09 WARN MesosExternalShuffleBlockHandler: Unknown 
/10.0.1.208:42522 disconnected.
16/02/03 15:00:32 INFO MesosExternalShuffleBlockHandler: Application 
1521e408-d8fe-416d-898b-3801e73a8293-0119 disconnected (address was 
/10.0.1.47:52808).
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Application 
1521e408-d8fe-416d-898b-3801e73a8293-0119 removed, cleanupLocalDirs = true
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=4}'s 1 local 
dirs
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=2}'s 1 local 
dirs
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=0}'s 1 local 
dirs
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor 
AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=7}'s 1 local 
dirs
```

I am not sure if it's related to this PR.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-03 Thread Astralidea
Github user Astralidea commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-179579452
  
@dragos Oh, but what if I want to deploy 2 executors per slave? Can this PR not do that?





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51649684
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
--- End diff --

Added explicit return types.

This method both accepts and declines offers, which is why I called it 
`handle`.  I've added a scaladoc.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51652453
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: 
SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local 
view on
-   * what tasks are running and remove the terminated slave from the list 
of pending
-   * slave IDs that we might have asked to be killed. It also notifies the 
driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local 
view on
+   * what tasks are running. It also notifies the driver that an executor 
was removed.
*/
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, 
reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+ slaveId: String,
+ taskId: String,
+ reason: String): Unit = {
 stateLock.synchronized {
-  if (slaveIdsWithExecutors.contains(slaveId)) {
-val slaveIdToTaskId = taskIdToSlaveId.inverse()
-if (slaveIdToTaskId.containsKey(slaveId)) {
-  val taskId: Int = slaveIdToTaskId.get(slaveId)
-  taskIdToSlaveId.remove(taskId)
-  removeExecutor(sparkExecutorId(slaveId, taskId.toString), 
SlaveLost(reason))
-}
-// TODO: This assumes one Spark executor per Mesos slave,
-// which may no longer be true after SPARK-5095
-pendingRemovedSlaveIds -= slaveId
-slaveIdsWithExecutors -= slaveId
-  }
+  removeExecutor(taskId, SlaveLost(reason))
+  slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

Are you worried about memory? Even with 1k executors, this should only be ~20 KB of memory (roughly 1k entries at ~20 bytes each). I'd prefer to take that tradeoff in order to keep the code as simple as possible.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-178882985
  
**[Test build #50608 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50608/consoleFull)** for PR 10993 at commit [`0a1181a`](https://github.com/apache/spark/commit/0a1181a9160e72a164efcc05459326b4e01f8f5c).





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51650781
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava,
+          filters)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = HashMap[String, JList[Resource]](offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
--- End diff --

Ah, yes.  Thanks.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51651630
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51652632
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -426,23 +489,23 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Boolean = {
     if (mesosDriver == null) {
       logWarning("Asked to kill executors before the Mesos driver was started.")
-      return false
-    }
-
-    val slaveIdToTaskId = taskIdToSlaveId.inverse()
-    for (executorId <- executorIds) {
-      val slaveId = executorId.split("/")(0)
-      if (slaveIdToTaskId.containsKey(slaveId)) {
-        mesosDriver.killTask(
-          TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
-        pendingRemovedSlaveIds += slaveId
-      } else {
-        logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+      false
+    } else {
+      for (executorId <- executorIds) {
+        val taskId = TaskID.newBuilder().setValue(executorId).build()
+        mesosDriver.killTask(taskId)
       }
+      // no need to adjust `executorLimitOption` since the AllocationManager already communicated
+      // the desired limit through a call to `doRequestTotalExecutors`.
+      // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+      true
     }
-    // no need to adjust `executorLimitOption` since the AllocationManager already communicated
-    // the desired limit through a call to `doRequestTotalExecutors`.
-    // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
-    true
   }
 }
+
+private class Slave(val hostname: String) {
+  var taskFailures = 0
+  val taskIDs = new HashSet[String]()
+  var pendingRemoval = false
--- End diff --

removed





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51652611
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51648592
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
--- End diff --

I was thinking about this when I ran into it. The default is actually 5 seconds:
https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto#L1211

So I'll just remove it.
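
For illustration, a minimal sketch of the equivalence (my helper names, not the PR's code; only the `SchedulerDriver` and `Protos` types from the Mesos Java API are assumed). Because `refuse_seconds` defaults to 5.0 in the mesos.proto linked above, an empty `Filters` behaves the same as the explicit `setRefuseSeconds(5)`:

```scala
import java.util.Collections

import org.apache.mesos.Protos.{Filters, OfferID, TaskInfo}
import org.apache.mesos.SchedulerDriver

// Both calls impose the same 5-second refusal window, since refuse_seconds
// defaults to 5.0 in the Mesos protobuf definition.
def launchExplicit(d: SchedulerDriver, offerId: OfferID,
                   tasks: java.util.List[TaskInfo]): Unit = {
  d.launchTasks(Collections.singleton(offerId), tasks,
    Filters.newBuilder().setRefuseSeconds(5).build())
}

def launchWithDefault(d: SchedulerDriver, offerId: OfferID,
                      tasks: java.util.List[TaskInfo]): Unit = {
  d.launchTasks(Collections.singleton(offerId), tasks,
    Filters.newBuilder().build())
}
```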





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51650484
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava,
+          filters)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = {
--- End diff --

Changed to immutable map.
Added scaladoc.

I prefer the get* naming even for non-getters. Whether or not a function is backed by a single instance variable seems like an implementation detail that shouldn't be exposed by any particular naming scheme. There's also plenty of precedent in the codebase for non-getter get* methods. That said, I've renamed it to `getMesosTasks` to disambiguate between Spark tasks and Mesos tasks.
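
A sketch of what the renamed method might look like after these changes (the scaladoc wording and the body placeholder are my assumptions, not the PR's exact code; `Offer`, `OfferID`, and `MesosTaskInfo` are the types already used in the diff):

```scala
import scala.collection.mutable

/**
 * Builds one launch plan per usable offer: OfferID -> Mesos tasks to start there.
 */
private def getMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
  val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
  // ... fill `tasks` round-robin across offers, as in the diff above ...
  tasks.toMap // hand callers an immutable view
}
```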






[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51650895
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51651674
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-178931774
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50608/
Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-178931643
  
**[Test build #50608 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50608/consoleFull)** for PR 10993 at commit [`0a1181a`](https://github.com/apache/spark/commit/0a1181a9160e72a164efcc05459326b4e01f8f5c).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-178931772
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-178503833
  
I didn't have time to look at this in detail; I'll do so this afternoon.





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51594185
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava,
+          filters)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = HashMap[String, JList[Resource]](offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
--- End diff --

A simpler way to do the same thing: `offers.map(offer => (offer.getId.getValue, offer.getResourcesList)).toMap`
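
Both constructions, sketched side by side (helper names are mine). One subtlety worth flagging: the varargs `HashMap(...)` builds a mutable map, while `.toMap` yields an immutable one, so the shorter form only fits if `remainingResources` is never updated in place.

```scala
import java.util.{List => JList}

import scala.collection.mutable

import org.apache.mesos.Protos.{Offer, Resource}

// Verbose form: a mutable HashMap built from varargs.
def viaVarargs(offers: mutable.Buffer[Offer]): mutable.Map[String, JList[Resource]] =
  mutable.HashMap[String, JList[Resource]](offers.map(offer =>
    (offer.getId.getValue, offer.getResourcesList)): _*)

// Suggested form: the same pairs, with an immutable result.
def viaToMap(offers: mutable.Buffer[Offer]): Map[String, JList[Resource]] =
  offers.map(offer => (offer.getId.getValue, offer.getResourcesList)).toMap
```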





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51599060
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51599986
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51600410
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51600547
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

`slaves` is never cleaned up, so keys will continue to accumulate for the lifetime of this job. Would it make sense to check whether `taskIDs` is empty and remove the `slaveId` from the map?
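
One way the suggested cleanup could look (a sketch of this idea, not necessarily the PR's resolution; `removeExecutor`, `SlaveLost`, `slaves`, and `stateLock` are the names already in the diff). One trade-off: dropping the entry also discards that slave's `taskFailures` count, which feeds the MAX_SLAVE_FAILURES check.

```scala
private def executorTerminated(
    d: SchedulerDriver,
    slaveId: String,
    taskId: String,
    reason: String): Unit = {
  stateLock.synchronized {
    removeExecutor(taskId, SlaveLost(reason))
    val slave = slaves(slaveId)
    slave.taskIDs.remove(taskId)
    if (slave.taskIDs.isEmpty) {
      // Reclaim the entry once its last task is gone, at the cost of
      // forgetting this slave's failure history.
      slaves.remove(slaveId)
    }
  }
}
```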





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51600867
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -426,23 +489,23 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Boolean = {
     if (mesosDriver == null) {
       logWarning("Asked to kill executors before the Mesos driver was started.")
-      return false
-    }
-
-    val slaveIdToTaskId = taskIdToSlaveId.inverse()
-    for (executorId <- executorIds) {
-      val slaveId = executorId.split("/")(0)
-      if (slaveIdToTaskId.containsKey(slaveId)) {
-        mesosDriver.killTask(
-          TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
-        pendingRemovedSlaveIds += slaveId
-      } else {
-        logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+      false
+    } else {
+      for (executorId <- executorIds) {
+        val taskId = TaskID.newBuilder().setValue(executorId).build()
+        mesosDriver.killTask(taskId)
       }
+      // no need to adjust `executorLimitOption` since the AllocationManager already communicated
+      // the desired limit through a call to `doRequestTotalExecutors`.
+      // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+      true
     }
-    // no need to adjust `executorLimitOption` since the AllocationManager already communicated
-    // the desired limit through a call to `doRequestTotalExecutors`.
-    // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
-    true
   }
 }
+
+private class Slave(val hostname: String) {
+  var taskFailures = 0
+  val taskIDs = new HashSet[String]()
+  var pendingRemoval = false
--- End diff --

This field is never used.
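
With the unused flag dropped, the class would reduce to (sketch):

```scala
private class Slave(val hostname: String) {
  var taskFailures = 0
  val taskIDs = new HashSet[String]()
}
```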





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/10993#issuecomment-178696871
  
@mgummelt this looks really good! I have a few comments. I still have to 
run this PR with dynamic allocation and see it in action!





[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51600669
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 stateLock.synchronized {
-  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-  for (offer <- offers.asScala) {
+  logDebug(s"Received ${offers.size} resource offers.")
+
+  val (matchedOffers, unmatchedOffers) = offers.asScala.partition { 
offer =>
 val offerAttributes = toAttributeMap(offer.getAttributesList)
-val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+  }
+
+  declineUnmatchedOffers(d, unmatchedOffers)
+  handleMatchedOffers(d, matchedOffers)
+}
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+for (offer <- offers) {
+  val id = offer.getId.getValue
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val mem = getResource(offer.getResourcesList, "mem")
+  val cpus = getResource(offer.getResourcesList, "cpus")
+  val filters = Filters.newBuilder()
+.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+  logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
++ s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+  d.declineOffer(offer.getId, filters)
+}
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: 
Buffer[Offer]) {
+val tasks = getTasks(offers)
+for (offer <- offers) {
+  val offerAttributes = toAttributeMap(offer.getAttributesList)
+  val offerMem = getResource(offer.getResourcesList, "mem")
+  val offerCpus = getResource(offer.getResourcesList, "cpus")
+  val id = offer.getId.getValue
+
+  if (tasks.contains(offer.getId)) { // accept
+val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+val offerTasks = tasks(offer.getId)
+
+logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} 
Mesos tasks.")
+
+for (task <- offerTasks) {
+  val taskId = task.getTaskId
+  val mem = getResource(task.getResourcesList, "mem")
+  val cpus = getResource(task.getResourcesList, "cpus")
+
+  logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: 
$mem cpu: $cpus.")
+}
+
+d.launchTasks(
+  Collections.singleton(offer.getId),
+  offerTasks.asJava,
+  filters)
+  } else { // decline
+logDebug(s"Declining offer: $id with attributes: $offerAttributes 
" +
+  s"mem: $offerMem cpu: $offerCpus")
+
+d.declineOffer(offer.getId)
+  }
+}
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, 
List[MesosTaskInfo]] = {
+// offerID -> tasks
+val tasks = new HashMap[OfferID, 
List[MesosTaskInfo]].withDefaultValue(Nil)
+
+// offerID -> resources
+val remainingResources = HashMap[String, 
JList[Resource]](offers.map(offer =>
+  (offer.getId.getValue, offer.getResourcesList)): _*)
+
+var launchTasks = true
+
+// TODO(mgummelt): combine offers for a single slave
+// round-robin create executors on the available offers
+while (launchTasks) {
+  launchTasks = false
+
+  for (offer <- offers) {
 val slaveId = offer.getSlaveId.getValue
-val mem = getResource(offer.getResourcesList, "mem")
-val cpus = getResource(offer.getResourcesList, "cpus").toInt
-val id = offer.getId.getValue
-if (meetsConstraints) {
-  if (taskIdToSlaveId.size < executorLimit &&
-  totalCoresAcquired < maxCores &&
-  mem >= calculateTotalMemory(sc) &&
-  cpus >= 1 &&
-  failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES 
&&
-  !slaveIdsWithExecutors.contains(slaveId)) {
-// Launch an executor on the slave
-val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-totalCoresAcquired += cpusToUse
-val taskId = newMesosTaskId()
-taskIdToSlaveId.put(taskId, slaveId)
-

[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51592789
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
--- End diff --

I know this is not your code, but it would be good to document this. Why do 
we set a 5-second refuse filter on the offers we accept?
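
Something along these lines is what I'd like to see — a minimal sketch only, 
where the constant name and the rationale in the comment are assumptions, not 
taken from this PR:

```scala
import org.apache.mesos.Protos.Filters

// Assumed rationale: after we accept part of an offer, Mesos re-offers the
// unused remainder almost immediately. A short refuse filter keeps those
// leftovers from bouncing straight back while the launched tasks start up.
val ACCEPTED_OFFER_REFUSE_SECONDS = 5.0 // hypothetical name, not in the PR

val filters = Filters.newBuilder()
  .setRefuseSeconds(ACCEPTED_OFFER_REFUSE_SECONDS)
  .build()
```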


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51593473
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava,
+          filters)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = {
--- End diff --

Please add a short scaladoc explaining what this method does. Also, 
`getTasks` is confusing (this is not a getter), maybe `buildTasks`?

I'd also return an immutable Map (does the caller need to remove or add 
things to this map?). Or at least, use `collection.Map`, which both mutable and 
immutable maps implement (but has no mutating methods itself).
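
Concretely, something like this sketch of the signature — `buildTasks` is my 
suggested rename, the body is elided, and the method would of course sit 
inside the scheduler backend class from the diff:

```scala
import scala.collection.mutable
import scala.collection.mutable.Buffer

import org.apache.mesos.Protos.{Offer, OfferID, TaskInfo => MesosTaskInfo}

// Returning collection.Map exposes no mutating methods to callers, while the
// implementation can still build the result in a mutable HashMap internally.
private def buildTasks(offers: Buffer[Offer]): collection.Map[OfferID, List[MesosTaskInfo]] = {
  val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
  // ... round-robin task assignment, as in the diff ...
  tasks
}
```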


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51593041
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
--- End diff --

Please use explicit return types (`: Unit = {...}`). Procedure syntax is 
deprecated.
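
For reference, the difference in question — procedure syntax omits the `=` and 
silently defaults the return type to `Unit`; the explicit form is what's being 
asked for (stub methods shown for illustration, not code from this PR):

```scala
object ProcedureSyntaxExample {
  // Deprecated procedure syntax: no '=' and no declared return type.
  def declineAll(ids: Seq[String]) { ids.foreach(println) }

  // Preferred form with an explicit return type, as requested above.
  def declineAllExplicit(ids: Seq[String]): Unit = { ids.foreach(println) }
}
```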


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51593225
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
--- End diff --

Also, a short scaladoc would help. `handle` is very light on meaning. What 
about `launchExecutors` or something along those lines?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...

2016-02-02 Thread dragos
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51686128
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -245,113 +240,207 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  /**
+   * Launches executors on accepted offers, and declines unused offers. Executors are launched
+   * round-robin on offers.
+   *
+   * @param d SchedulerDriver
+   * @param offers Mesos offers that match attribute constraints
+   */
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    val tasks = getMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  /**
+   * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
+   * per-task memory and IO, tasks are round-robin assigned to offers.
+   *
+   * @param offers Mesos offers that match attribute constraints
+   * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+   */
+  private def getMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
--- End diff --

I find it non-productive to quibble over a name. That being said, this 
method doesn't just get tasks from somewhere. It produces them itself, based on 
a round-robin scheduling strategy over the given offers. I don't think `get` is 
the best verb to describe that action.
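
To make the behavior concrete, here is a stripped-down, hypothetical version 
of that round-robin strategy over simplified offer types — the real code also 
tracks memory and builds actual Mesos `TaskInfo`s, which this sketch elides:

```scala
import scala.collection.mutable

// Simplified stand-in for a Mesos offer: just an id and its available CPUs.
final case class SimpleOffer(id: String, cpus: Double)

// Cycle through the offers, carving one task's worth of CPUs out of each
// offer per pass, until no offer can fit another task. This mirrors the
// while(launchTasks) loop in the diff; it returns task counts per offer.
def buildTasksRoundRobin(offers: Seq[SimpleOffer], cpusPerTask: Double): Map[String, Int] = {
  val remaining = mutable.HashMap(offers.map(o => o.id -> o.cpus): _*)
  val taskCounts = mutable.HashMap[String, Int]().withDefaultValue(0)
  var launchedTask = true
  while (launchedTask) {
    launchedTask = false
    for (offer <- offers if remaining(offer.id) >= cpusPerTask) {
      remaining(offer.id) -= cpusPerTask
      taskCounts(offer.id) += 1
      launchedTask = true
    }
  }
  taskCounts.toMap
}

// buildTasksRoundRobin(Seq(SimpleOffer("o1", 4), SimpleOffer("o2", 2)), 2)
//   => Map("o1" -> 2, "o2" -> 1)
```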


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


