[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52793626

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

1) It's not any more of an issue than it was before. I didn't add any
2) Total memory accrual is O(slaves on which this driver ever launches an executor). This is bounded by the number of slaves in the cluster. Largest known Mesos cluster is ~50k slaves. If a Spark job somehow spans the entire cluster (highly unlikely), this object will grow to ~50k * sizeof(slaveID) ~= 1.5MB. I think we're fine.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
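The memory bound quoted in this comment can be checked with a quick calculation. The sketch below assumes roughly 30 bytes per slave ID, which is only the figure implied by 1.5MB / 50k and not a measured size. The object `SlaveMapBound` and its field names are made up for illustration, and JVM object overhead and the per-slave state are ignored.

```scala
// Back-of-the-envelope check of the worst-case size of the slaves map.
// Assumption: ~30 bytes per slave ID (implied by the thread, not measured).
object SlaveMapBound {
  // "Largest known Mesos cluster is ~50k slaves"
  val maxSlaves: Long = 50000L

  // Hypothetical average size of one slave ID, in bytes
  val assumedBytesPerSlaveId: Long = 30L

  // Worst case: the driver has launched an executor on every slave,
  // and no entry is ever removed from the map.
  val worstCaseBytes: Long = maxSlaves * assumedBytesPerSlaveId

  def main(args: Array[String]): Unit =
    println(s"worst case: $worstCaseBytes bytes")
}
```

Under these assumptions the total is 1,500,000 bytes, which matches the ~1.5MB estimate; a real map would be somewhat larger because of object headers and the stored per-slave state.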
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52503772

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -89,13 +82,11 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)

-  private val pendingRemovedSlaveIds = new HashSet[String]
--- End diff --

nice
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52504852

--- Diff: core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala ---
@@ -37,6 +41,223 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     with MockitoSugar
     with BeforeAndAfter {

+  var sparkConf: SparkConf = _
+  var driver: SchedulerDriver = _
+  var taskScheduler: TaskSchedulerImpl = _
+  var backend: CoarseMesosSchedulerBackend = _
+  var externalShuffleClient: MesosExternalShuffleClient = _
+  var driverEndpoint: RpcEndpointRef = _
--- End diff --

please make all of these private
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52504373

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -410,40 +507,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
--- End diff --

style:
```
private def executorTerminated(
    d: SchedulerDriver,
    slaveId: String,
    taskId: String,
    reason: String): Unit = {
}
```
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/10993
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52503444

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -113,27 +107,31 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val rejectOfferDurationForUnmetConstraints =
     getRejectOfferDurationForUnmetConstraints(sc)

-  // A client for talking to the external shuffle service, if it is a
+  // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
-      Some(new MesosExternalShuffleClient(
-        SparkTransportConf.fromSparkConf(conf, "shuffle"),
-        securityManager,
-        securityManager.isAuthenticationEnabled(),
-        securityManager.isSaslEncryptionEnabled()))
+      Some(getShuffleClient())
     } else {
       None
     }
   }

+  protected def getShuffleClient(): MesosExternalShuffleClient = {
--- End diff --

is this exposed so we can test it? If so we should add a comment and say so.
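For readers unfamiliar with the pattern being asked about, here is a minimal sketch of a protected factory method used as a test seam, with the explanatory comment the reviewer requests. All names here (`ShuffleClient`, `RealShuffleClient`, `Backend`, `SeamDemo`) are hypothetical stand-ins, not the actual Spark classes, and the `lazy val` is a choice made for this sketch only.

```scala
// Hypothetical stand-ins for the real client and backend classes.
trait ShuffleClient { def name: String }
class RealShuffleClient extends ShuffleClient { val name = "real" }

class Backend(shuffleServiceEnabled: Boolean) {
  // Exposed as protected (rather than inlined) so that tests can
  // override it and substitute a mock client.
  protected def getShuffleClient(): ShuffleClient = new RealShuffleClient

  // lazy so the overridable factory is not invoked during construction
  lazy val shuffleClient: Option[ShuffleClient] =
    if (shuffleServiceEnabled) Some(getShuffleClient()) else None
}

object SeamDemo {
  // What a test might do: subclass the backend and return a fake client.
  val fake: ShuffleClient = new ShuffleClient { val name = "fake" }

  val backend: Backend = new Backend(shuffleServiceEnabled = true) {
    override protected def getShuffleClient(): ShuffleClient = fake
  }
}
```

The comment on `getShuffleClient` is the point of the review remark: without it, a later reader may "simplify" the method away and break the tests that depend on overriding it.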
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52503364

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -73,17 +73,13 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[Int, Int]
+  val coresByTaskId = new HashMap[String, Int]
   var totalCoresAcquired = 0

-  val slaveIdsWithExecutors = new HashSet[String]
-
-  // Maping from slave Id to hostname
-  private val slaveIdToHost = new HashMap[String, String]
-
-  val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
-  // How many times tasks on each slave failed
-  val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
+  // SlaveID -> Slave
+  // This map accumulates entries for the duration of the job. Slaves are never deleted, because
+  // we need to maintain e.g. failure state and connection state.
+  private val slaves = new HashMap[String, Slave]
--- End diff --

elsewhere in Spark we would call this class `SlaveInfo` instead of just `Slave`, so we don't confuse it with the Mesos Slave
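As a rough illustration of what the suggested `SlaveInfo` record and the `slaves` map might look like together, here is a sketch. Only `taskIDs` and the map name appear in the diff; the fields `hostname`, `taskFailures` and `shuffleRegistered`, and the object `SlaveInfoDemo`, are assumptions added for the example.

```scala
import scala.collection.mutable

// Hypothetical per-slave record replacing the separate maps and sets.
// Field names other than taskIDs are assumptions, not taken from the patch.
class SlaveInfo(val hostname: String) {
  val taskIDs = new mutable.HashSet[String]   // tasks currently running here
  var taskFailures: Int = 0                   // "failure state"
  var shuffleRegistered: Boolean = false      // "connection state"
}

object SlaveInfoDemo {
  // SlaveID -> SlaveInfo. Entries accumulate for the duration of the job.
  val slaves = new mutable.HashMap[String, SlaveInfo]

  def executorTerminated(slaveId: String, taskId: String): Unit = {
    // Only the task is dropped; the slave entry stays so that its
    // failure and connection state survive.
    slaves(slaveId).taskIDs.remove(taskId)
  }
}
```

Keeping one record per slave is what makes several executors per slave straightforward: the task set can hold any number of task IDs, whereas the removed `HashBiMap` enforced a one-to-one mapping.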
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182528576 LGTM merging into master. @mgummelt feel free to address the remainder of the comments in a follow-up patch.
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r52504536

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --

is this still an issue (though not related to this patch)? If so can one of you file a JIRA about this?
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182589806 Thanks for merging. Will this go into 1.6.1, or not until 2.0?
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182592130 This is a big new feature. It will not go into a maintenance release (1.6.1).
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r52504157 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -260,113 +257,209 @@ private[spark] class CoarseMesosSchedulerBackend( offers.asScala.map(_.getId).foreach(d.declineOffer) return } - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + /** +* Launches executors on accepted offers, and declines unused offers. Executors are launched +* round-robin on offers. 
+* +* @param d SchedulerDriver +* @param offers Mesos offers that match attribute constraints +*/ + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +val tasks = buildMesosTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + /** +* Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize +* per-task memory and IO, tasks are round-robin assigned to offers. 
+* +* @param offers Mesos offers that match attribute constraints +* @return A map from OfferID to a list of Mesos tasks to launch on that offer +*/ + private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = mutable.Map(offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired <
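The round-robin assignment described in the doc comment of `buildMesosTasks` can be sketched in isolation. This is a simplified illustration and not the code from the patch: `SimpleOffer`, `RoundRobin.assign` and its parameters are hypothetical, only CPUs are tracked, and it returns executor counts per offer instead of Mesos task descriptors.

```scala
import scala.collection.mutable

// Simplified stand-in for a Mesos offer: an ID plus free CPUs (hypothetical).
case class SimpleOffer(id: String, cpus: Int)

object RoundRobin {
  /**
   * Places at most one executor on each offer per pass, and repeats passes
   * until no offer can fit another executor or the executor limit is hit.
   * Returns offer ID -> number of executors placed on that offer.
   */
  def assign(
      offers: Seq[SimpleOffer],
      cpusPerExecutor: Int,
      maxExecutors: Int): Map[String, Int] = {
    require(cpusPerExecutor > 0, "cpusPerExecutor must be positive")

    // offer ID -> CPUs not yet claimed by an executor
    val remaining = mutable.Map(offers.map(o => o.id -> o.cpus): _*)
    // offer ID -> executors placed so far
    val launched = mutable.Map[String, Int]().withDefaultValue(0)
    var total = 0
    var launchedThisPass = true

    while (launchedThisPass) {
      launchedThisPass = false
      for (offer <- offers) {
        if (total < maxExecutors && remaining(offer.id) >= cpusPerExecutor) {
          remaining(offer.id) -= cpusPerExecutor
          launched(offer.id) += 1
          total += 1
          launchedThisPass = true
        }
      }
    }
    launched.toMap
  }
}
```

Spreading executors one per offer per pass, instead of filling the first offer before moving on, is what gives each executor a larger share of its host's memory and IO when the executor limit is reached before the offers are exhausted.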
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182645311 @mgummelt looks like this caused a flaky test: https://spark-tests.appspot.com/tests/org.apache.spark.scheduler.cluster.mesos.CoarseMesosSchedulerBackendSuite/mesos%20kills%20an%20executor%20when%20told Do you have the bandwidth to fix it quickly? If not I'll just revert this patch for now and we can resubmit it later.
Github user Astralidea commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182673482 @mgummelt Great Work! I think this feature will allow more people to use mesos.
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182673851 I haven't found the problem, but here's a PR to remove the test in the interim #11164 It's a strange test to be flaky. It's very simple.
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182671529 looking into it
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182674567 Ah, I see the issue. There's a thread causing a race. I won't be able to fix until tomorrow, though.
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r52352874 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -260,113 +257,208 @@ private[spark] class CoarseMesosSchedulerBackend( offers.asScala.map(_.getId).foreach(d.declineOffer) return } - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + /** +* Launches executors on accepted offers, and declines unused offers. Executors are launched +* round-robin on offers. 
+* +* @param d SchedulerDriver +* @param offers Mesos offers that match attribute constraints +*/ + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +val tasks = buildMesosTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + /** +* Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize +* per-task memory and IO, tasks are round-robin assigned to offers. 
+* +* @param offers Mesos offers that match attribute constraints +* @return A map from OfferID to a list of Mesos tasks to launch on that offer +*/ + private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = mutable.Map(offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired <
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182003212 **[Test build #50985 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50985/consoleFull)** for PR 10993 at commit [`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088).
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-181999051 retest this please
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182059314 Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182059321 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50985/ Test PASSed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182065042 Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182065047 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50984/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182064048 **[Test build #50984 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50984/consoleFull)** for PR 10993 at commit [`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182058487 **[Test build #50985 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50985/consoleFull)** for PR 10993 at commit [`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-182001548 **[Test build #50984 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50984/consoleFull)** for PR 10993 at commit [`ecad77a`](https://github.com/apache/spark/commit/ecad77a6ac85892f1155f596e84729342e484088).
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r52422595 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -260,113 +257,208 @@ private[spark] class CoarseMesosSchedulerBackend( offers.asScala.map(_.getId).foreach(d.declineOffer) return } - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + /** +* Launches executors on accepted offers, and declines unused offers. Executors are launched +* round-robin on offers. 
+* +* @param d SchedulerDriver +* @param offers Mesos offers that match attribute constraints +*/ + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +val tasks = buildMesosTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + /** +* Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize +* per-task memory and IO, tasks are round-robin assigned to offers. 
+* +* @param offers Mesos offers that match attribute constraints +* @return A map from OfferID to a list of Mesos tasks to launch on that offer +*/ + private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = mutable.Map(offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired <
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r52188015 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -260,113 +257,208 @@ private[spark] class CoarseMesosSchedulerBackend( offers.asScala.map(_.getId).foreach(d.declineOffer) return } - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + /** +* Launches executors on accepted offers, and declines unused offers. Executors are launched +* round-robin on offers. 
+* +* @param d SchedulerDriver +* @param offers Mesos offers that match attribute constraints +*/ + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +val tasks = buildMesosTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + /** +* Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize +* per-task memory and IO, tasks are round-robin assigned to offers. 
+* +* @param offers Mesos offers that match attribute constraints +* @return A map from OfferID to a list of Mesos tasks to launch on that offer +*/ + private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = mutable.Map(offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired <
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user tnachen commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-181444389 Just one comment, overall LGTM
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r52188465 --- Diff: docs/configuration.md --- @@ -825,13 +825,18 @@ Apart from these, the following properties are also available, and may be useful spark.executor.cores - 1 in YARN mode, all the available cores on the worker in standalone mode. -The number of cores to use on each executor. For YARN and standalone mode only. +1 in YARN mode, all the available cores on the worker in +standalone and Mesos coarse-grained modes. --- End diff -- We don't have multiple coarse-grained modes, right? Just "mode"?
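The default documented in the diff above can be read as a small rule for sizing each executor. The following is a minimal sketch of that rule, not the code from this PR: the object and method names are hypothetical, `configuredCores` stands in for `spark.executor.cores`, and capping by the remaining `spark.cores.max` budget is an assumption based on the limit mentioned elsewhere in this thread.

```scala
object ExecutorCoresSketch {
  /**
   * Cores to give the next executor launched on an offer.
   *
   * @param configuredCores value of spark.executor.cores, or None when unset
   * @param offerCpus       CPUs available in the offer
   * @param coresMax        value of spark.cores.max
   * @param coresAcquired   cores already acquired by this application
   */
  def executorCores(
      configuredCores: Option[Int],
      offerCpus: Int,
      coresMax: Int,
      coresAcquired: Int): Int = {
    // Assumed default: take every core in the offer, but never exceed
    // what is left of the spark.cores.max budget (and never go negative).
    val defaultCores = math.max(0, math.min(offerCpus, coresMax - coresAcquired))
    configuredCores.getOrElse(defaultCores)
  }
}
```

With `spark.executor.cores` unset, an 8-CPU offer yields one 8-core executor under this rule; setting it to 2 leaves room for several executors on the same offer.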
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180752078 LGTM! Great work, @mgummelt!
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180510322 **[Test build #50833 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50833/consoleFull)** for PR 10993 at commit [`7e3f39d`](https://github.com/apache/spark/commit/7e3f39d92b3f2159b5b2682ab7bdbc0954cc3adb).
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180505172 Yeah @mgummelt we have lots of those. Welcome to Spark :)
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r52059914 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} /** - * Called when a slave is lost or a Mesos task finished. Update local view on - * what tasks are running and remove the terminated slave from the list of pending - * slave IDs that we might have asked to be killed. It also notifies the driver - * that an executor was removed. + * Called when a slave is lost or a Mesos task finished. Updates local view on + * what tasks are running. It also notifies the driver that an executor was removed. */ - private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = { + private def executorTerminated(d: SchedulerDriver, + slaveId: String, + taskId: String, + reason: String): Unit = { stateLock.synchronized { - if (slaveIdsWithExecutors.contains(slaveId)) { -val slaveIdToTaskId = taskIdToSlaveId.inverse() -if (slaveIdToTaskId.containsKey(slaveId)) { - val taskId: Int = slaveIdToTaskId.get(slaveId) - taskIdToSlaveId.remove(taskId) - removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason)) -} -// TODO: This assumes one Spark executor per Mesos slave, -// which may no longer be true after SPARK-5095 -pendingRemovedSlaveIds -= slaveId -slaveIdsWithExecutors -= slaveId - } + removeExecutor(taskId, SlaveLost(reason)) + slaves(slaveId).taskIDs.remove(taskId) --- End diff -- Done
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180508893 @andrewor14 Glad to be here! Flaky tests or not, I think all concerns have been addressed except for dynamic allocation testing, which seems to be broken entirely: SPARK-12583. @dragos Any other comments?
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180505004 retest this please
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180511868 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180560007 **[Test build #50833 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50833/consoleFull)** for PR 10993 at commit [`7e3f39d`](https://github.com/apache/spark/commit/7e3f39d92b3f2159b5b2682ab7bdbc0954cc3adb). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180511871 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50831/ Test FAILed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180560552 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180560557 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50833/ Test PASSed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51989924 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} /** - * Called when a slave is lost or a Mesos task finished. Update local view on - * what tasks are running and remove the terminated slave from the list of pending - * slave IDs that we might have asked to be killed. It also notifies the driver - * that an executor was removed. + * Called when a slave is lost or a Mesos task finished. Updates local view on + * what tasks are running. It also notifies the driver that an executor was removed. */ - private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = { + private def executorTerminated(d: SchedulerDriver, + slaveId: String, + taskId: String, + reason: String): Unit = { stateLock.synchronized { - if (slaveIdsWithExecutors.contains(slaveId)) { -val slaveIdToTaskId = taskIdToSlaveId.inverse() -if (slaveIdToTaskId.containsKey(slaveId)) { - val taskId: Int = slaveIdToTaskId.get(slaveId) - taskIdToSlaveId.remove(taskId) - removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason)) -} -// TODO: This assumes one Spark executor per Mesos slave, -// which may no longer be true after SPARK-5095 -pendingRemovedSlaveIds -= slaveId -slaveIdsWithExecutors -= slaveId - } + removeExecutor(taskId, SlaveLost(reason)) + slaves(slaveId).taskIDs.remove(taskId) --- End diff -- I missed this fact, I agree it makes sense to keep slave instances without tasks. Thanks for explaining it! Would you mind adding a line about it in the definition of `slaves`?
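To illustrate the bookkeeping discussed in the comment above, here is a minimal sketch of a per-slave map whose entries survive after their last task is removed. Only `slaves(slaveId).taskIDs` appears in the quoted diff; the class names, the `hostname` field, and the helper methods are hypothetical and exist purely for illustration.

```scala
import scala.collection.mutable

// Hypothetical per-slave record; only `taskIDs` is taken from the quoted diff.
final class SlaveState(val hostname: String) {
  val taskIDs: mutable.Set[String] = mutable.HashSet.empty[String]
}

final class SlaveTracker {
  // slaveId -> state. An entry is created on first launch and is never
  // removed, so a slave can remain in the map with an empty task set.
  private val slaves = mutable.HashMap.empty[String, SlaveState]

  def taskLaunched(slaveId: String, hostname: String, taskId: String): Unit =
    synchronized {
      slaves.getOrElseUpdate(slaveId, new SlaveState(hostname)).taskIDs += taskId
    }

  // Mirrors the shape of executorTerminated in the diff: drop the task,
  // keep the slave entry.
  def executorTerminated(slaveId: String, taskId: String): Unit =
    synchronized {
      slaves.get(slaveId).foreach(_.taskIDs -= taskId)
    }

  def knownSlaves: Int = synchronized(slaves.size)

  def runningTasks(slaveId: String): Set[String] = synchronized {
    slaves.get(slaveId).map(_.taskIDs.toSet).getOrElse(Set.empty[String])
  }
}
```

After both tasks on a slave terminate, `runningTasks` is empty for it while `knownSlaves` still counts it, which is the behavior a comment on the definition of `slaves` would need to spell out.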
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180051489 **[Test build #50770 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50770/consoleFull)** for PR 10993 at commit [`b587f8f`](https://github.com/apache/spark/commit/b587f8fb98edaaedc9e4cfc93dee5cc79ce2e362).
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180076930 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180076934 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50764/ Test PASSed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180088967 **[Test build #50770 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50770/consoleFull)** for PR 10993 at commit [`b587f8f`](https://github.com/apache/spark/commit/b587f8fb98edaaedc9e4cfc93dee5cc79ce2e362). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180089190 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180089197 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50770/ Test FAILed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180076446 **[Test build #50764 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50764/consoleFull)** for PR 10993 at commit [`4857e57`](https://github.com/apache/spark/commit/4857e570489f61f207e62246e6fb1ceeff943095). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180103124 Looks like it failed a network test. Flaky test?
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-179843161 @Astralidea it will deploy more than one executor on the same slave if there are enough resources and `spark.cores.max` wasn't reached yet. It's just that it will first spawn executors on each eligible slave in the current set of offers, and continue iterating until one of the stop conditions is hit (not enough resources or the `spark.cores.max` limit).
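The placement order described in the comment above can be sketched as a loop that makes repeated passes over the offers. This is a simplified illustration rather than the scheduler backend's code: `SimpleOffer`, `RoundRobinSketch`, and `place` are hypothetical names, executors are given a fixed size, and each slave is assumed to contribute at most one offer (the quoted diff carries a TODO about combining offers for a single slave).

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a Mesos offer.
final case class SimpleOffer(slaveId: String, cpus: Int, memMb: Int)

object RoundRobinSketch {
  /** Returns slaveId -> number of executors placed on that slave. */
  def place(
      offers: Seq[SimpleOffer],
      executorCores: Int,
      executorMemMb: Int,
      coresMax: Int): Map[String, Int] = {
    require(executorCores > 0 && executorMemMb > 0, "executor size must be positive")

    // Resources still unused on each slave: (cpus, memMb).
    val remaining = mutable.Map(offers.map(o => o.slaveId -> ((o.cpus, o.memMb))): _*)
    val placed = mutable.Map.empty[String, Int].withDefaultValue(0)
    var coresAcquired = 0

    // Keep making passes until a full pass launches nothing.
    var launched = true
    while (launched) {
      launched = false
      for (offer <- offers) {
        val (cpus, mem) = remaining(offer.slaveId)
        val fits = cpus >= executorCores && mem >= executorMemMb
        val underCap = coresAcquired + executorCores <= coresMax
        if (fits && underCap) {
          // At most one executor per slave per pass, so executors spread out first.
          remaining(offer.slaveId) = (cpus - executorCores, mem - executorMemMb)
          placed(offer.slaveId) += 1
          coresAcquired += executorCores
          launched = true
        }
      }
    }
    placed.toMap
  }
}
```

Because each pass places at most one executor per slave, a second executor lands on a slave only after every eligible slave has received its first, and the loop ends once no offer has room or the `spark.cores.max` budget is spent.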
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51923243 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,113 +240,207 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + /** +* Launches executors on accepted offers, and declines unused offers. Executors are launched +* round-robin on offers. 
+* +* @param d SchedulerDriver +* @param offers Mesos offers that match attribute constraints +*/ + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { +val tasks = getMesosTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + /** +* Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize +* per-task memory and IO, tasks are round-robin assigned to offers. +* +* @param offers Mesos offers that match attribute constraints +* @return A map from OfferID to a list of Mesos tasks to launch on that offer +*/ + private def getMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { --- End diff -- Sure. Changed to `build` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51923784
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+      slaveId: String,
+      taskId: String,
+      reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --
We want to keep the `taskFailures` and `shuffleRegistered` even when there are no `taskIDs`, so I don't think this will work.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51924965
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+      slaveId: String,
+      taskId: String,
+      reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --
A bigger issue is with streaming jobs, which may run for many days. If there's state we never clean up, then this will cause an OOM on the driver. This kind of leak has happened before.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180021501 Resolved merge conflicts with #10319; fixed the method naming issue.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51927502
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+      slaveId: String,
+      taskId: String,
+      reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --
At the very least, this isn't a regression, because the previous SlaveID HashMaps were not cleaned up either. But even if we wanted to change that in this PR, how would we maintain the current behaviour where the driver blacklists slaves and only registers with a shuffle service once? We need `taskFailures` and `shuffleRegistered` state for the lifetime of the driver.
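A minimal sketch of the per-slave state under discussion may help. The field names `taskIDs`, `taskFailures`, and `shuffleRegistered` and the `slaves` map come from the thread; the class shape, the `hostname` field, and the helper methods `taskLaunched` and `taskTerminated` are assumptions made for illustration and are not taken from the PR.

```scala
import scala.collection.mutable

// Assumed shape of the per-slave record; only the three state fields are named in the thread.
final class Slave(val hostname: String) {
  val taskIDs = mutable.HashSet.empty[String]
  var taskFailures = 0           // used to blacklist a slave after repeated failures
  var shuffleRegistered = false  // the driver registers with a shuffle service only once
}

object SlaveStateSketch {
  val slaves = mutable.HashMap.empty[String, Slave]

  def taskLaunched(slaveId: String, hostname: String, taskId: String): Unit =
    slaves.getOrElseUpdate(slaveId, new Slave(hostname)).taskIDs += taskId

  def taskTerminated(slaveId: String, taskId: String, failed: Boolean): Unit =
    slaves.get(slaveId).foreach { slave =>
      slave.taskIDs -= taskId
      if (failed) slave.taskFailures += 1
      // Deliberately no slaves.remove(slaveId): dropping the record would also
      // drop taskFailures and shuffleRegistered for that slave.
    }
}
```

In this sketch the map grows by at most one entry per slave that ever ran a task, which is the trade-off against removing the record as soon as `taskIDs` is empty.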
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180023827 @Astralidea You can't guarantee that receivers run on different nodes even with Coarse-Grained Spark as it exists today. Running one executor per slave does not guarantee that only one Spark task will run on that slave. I have some new config vars in mind that will solve this problem, as well as other scheduling problems, though: `spark.mesos.executor.max_memory`, `spark.mesos.memory.min_per_core`, `spark.mesos.memory.max_per_core`, and `spark.mesos.cores.max_per_node`. I think these 4 new config vars will capture any constraints a user has. For example, you can guarantee one receiver per node by setting `spark.mesos.cores.max_per_node == spark.task.cores`. But this is a discussion that should be moved to JIRA.
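The arithmetic behind the one-receiver-per-node example can be sketched as below. `spark.mesos.cores.max_per_node` is only a proposal in this comment, not an existing Spark setting, and `ReceiverPlacementSketch.concurrentTasksPerNode` is a hypothetical helper; the sketch assumes every task needs exactly `spark.task.cores` cores.

```scala
object ReceiverPlacementSketch {
  /** Number of tasks that can run at once on one node, given a per-node core cap. */
  def concurrentTasksPerNode(maxCoresPerNode: Int, taskCores: Int): Int = {
    require(taskCores > 0, "taskCores must be positive")
    // Integer division: leftover cores that cannot fit a whole task go unused.
    maxCoresPerNode / taskCores
  }
}
```

When the per-node cap equals the cores per task, the quotient is 1, so at most one task (for example, one receiver) fits on each node.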
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180026167 **[Test build #50764 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50764/consoleFull)** for PR 10993 at commit [`4857e57`](https://github.com/apache/spark/commit/4857e570489f61f207e62246e6fb1ceeff943095).
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user drcrallen commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180024934 @mgummelt I had done limits for memory per core in https://github.com/apache/spark/pull/10232 in response to https://issues.apache.org/jira/browse/SPARK-12248 but totally forgot to fix the spark PR
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51934657 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180043255 **[Test build #50769 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50769/consoleFull)** for PR 10993 at commit [`88e6322`](https://github.com/apache/spark/commit/88e63229e2f5fbc2ebf5119afffe3bd338d644a1).
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180045237 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50769/ Test FAILed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180045220 **[Test build #50769 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50769/consoleFull)** for PR 10993 at commit [`88e6322`](https://github.com/apache/spark/commit/88e63229e2f5fbc2ebf5119afffe3bd338d644a1).
* This patch **fails Scala style tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-180045231 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51704761
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+      slaveId: String,
+      taskId: String,
+      reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
--- End diff --
You're right that memory-wise it's not a big loss. But I prefer clean code. For instance, what's the meaning of having a Slave record for a host that doesn't run any tasks? Can this become a source of confusion down the road? Since it seems that it's not that complex to clean up, I'd go for it now.
```
def removeTask(slave: Slave, taskId: String) = {
  slave.taskIDs.remove(taskId)
  if (slave.taskIDs.isEmpty) {
    slaves.remove(slaveId)
  }
}
```
In fact, this method may go inside `Slave`, who could properly encapsulate `taskIDs`. It may even be an inner class, so it can update `slaves`. Unless I'm missing something, we're talking about two lines of code.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51703515 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val filters 
= Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-179266568 I see the same behavior with master. I think this is a regression introduced when Akka was removed, and communication has switched to Netty. Here's what happens: the connection between the driver and each shuffle server is idle, and controlled by the general `spark.network.timeout`, defaulting to 120s. That's exactly what can be seen in the logs: the application disconnects exactly 2 minutes after registration. We'd need a TCP connection without a timeout, or have heartbeats exchanged between the two. I'll file a Jira ticket.
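The interaction between an idle timeout and heartbeats can be illustrated with a toy model. `IdleConnection` and its methods are hypothetical and are not Spark's network classes; the only figure taken from the comment is the 120 s default of `spark.network.timeout`.

```scala
// Toy model of an idle-timeout check; times are in milliseconds since the connection opened.
final class IdleConnection(timeoutMs: Long, openedAtMs: Long) {
  private var lastActivityMs = openedAtMs

  /** Any traffic, including a heartbeat, resets the idle clock. */
  def heartbeat(nowMs: Long): Unit = lastActivityMs = nowMs

  /** The connection is dropped once it has been idle for the full timeout. */
  def isExpired(nowMs: Long): Boolean = nowMs - lastActivityMs >= timeoutMs
}
```

Without heartbeats, a connection opened at time 0 with a 120 000 ms timeout expires at exactly 120 000 ms, which matches the two-minute disconnect in the logs. A heartbeat sent at 60 000 ms would push expiry out to 180 000 ms.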
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-179250730 @Astralidea this PR implements round-robin on the received offers. That means it will try to schedule executors on all slaves in the current set of offers, before deploying a second executor on a given slave.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-179294468 It seems this was already reported as [SPARK-12583](https://issues.apache.org/jira/browse/SPARK-12583); I somehow missed it.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-179251806 I'm having trouble running this with dynamic allocation. Did you test it in that scenario? I'm seeing disconnects from the driver, leading to
```
6/02/03 15:03:29 WARN TaskSetManager: Lost task 3.2 in stage 4.0 (TID 4015, 10.0.1.205): java.io.FileNotFoundException: /tmp/blockmgr-f008b463-1d87-406b-b879-bae73c915907/27/shuffle_2_3_0.data.607ce66e-b528-4fc8-97e2-5028fc7b8e99 (No such file or directory)
```
In the Shuffle Service logs I see
```
16/02/03 14:58:32 DEBUG MesosExternalShuffleBlockHandler: Received registration request from app 1521e408-d8fe-416d-898b-3801e73a8293-0119 (remote address /10.0.1.47:52808).
16/02/03 14:58:34 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=4} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-248a584a-89b7-461a-8d8d-3363bd0f1a1b], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 14:58:34 WARN MesosExternalShuffleBlockHandler: Unknown /10.0.1.208:42483 disconnected.
16/02/03 14:58:43 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=2} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-d9865194-5c38-46ae-bce7-de5605cbb4f6], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 14:58:43 WARN MesosExternalShuffleBlockHandler: Unknown /10.0.1.208:42498 disconnected.
16/02/03 14:58:43 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=0} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-b8350cfd-fa2e-4a29-92c2-a88f1bec17ca], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 14:58:43 WARN MesosExternalShuffleBlockHandler: Unknown /10.0.1.208:42499 disconnected.
16/02/03 14:59:20 WARN MesosExternalShuffleBlockHandler: Unknown /10.0.1.208:42509 disconnected.
16/02/03 14:59:20 WARN MesosExternalShuffleBlockHandler: Unknown /10.0.1.205:35465 disconnected.
16/02/03 14:59:20 WARN MesosExternalShuffleBlockHandler: Unknown /10.0.1.205:35462 disconnected.
16/02/03 15:00:09 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=7} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-19a734ac-496a-4b7d-b304-acf16f4b5a78], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/03 15:00:09 WARN MesosExternalShuffleBlockHandler: Unknown /10.0.1.208:42522 disconnected.
16/02/03 15:00:32 INFO MesosExternalShuffleBlockHandler: Application 1521e408-d8fe-416d-898b-3801e73a8293-0119 disconnected (address was /10.0.1.47:52808).
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Application 1521e408-d8fe-416d-898b-3801e73a8293-0119 removed, cleanupLocalDirs = true
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=4}'s 1 local dirs
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=2}'s 1 local dirs
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=0}'s 1 local dirs
16/02/03 15:00:32 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=1521e408-d8fe-416d-898b-3801e73a8293-0119, execId=7}'s 1 local dirs
```
I am not sure if it's related to this PR.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user Astralidea commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-179579452 @dragos Oh, but what if I want to deploy 2 executors per slave? Can this PR not do that?
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51649684 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { --- End diff -- Added explicit return types. This method both accepts and declines offers, which is why I called it `handle`. I've added a scaladoc.
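The split that `resourceOffers` performs in this diff, partitioning offers into those that satisfy the attribute constraints and those that do not, can be sketched without the Mesos types. This is only an illustration: `OfferPartitionSketch`, `SketchOffer`, `matches` and `split` are hypothetical stand-ins, and the matching rule below is a simplified assumption rather than Spark's actual `matchesAttributeRequirements`.

```scala
object OfferPartitionSketch {
  // Hypothetical stand-in for a Mesos Offer; the real type is a protobuf message.
  final case class SketchOffer(id: String, attributes: Map[String, Set[String]])

  // Simplified assumption: an offer matches when every constrained attribute is
  // present and, if values are required, at least one required value is offered.
  def matches(constraints: Map[String, Set[String]], offer: SketchOffer): Boolean =
    constraints.forall { case (name, required) =>
      offer.attributes.get(name).exists { offered =>
        required.isEmpty || required.exists(offered.contains)
      }
    }

  // Returns (matched, unmatched), mirroring the `partition` call in the diff.
  def split(
      constraints: Map[String, Set[String]],
      offers: Seq[SketchOffer]): (Seq[SketchOffer], Seq[SketchOffer]) =
    offers.partition(offer => matches(constraints, offer))
}
```

In the PR, unmatched offers are declined with the longer refusal filter, while matched offers go on to `handleMatchedOffers`, which may still decline some of them.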
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51652453 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} /** - * Called when a slave is lost or a Mesos task finished. Update local view on - * what tasks are running and remove the terminated slave from the list of pending - * slave IDs that we might have asked to be killed. It also notifies the driver - * that an executor was removed. + * Called when a slave is lost or a Mesos task finished. Updates local view on + * what tasks are running. It also notifies the driver that an executor was removed. */ - private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = { + private def executorTerminated(d: SchedulerDriver, + slaveId: String, + taskId: String, + reason: String): Unit = { stateLock.synchronized { - if (slaveIdsWithExecutors.contains(slaveId)) { -val slaveIdToTaskId = taskIdToSlaveId.inverse() -if (slaveIdToTaskId.containsKey(slaveId)) { - val taskId: Int = slaveIdToTaskId.get(slaveId) - taskIdToSlaveId.remove(taskId) - removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason)) -} -// TODO: This assumes one Spark executor per Mesos slave, -// which may no longer be true after SPARK-5095 -pendingRemovedSlaveIds -= slaveId -slaveIdsWithExecutors -= slaveId - } + removeExecutor(taskId, SlaveLost(reason)) + slaves(slaveId).taskIDs.remove(taskId) --- End diff -- Are you worried about memory? Even w/ 1k executors, this should only be ~20k of memory. I'd prefer to take that tradeoff in order to keep the code as simple as possible.
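To make the memory tradeoff in this comment concrete, here is a rough sketch of the per-slave bookkeeping, assuming the `Slave` shape shown in the diff (without `pendingRemoval`). `SketchSlave`, `SlaveBookkeepingSketch` and `executorLaunched` are hypothetical names, and the `stateLock` synchronization of the real backend is omitted. The point is that terminating an executor drops only its task ID, so one small entry remains for every slave that ever ran an executor.

```scala
import scala.collection.mutable

// Mirrors the fields of the `Slave` class in the diff; the name is hypothetical.
final class SketchSlave(val hostname: String) {
  var taskFailures = 0
  val taskIDs = new mutable.HashSet[String]()
}

final class SlaveBookkeepingSketch {
  // slaveId -> per-slave state; entries are added on first launch and never removed.
  val slaves = new mutable.HashMap[String, SketchSlave]()

  // Hypothetical helper: records that a task was launched on a slave.
  def executorLaunched(slaveId: String, hostname: String, taskId: String): Unit = {
    val slave = slaves.getOrElseUpdate(slaveId, new SketchSlave(hostname))
    slave.taskIDs += taskId
  }

  // Only the task ID is dropped; the (possibly empty) slave entry stays behind.
  def executorTerminated(slaveId: String, taskId: String): Unit =
    slaves.get(slaveId).foreach(slave => slave.taskIDs.remove(taskId))
}
```

Under this scheme the map grows with the number of distinct slaves used, not with the number of executors launched over time, which is the bound argued for in the review thread.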
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-178882985 **[Test build #50608 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50608/consoleFull)** for PR 10993 at commit [`0a1181a`](https://github.com/apache/spark/commit/0a1181a9160e72a164efcc05459326b4e01f8f5c).
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51650781 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) --- End diff -- Ah, yes. Thanks.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51651630 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51652632 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -426,23 +489,23 @@ private[spark] class CoarseMesosSchedulerBackend( override def doKillExecutors(executorIds: Seq[String]): Boolean = { if (mesosDriver == null) { logWarning("Asked to kill executors before the Mesos driver was started.") - return false -} - -val slaveIdToTaskId = taskIdToSlaveId.inverse() -for (executorId <- executorIds) { - val slaveId = executorId.split("/")(0) - if (slaveIdToTaskId.containsKey(slaveId)) { -mesosDriver.killTask( - TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build()) -pendingRemovedSlaveIds += slaveId - } else { -logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler") + false +} else { + for (executorId <- executorIds) { +val taskId = TaskID.newBuilder().setValue(executorId).build() +mesosDriver.killTask(taskId) } + // no need to adjust `executorLimitOption` since the AllocationManager already communicated + // the desired limit through a call to `doRequestTotalExecutors`. + // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] + true } -// no need to adjust `executorLimitOption` since the AllocationManager already communicated -// the desired limit through a call to `doRequestTotalExecutors`. -// See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] -true } } + +private class Slave(val hostname: String) { + var taskFailures = 0 + val taskIDs = new HashSet[String]() + var pendingRemoval = false --- End diff -- removed
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51652611 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51648592 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() --- End diff -- I was thinking about this when I ran into it. The default for `refuse_seconds` is actually 5: https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto#L1211 So I'll just remove the explicit filter.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51650484 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { --- End diff -- Changed to immutable map. Added scaladoc. I prefer the get* naming even for non-getters. Whether or not a function is backed by a single instance variable seems to be an implementation detail that shouldn't be exposed by any particular naming scheme. There's also lots of other precedent in the codebase for non-getter get* methods. Though I have renamed it to `getMesosTasks` to disambiguate between Spark tasks and Mesos tasks.
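The round-robin placement that `getTasks` (renamed `getMesosTasks`) performs over the matched offers can be illustrated roughly as follows. This is a sketch under simplifying assumptions, not the PR's code: `RoundRobinSketch`, `SketchOffer`, `assign` and its parameters are hypothetical, resources are plain integers rather than Mesos `Resource` lists, and the result is an immutable map from offer ID to executor count instead of to task descriptors.

```scala
object RoundRobinSketch {
  // Hypothetical, simplified offer: an ID plus scalar cpu and memory amounts.
  final case class SketchOffer(id: String, cpus: Int, memMb: Int)

  /**
   * Returns offerId -> number of executors to launch on that offer.
   * Offers that receive no executor are absent from the result.
   */
  def assign(
      offers: Seq[SketchOffer],
      cpusPerExecutor: Int,
      memPerExecutorMb: Int,
      maxExecutors: Int): Map[String, Int] = {
    require(cpusPerExecutor > 0 && memPerExecutorMb > 0, "executor size must be positive")

    // offerId -> (remaining cpus, remaining memory)
    var remaining = offers.map(o => o.id -> ((o.cpus, o.memMb))).toMap
    var counts = Map.empty[String, Int]
    var launched = 0
    var progress = true

    // Keep passing over the offers, placing at most one executor per offer per
    // pass, until a full pass places nothing.
    while (progress) {
      progress = false
      for (offer <- offers) {
        val (cpus, mem) = remaining(offer.id)
        if (launched < maxExecutors && cpus >= cpusPerExecutor && mem >= memPerExecutorMb) {
          remaining = remaining.updated(offer.id, (cpus - cpusPerExecutor, mem - memPerExecutorMb))
          counts = counts.updated(offer.id, counts.getOrElse(offer.id, 0) + 1)
          launched += 1
          progress = true
        }
      }
    }
    counts
  }
}
```

Placing one executor per offer per pass spreads executors across offers before stacking several on the same one, and the loop ends once a whole pass places nothing.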
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51650895 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51651674 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val 
filters = Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-178931774 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50608/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-178931643 **[Test build #50608 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50608/consoleFull)** for PR 10993 at commit [`0a1181a`](https://github.com/apache/spark/commit/0a1181a9160e72a164efcc05459326b4e01f8f5c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-178931772 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-178503833 I didn't have time to look at this in detail, I'll do so this afternoon.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51594185 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val filters 
= Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) --- End diff -- A simpler way to do the same thing: `offers.map(offer => (offer.getId.getValue, offer.getResourcesList)).toMap`
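The two shapes compared in the comment above can be tried side by side. This is a minimal sketch, not code from the pull request: `FakeOffer` and the sample values are hypothetical stand-ins for Mesos `Offer` objects, and it assumes the `HashMap` in the diff is `scala.collection.mutable.HashMap`.

```scala
import scala.collection.mutable

object ToMapSketch {
  // Hypothetical stand-in for a Mesos Offer: an ID plus a resource list.
  final case class FakeOffer(id: String, resources: List[String])

  val offers: List[FakeOffer] = List(
    FakeOffer("offer-1", List("cpus:4", "mem:8192")),
    FakeOffer("offer-2", List("cpus:2", "mem:4096")))

  // Shape used in the diff: expand the pairs as varargs into a HashMap.
  // Assumption: HashMap there is scala.collection.mutable.HashMap.
  val viaVarargs: mutable.HashMap[String, List[String]] =
    mutable.HashMap[String, List[String]](offers.map(o => (o.id, o.resources)): _*)

  // Shape from the review comment: build the pairs, then call toMap.
  // The result is an immutable Map.
  val viaToMap: Map[String, List[String]] =
    offers.map(o => (o.id, o.resources)).toMap

  def main(args: Array[String]): Unit = {
    // Both hold the same keys and values; only the collection type differs.
    println(viaVarargs == viaToMap)
  }
}
```

Under that assumption the `.toMap` form yields an immutable map, so it is a drop-in replacement only if `remainingResources` is not updated in place later in `getTasks`.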
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51599060 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val filters 
= Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51599986 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val filters 
= Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51600410 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val filters 
= Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51600547 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -373,40 +451,25 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} /** - * Called when a slave is lost or a Mesos task finished. Update local view on - * what tasks are running and remove the terminated slave from the list of pending - * slave IDs that we might have asked to be killed. It also notifies the driver - * that an executor was removed. + * Called when a slave is lost or a Mesos task finished. Updates local view on + * what tasks are running. It also notifies the driver that an executor was removed. */ - private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = { + private def executorTerminated(d: SchedulerDriver, + slaveId: String, + taskId: String, + reason: String): Unit = { stateLock.synchronized { - if (slaveIdsWithExecutors.contains(slaveId)) { -val slaveIdToTaskId = taskIdToSlaveId.inverse() -if (slaveIdToTaskId.containsKey(slaveId)) { - val taskId: Int = slaveIdToTaskId.get(slaveId) - taskIdToSlaveId.remove(taskId) - removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason)) -} -// TODO: This assumes one Spark executor per Mesos slave, -// which may no longer be true after SPARK-5095 -pendingRemovedSlaveIds -= slaveId -slaveIdsWithExecutors -= slaveId - } + removeExecutor(taskId, SlaveLost(reason)) + slaves(slaveId).taskIDs.remove(taskId) --- End diff -- `slaves` is never cleaned up. Keys will continue to accumulate for the duration of this Job. Would it make sense to check if `taskIDs` is empty and remove the `slaveId` from the map?
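The cleanup proposed in the comment above could look roughly like this. It is a minimal sketch under stated assumptions, not the pull request's code: `SlaveCleanupSketch` and `taskTerminated` are hypothetical names, the `Slave` class is a trimmed copy of the one in the diff, and the driver notification and locking are left out.

```scala
import scala.collection.mutable

object SlaveCleanupSketch {
  // Trimmed, hypothetical copy of the Slave class shown in the diff.
  class Slave(val hostname: String) {
    var taskFailures = 0
    val taskIDs = new mutable.HashSet[String]()
  }

  // slaveId -> per-slave bookkeeping, as in the diff's `slaves` map.
  val slaves = new mutable.HashMap[String, Slave]

  // Hypothetical helper: forget a finished task and drop the slave entry
  // once no tasks remain on it.
  def taskTerminated(slaveId: String, taskId: String): Unit = {
    slaves.get(slaveId).foreach { slave =>
      slave.taskIDs.remove(taskId)
      if (slave.taskIDs.isEmpty) {
        // Note: this also discards the slave's taskFailures count.
        slaves.remove(slaveId)
      }
    }
  }
}
```

One trade-off to weigh: because `taskFailures` lives on the same object, removing the entry also resets the failure count for that slave, which may not be wanted if the count is used to stop launching on repeatedly failing slaves.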
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51600867 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -426,23 +489,23 @@ private[spark] class CoarseMesosSchedulerBackend( override def doKillExecutors(executorIds: Seq[String]): Boolean = { if (mesosDriver == null) { logWarning("Asked to kill executors before the Mesos driver was started.") - return false -} - -val slaveIdToTaskId = taskIdToSlaveId.inverse() -for (executorId <- executorIds) { - val slaveId = executorId.split("/")(0) - if (slaveIdToTaskId.containsKey(slaveId)) { -mesosDriver.killTask( - TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build()) -pendingRemovedSlaveIds += slaveId - } else { -logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler") + false +} else { + for (executorId <- executorIds) { +val taskId = TaskID.newBuilder().setValue(executorId).build() +mesosDriver.killTask(taskId) } + // no need to adjust `executorLimitOption` since the AllocationManager already communicated + // the desired limit through a call to `doRequestTotalExecutors`. + // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] + true } -// no need to adjust `executorLimitOption` since the AllocationManager already communicated -// the desired limit through a call to `doRequestTotalExecutors`. -// See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] -true } } + +private class Slave(val hostname: String) { + var taskFailures = 0 + val taskIDs = new HashSet[String]() + var pendingRemoval = false --- End diff -- This field is never used.
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/10993#issuecomment-178696871 @mgummelt this looks really good! I have a few comments. I still have to run this PR with dynamic allocation and see it in action!
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/10993#discussion_r51600669 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala --- @@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) -val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) +matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) +} + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() +.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" ++ s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) +} + } + + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) { +val tasks = getTasks(offers) +for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept +val filters 
= Filters.newBuilder().setRefuseSeconds(5).build() +val offerTasks = tasks(offer.getId) + +logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + +for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") +} + +d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava, + filters) + } else { // decline +logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + +d.declineOffer(offer.getId) + } +} + } + + private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = { +// offerID -> tasks +val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + +// offerID -> resources +val remainingResources = HashMap[String, JList[Resource]](offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + +var launchTasks = true + +// TODO(mgummelt): combine offers for a single slave +// round-robin create executors on the available offers +while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue -val mem = getResource(offer.getResourcesList, "mem") -val cpus = getResource(offer.getResourcesList, "cpus").toInt -val id = offer.getId.getValue -if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { -// Launch an executor on the slave -val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) -totalCoresAcquired += cpusToUse -val taskId = newMesosTaskId() -taskIdToSlaveId.put(taskId, slaveId) -
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51592789

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
--- End diff --

I know this is not your code, but it would be good to document this. Why do we filter out offers for 5 seconds on the offers we use?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51593473

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    val tasks = getTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val filters = Filters.newBuilder().setRefuseSeconds(5).build()
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava,
+          filters)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  private def getTasks(offers: Buffer[Offer]): mutable.Map[OfferID, List[MesosTaskInfo]] = {
--- End diff --

Please add a short scaladoc explaining what this method does. Also, `getTasks` is confusing (this is not a getter); maybe `buildTasks`? I'd also return an immutable Map (does the caller need to add or remove entries?). Or at least use `collection.Map`, which both mutable and immutable maps implement but which has no mutating methods itself.
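The `collection.Map` suggestion can be illustrated with a small sketch. This is not the PR's code: `TaskPlanner`, the simplified `buildTasks` signature, and the plain strings standing in for `OfferID` and `MesosTaskInfo` are all hypothetical. The point is only that a method can fill a mutable map internally and still hand the caller a type that exposes no mutating methods.

```scala
import scala.collection.mutable

object TaskPlanner {
  // Hypothetical stand-ins: offer IDs and task names are plain strings here,
  // not the Mesos OfferID / TaskInfo types used in the PR.
  def buildTasks(offerIds: Seq[String], taskCount: Int): collection.Map[String, List[String]] = {
    // Internally a mutable map is convenient while assigning tasks.
    val tasks = mutable.HashMap[String, List[String]]().withDefaultValue(Nil)
    if (offerIds.nonEmpty) {
      for (i <- 0 until taskCount) {
        val offerId = offerIds(i % offerIds.size)
        // Prepend the new task name to whatever this offer already holds.
        tasks(offerId) = s"task-$i" :: tasks(offerId)
      }
    }
    // Upcast on return: scala.collection.Map is implemented by both mutable and
    // immutable maps and declares no mutating methods such as `update` or `remove`.
    tasks
  }
}
```

Calling `tasks.toMap` before returning would go one step further and give the caller a genuinely immutable copy, at the cost of one extra allocation.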
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51593041

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
--- End diff --

Please use explicit return types (`: Unit = {...`). Procedure syntax is deprecated.
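As a minimal sketch of the difference between the two forms (the object and method names below are hypothetical, not taken from the PR): procedure syntax omits both the result type and the `=`, so the method always returns `Unit`, even when the body ends in a value the author meant to return. Recent Scala 2 compilers warn about it and Scala 3 drops it.

```scala
object ProcedureSyntax {
  private var declined = 0

  // Procedure syntax (shown only as a comment, since newer compilers flag it):
  //   def declineOffer(id: String) { declined += 1 }
  // The same shape silently discards results, e.g.
  //   def total(xs: List[Int]) { xs.sum }   // returns Unit, not Int

  // Preferred form: the explicit `: Unit =` states the result type up front.
  def declineOffer(id: String): Unit = {
    declined += 1
  }

  // A method that does return a value declares that type instead.
  def declinedCount: Int = declined
}
```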
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51593225

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,99 +239,182 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]) {
--- End diff --

Also, a short scaladoc would help. `handle` is very light on meaning. What about `launchExecutors` or something along those lines?
[GitHub] spark pull request: [SPARK-5095] [Mesos] Support launching multipl...
Github user dragos commented on a diff in the pull request:

https://github.com/apache/spark/pull/10993#discussion_r51686128

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -245,113 +240,207 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  /**
+    * Launches executors on accepted offers, and declines unused offers. Executors are launched
+    * round-robin on offers.
+    *
+    * @param d SchedulerDriver
+    * @param offers Mesos offers that match attribute constraints
+    */
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    val tasks = getMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  /**
+    * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize
+    * per-task memory and IO, tasks are round-robin assigned to offers.
+    *
+    * @param offers Mesos offers that match attribute constraints
+    * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+    */
+  private def getMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
--- End diff --

I find it non-productive to quibble over a name. That being said, this method doesn't just get tasks from somewhere. It produces them itself, based on a round-robin scheduling strategy over the given offers. I don't think `get` is the best verb to describe that action.
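To make the "produces them itself" point concrete, here is a rough sketch of round-robin task assignment over offers. It is an illustration under simplifying assumptions, not the PR's implementation: the `Offer` case class, the CPU-only resource model, the `cpusPerTask` and `maxTasks` parameters, and the generated task names are all hypothetical, and offer IDs are assumed to be unique.

```scala
object RoundRobin {
  // Hypothetical, simplified stand-in for a Mesos offer: an ID and free CPUs only.
  final case class Offer(id: String, cpus: Int)

  /** Builds offerId -> task names by handing out at most one task per offer per pass,
    * repeating passes until `maxTasks` is reached or no offer has `cpusPerTask` CPUs left. */
  def buildTasks(offers: Seq[Offer], cpusPerTask: Int, maxTasks: Int): Map[String, List[String]] = {
    require(cpusPerTask > 0, "cpusPerTask must be positive")
    var remaining = offers.map(o => o.id -> o.cpus).toMap
    var tasks = Map.empty[String, List[String]]
    var launched = 0
    var progress = true
    while (progress) {
      progress = false
      for (offer <- offers) {
        if (launched < maxTasks && remaining(offer.id) >= cpusPerTask) {
          // Record the task against this offer and deduct the resources it consumes.
          tasks = tasks.updated(offer.id, tasks.getOrElse(offer.id, Nil) :+ s"task-$launched")
          remaining = remaining.updated(offer.id, remaining(offer.id) - cpusPerTask)
          launched += 1
          progress = true
        }
      }
    }
    tasks
  }
}
```

With offers `A` (4 CPUs) and `B` (2 CPUs) and 2 CPUs per task, the first pass places one task on each offer and the second pass places a further task on `A` only, so tasks are spread across offers before any single offer is filled.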