Repository: spark
Updated Branches:
  refs/heads/master 49351c7f5 -> d86bbb4e2


[SPARK-6284] [MESOS] Add mesos role, principal and secret

Mesos allows a role and authentication credentials to be set per framework.
The role identifies the framework to the Mesos master and affects its sharing
weight during resource allocation; the optional principal and secret allow the
framework to authenticate with the master. This patch adds the
spark.mesos.role, spark.mesos.principal and spark.mesos.secret configuration
options and makes the schedulers' resource accounting role-aware.
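
For example, a job can opt into these settings through its Spark configuration
(the keys are the ones introduced by this patch; the role, principal, and
secret values below are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical values; the keys come from this patch.
    val conf = new SparkConf()
      .setMaster("mesos://master:5050")
      .setAppName("mesos-auth-example")
      .set("spark.mesos.role", "dev")                  // framework role
      .set("spark.mesos.principal", "spark-framework") // authentication principal
      .set("spark.mesos.secret", "framework-secret")   // authentication secret
    val sc = new SparkContext(conf)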

Author: Timothy Chen <tnac...@gmail.com>

Closes #4960 from tnachen/mesos_fw_auth and squashes the following commits:

0f9f03e [Timothy Chen] Fix review comments.
8f9488a [Timothy Chen] Fix rebase
f7fc2a9 [Timothy Chen] Add mesos role, auth and secret.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d86bbb4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d86bbb4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d86bbb4e

Branch: refs/heads/master
Commit: d86bbb4e286f16f77ba125452b07827684eafeed
Parents: 49351c7
Author: Timothy Chen <tnac...@gmail.com>
Authored: Thu Jul 16 19:36:45 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Jul 16 19:37:15 2015 -0700

----------------------------------------------------------------------
 .../mesos/CoarseMesosSchedulerBackend.scala     |  35 +++---
 .../cluster/mesos/MesosClusterScheduler.scala   |  28 ++---
 .../cluster/mesos/MesosSchedulerBackend.scala   | 118 ++++++++++-------
 .../cluster/mesos/MesosSchedulerUtils.scala     | 126 ++++++++++++++++---
 .../CoarseMesosSchedulerBackendSuite.scala      |  19 ++-
 .../mesos/MesosSchedulerBackendSuite.scala      | 106 +++++++++++++++-
 docs/running-on-mesos.md                        |  22 ++++
 7 files changed, 358 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index cbade13..b7fde0d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{List => JList, Collections}
 import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
@@ -27,12 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet}
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   /**
    * The total number of executors we aim to have. Undefined when not using dynamic allocation
-   * and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]].
+   * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]].
    */
   private var executorLimitOption: Option[Int] = None
 
@@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def start() {
     super.start()
-    val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
-    startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
+    val driver = createSchedulerDriver(
+      master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+    startScheduler(driver)
   }
 
   def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
@@ -224,24 +224,29 @@ private[spark] class CoarseMesosSchedulerBackend(
           taskIdToSlaveId(taskId) = slaveId
           slaveIdsWithExecutors += slaveId
           coresByTaskId(taskId) = cpusToUse
-          val task = MesosTaskInfo.newBuilder()
+          // Gather cpu resources from the available resources and use them in the task.
+          val (remainingResources, cpuResourcesToUse) =
+            partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+          val (_, memResourcesToUse) =
+            partitionResources(remainingResources, "mem", calculateTotalMemory(sc))
+          val taskBuilder = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
             .setName("Task " + taskId)
-            .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", calculateTotalMemory(sc)))
+            .addAllResources(cpuResourcesToUse)
+            .addAllResources(memResourcesToUse)
 
          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
             MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
+              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
           }
 
           // accept the offer and launch the task
          logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
           d.launchTasks(
             Collections.singleton(offer.getId),
-            Collections.singleton(task.build()), filters)
+            Collections.singleton(taskBuilder.build()), filters)
         } else {
           // Decline the offer
          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -255,7 +260,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
-    logInfo("Mesos task " + taskId + " is now " + state)
+    logInfo(s"Mesos task $taskId is now $state")
     stateLock.synchronized {
       if (TaskState.isFinished(TaskState.fromMesos(state))) {
         val slaveId = taskIdToSlaveId(taskId)
@@ -270,7 +275,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         if (TaskState.isFailed(TaskState.fromMesos(state))) {
          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
           if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
-            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+            logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
                 "is Spark installed on it?")
           }
         }
@@ -282,7 +287,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   override def error(d: SchedulerDriver, message: String) {
-    logError("Mesos error: " + message)
+    logError(s"Mesos error: $message")
     scheduler.error(message)
   }
 
@@ -323,7 +328,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
+    logInfo(s"Mesos slave lost: ${slaveId.getValue}")
    executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index d3a20f8..f078547 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -295,20 +295,24 @@ private[spark] class MesosClusterScheduler(
   def start(): Unit = {
    // TODO: Implement leader election to make sure only one framework running in the cluster.
     val fwId = schedulerState.fetch[String]("frameworkId")
-    val builder = FrameworkInfo.newBuilder()
-      .setUser(Utils.getCurrentUserName())
-      .setName(appName)
-      .setWebuiUrl(frameworkUrl)
-      .setCheckpoint(true)
-      .setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
     fwId.foreach { id =>
-      builder.setId(FrameworkID.newBuilder().setValue(id).build())
       frameworkId = id
     }
     recoverState()
     metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
     metricsSystem.start()
-    startScheduler(master, MesosClusterScheduler.this, builder.build())
+    val driver = createSchedulerDriver(
+      master,
+      MesosClusterScheduler.this,
+      Utils.getCurrentUserName(),
+      appName,
+      conf,
+      Some(frameworkUrl),
+      Some(true),
+      Some(Integer.MAX_VALUE),
+      fwId)
+
+    startScheduler(driver)
     ready = true
   }
 
@@ -449,12 +453,8 @@ private[spark] class MesosClusterScheduler(
         offer.cpu -= driverCpu
         offer.mem -= driverMem
        val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
-        val cpuResource = Resource.newBuilder()
-          .setName("cpus").setType(Value.Type.SCALAR)
-          .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
-        val memResource = Resource.newBuilder()
-          .setName("mem").setType(Value.Type.SCALAR)
-          .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+        val cpuResource = createResource("cpus", driverCpu)
+        val memResource = createResource("mem", driverMem)
         val commandInfo = buildDriverCommand(submission)
         val appName = submission.schedulerProperties("spark.app.name")
         val taskInfo = TaskInfo.newBuilder()

http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d72e2af..3f63ec1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -32,6 +32,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
 
+
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
  * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -45,8 +46,8 @@ private[spark] class MesosSchedulerBackend(
   with MScheduler
   with MesosSchedulerUtils {
 
-  // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
+  // Stores the slave ids that have launched a Mesos executor.
+  val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
   val taskIdToSlaveId = new HashMap[Long, String]
 
   // An ExecutorInfo for our tasks
@@ -66,12 +67,21 @@ private[spark] class MesosSchedulerBackend(
   @volatile var appId: String = _
 
   override def start() {
-    val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
     classLoader = Thread.currentThread.getContextClassLoader
-    startScheduler(master, MesosSchedulerBackend.this, fwInfo)
+    val driver = createSchedulerDriver(
+      master, MesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+    startScheduler(driver)
   }
 
-  def createExecutorInfo(execId: String): MesosExecutorInfo = {
+  /**
+   * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+   * @param availableResources Available resources that are offered by Mesos
+   * @param execId The executor id to assign to this new executor.
+   * @return A tuple of the new mesos executor info and the remaining available resources.
+   */
+  def createExecutorInfo(
+      availableResources: JList[Resource],
+      execId: String): (MesosExecutorInfo, JList[Resource]) = {
     val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
      .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
       .getOrElse {
@@ -115,32 +125,25 @@ private[spark] class MesosSchedulerBackend(
      command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
     }
-    val cpus = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder()
-      .setValue(mesosExecutorCores).build())
-      .build()
-    val memory = Resource.newBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(
-        Value.Scalar.newBuilder()
-          .setValue(calculateTotalMemory(sc)).build())
-      .build()
-    val executorInfo = MesosExecutorInfo.newBuilder()
+    val builder = MesosExecutorInfo.newBuilder()
+    val (resourcesAfterCpu, usedCpuResources) =
+      partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
+    val (resourcesAfterMem, usedMemResources) =
+      partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))
+
+    builder.addAllResources(usedCpuResources)
+    builder.addAllResources(usedMemResources)
+    val executorInfo = builder
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
-      .addResources(cpus)
-      .addResources(memory)
 
     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
       MesosSchedulerBackendUtil
        .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
     }
 
-    executorInfo.build()
+    (executorInfo.build(), resourcesAfterMem)
   }
 
   /**
@@ -183,6 +186,18 @@ private[spark] class MesosSchedulerBackend(
 
   override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
 
+  private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
+    val builder = new StringBuilder
+    tasks.foreach { t =>
+      builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
+        .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
+        .append("Task resources: ").append(t.getResourcesList).append("\n")
+        .append("Executor resources: ").append(t.getExecutor.getResourcesList)
+        .append("---------------------------------------------\n")
+    }
+    builder.toString()
+  }
+
   /**
   * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
@@ -207,7 +222,7 @@ private[spark] class MesosSchedulerBackend(
 
         val meetsRequirements =
          (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
-          (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+          (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
 
         // add some debug messaging
         val debugstr = if (meetsRequirements) "Accepting" else "Declining"
@@ -221,7 +236,7 @@ private[spark] class MesosSchedulerBackend(
       unUsableOffers.foreach(o => d.declineOffer(o.getId))
 
       val workerOffers = usableOffers.map { o =>
-        val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
+        val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
         } else {
          // If the Mesos executor has not been started on this slave yet, set aside a few
@@ -236,6 +251,10 @@ private[spark] class MesosSchedulerBackend(
 
      val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
       val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
+      val slaveIdToResources = new HashMap[String, JList[Resource]]()
+      usableOffers.foreach { o =>
+        slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
+      }
 
       val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
 
@@ -245,15 +264,19 @@ private[spark] class MesosSchedulerBackend(
      val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
       acceptedOffers
         .foreach { offer =>
-        offer.foreach { taskDesc =>
-          val slaveId = taskDesc.executorId
-          slaveIdsWithExecutors += slaveId
-          slavesIdsOfAcceptedOffers += slaveId
-          taskIdToSlaveId(taskDesc.taskId) = slaveId
-          mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
-            .add(createMesosTask(taskDesc, slaveId))
+          offer.foreach { taskDesc =>
+            val slaveId = taskDesc.executorId
+            slavesIdsOfAcceptedOffers += slaveId
+            taskIdToSlaveId(taskDesc.taskId) = slaveId
+            val (mesosTask, remainingResources) = createMesosTask(
+              taskDesc,
+              slaveIdToResources(slaveId),
+              slaveId)
+            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+              .add(mesosTask)
+            slaveIdToResources(slaveId) = remainingResources
+          }
         }
-      }
 
       // Reply to the offers
      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -264,6 +287,7 @@ private[spark] class MesosSchedulerBackend(
             // TODO: Add support for log urls for Mesos
             new ExecutorInfo(o.host, o.cores, Map.empty)))
         )
+        logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
        d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }
 
@@ -272,26 +296,32 @@ private[spark] class MesosSchedulerBackend(
      for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
         d.declineOffer(o.getId)
       }
-
     }
   }
 
-  /** Turn a Spark TaskDescription into a Mesos task */
-  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+  /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */
+  def createMesosTask(
+      task: TaskDescription,
+      resources: JList[Resource],
+      slaveId: String): (MesosTaskInfo, JList[Resource]) = {
     val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val cpuResource = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
-      .build()
-    MesosTaskInfo.newBuilder()
+    val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
+      (slaveIdToExecutorInfo(slaveId), resources)
+    } else {
+      createExecutorInfo(resources, slaveId)
+    }
+    slaveIdToExecutorInfo(slaveId) = executorInfo
+    val (finalResources, cpuResources) =
+      partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
+    val taskInfo = MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(createExecutorInfo(slaveId))
+      .setExecutor(executorInfo)
       .setName(task.name)
-      .addResources(cpuResource)
+      .addAllResources(cpuResources)
      .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
       .build()
+    (taskInfo, finalResources)
   }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
@@ -337,7 +367,7 @@ private[spark] class MesosSchedulerBackend(
   private def removeExecutor(slaveId: String, reason: String) = {
    synchronized {
      listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
-      slaveIdsWithExecutors -= slaveId
+      slaveIdToExecutorInfo -= slaveId
     }
   }
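
The key change in this file is that createMesosTask now returns the resources
left unused along with the task, and resourceOffers threads those leftovers
into the next task launched on the same slave (via slaveIdToResources). A
self-contained toy sketch of that threading pattern, with plain doubles
standing in for the Mesos Resource protos; the names are illustrative only:

    // Toy model: each task consumes cpus from what remains of the offer and
    // hands the leftover to the next task, mirroring the slaveIdToResources
    // updates above.
    case class ToyTask(id: Int, cpus: Double)

    def launch(id: Int, available: Double): (ToyTask, Double) = {
      val use = math.min(1.0, available) // assume CPUS_PER_TASK == 1
      (ToyTask(id, use), available - use)
    }

    var remaining = 2.0 // cpus in one offer
    val tasks = (1 to 2).map { id =>
      val (task, leftover) = launch(id, remaining)
      remaining = leftover
      task
    }
    // tasks == Vector(ToyTask(1,1.0), ToyTask(2,1.0)); remaining == 0.0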
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 925702e..c04920e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -21,15 +21,17 @@ import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
 import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.GeneratedMessage
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext}
 import org.apache.spark.util.Utils
 
+
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
@@ -42,13 +44,63 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   protected var mesosDriver: SchedulerDriver = null
 
   /**
-   * Starts the MesosSchedulerDriver with the provided information. This method returns
-   * only after the scheduler has registered with Mesos.
-   * @param masterUrl Mesos master connection URL
-   * @param scheduler Scheduler object
-   * @param fwInfo FrameworkInfo to pass to the Mesos master
+   * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
+   * @param masterUrl The url to connect to Mesos master
+   * @param scheduler the scheduler class to receive scheduler callbacks
+   * @param sparkUser User to impersonate with when running tasks
+   * @param appName The framework name to display on the Mesos UI
+   * @param conf Spark configuration
+   * @param webuiUrl The WebUI url to link from Mesos UI
+   * @param checkpoint Option to checkpoint tasks for failover
+   * @param failoverTimeout Duration the Mesos master waits for the scheduler to reconnect on disconnect
+   * @param frameworkId The id of the new framework
    */
-  def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = {
+  protected def createSchedulerDriver(
+      masterUrl: String,
+      scheduler: Scheduler,
+      sparkUser: String,
+      appName: String,
+      conf: SparkConf,
+      webuiUrl: Option[String] = None,
+      checkpoint: Option[Boolean] = None,
+      failoverTimeout: Option[Double] = None,
+      frameworkId: Option[String] = None): SchedulerDriver = {
+    val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
+    val credBuilder = Credential.newBuilder()
+    webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
+    checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
+    failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
+    frameworkId.foreach { id =>
+      fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
+    }
+    conf.getOption("spark.mesos.principal").foreach { principal =>
+      fwInfoBuilder.setPrincipal(principal)
+      credBuilder.setPrincipal(principal)
+    }
+    conf.getOption("spark.mesos.secret").foreach { secret =>
+      credBuilder.setSecret(ByteString.copyFromUtf8(secret))
+    }
+    if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
+      throw new SparkException(
+        "spark.mesos.principal must be configured when spark.mesos.secret is set")
+    }
+    conf.getOption("spark.mesos.role").foreach { role =>
+      fwInfoBuilder.setRole(role)
+    }
+    if (credBuilder.hasPrincipal) {
+      new MesosSchedulerDriver(
+        scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
+    } else {
+      new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
+    }
+  }
+
+  /**
+   * Starts the MesosSchedulerDriver and stores the current running driver to this new instance.
+   * This driver is expected to not be running.
+   * This method returns only after the scheduler has registered with Mesos.
+   */
+  def startScheduler(newDriver: SchedulerDriver): Unit = {
     synchronized {
       if (mesosDriver != null) {
         registerLatch.await()
@@ -59,11 +111,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
         setDaemon(true)
 
         override def run() {
-          mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl)
+          mesosDriver = newDriver
           try {
             val ret = mesosDriver.run()
             logInfo("driver.run() returned with code " + ret)
-            if (ret.equals(Status.DRIVER_ABORTED)) {
+            if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
               System.exit(1)
             }
           } catch {
@@ -82,18 +134,62 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Signal that the scheduler has registered with Mesos.
    */
+  protected def getResource(res: JList[Resource], name: String): Double = {
+    // A resource can have multiple values in the offer since it can either be from
+    // a specific role or wildcard.
+    res.filter(_.getName == name).map(_.getScalar.getValue).sum
+  }
+
   protected def markRegistered(): Unit = {
     registerLatch.countDown()
   }
 
+  def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+    val builder = Resource.newBuilder()
+      .setName(name)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
+
+    role.foreach { r => builder.setRole(r) }
+
+    builder.build()
+  }
+
   /**
-   * Get the amount of resources for the specified type from the resource list
+   * Partition the existing set of resources into two groups, those remaining to be
+   * scheduled and those requested to be used for a new task.
+   * @param resources The full list of available resources
+   * @param resourceName The name of the resource to take from the available resources
+   * @param amountToUse The amount of resources to take from the available resources
+   * @return The remaining resources list and the used resources list.
    */
-  protected def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
+  def partitionResources(
+      resources: JList[Resource],
+      resourceName: String,
+      amountToUse: Double): (List[Resource], List[Resource]) = {
+    var remain = amountToUse
+    var requestedResources = new ArrayBuffer[Resource]
+    val remainingResources = resources.map {
+      case r => {
+        if (remain > 0 &&
+          r.getType == Value.Type.SCALAR &&
+          r.getScalar.getValue > 0.0 &&
+          r.getName == resourceName) {
+          val usage = Math.min(remain, r.getScalar.getValue)
+          requestedResources += createResource(resourceName, usage, Some(r.getRole))
+          remain -= usage
+          createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+        } else {
+          r
+        }
+      }
     }
-    0.0
+
+    // Filter out any resource that has been depleted.
+    val filteredResources =
+      remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
+
+    (filteredResources.toList, requestedResources.toList)
   }
 
  /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
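
To illustrate what partitionResources does with role-specific reservations,
here is a self-contained simplified model (plain case classes standing in for
the Mesos scalar Resource protos; the logic mirrors the method above):

    // Simplified stand-in for a scalar Mesos Resource with a role.
    case class Res(name: String, role: String, amount: Double)

    // Carve up to `amountToUse` of `name` out of the offered resources,
    // preserving each reservation's role, and drop depleted entries.
    def partition(resources: List[Res], name: String, amountToUse: Double)
        : (List[Res], List[Res]) = {
      var remain = amountToUse
      val used = scala.collection.mutable.ArrayBuffer.empty[Res]
      val remaining = resources.map { r =>
        if (remain > 0 && r.amount > 0 && r.name == name) {
          val usage = math.min(remain, r.amount)
          used += Res(name, r.role, usage)
          remain -= usage
          r.copy(amount = r.amount - usage)
        } else r
      }.filter(_.amount > 0)
      (remaining, used.toList)
    }

    // A task needing 2 cpus takes the 1 cpu reserved for "prod" and 1 of the
    // 2 "dev" cpus, leaving 1 "dev" cpu for later tasks.
    val offer = List(Res("cpus", "prod", 1.0), Res("cpus", "dev", 2.0))
    val (left, taken) = partition(offer, "cpus", 2.0)
    // taken == List(Res("cpus","prod",1.0), Res("cpus","dev",1.0))
    // left  == List(Res("cpus","dev",1.0))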

http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 3f16929..4b504df 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -22,7 +22,7 @@ import java.util.Collections
 
 import org.apache.mesos.Protos.Value.Scalar
 import org.apache.mesos.Protos._
-import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.mockito.Matchers
@@ -60,7 +60,16 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
-      mesosDriver = driver
+      override protected def createSchedulerDriver(
+        masterUrl: String,
+        scheduler: Scheduler,
+        sparkUser: String,
+        appName: String,
+        conf: SparkConf,
+        webuiUrl: Option[String] = None,
+        checkpoint: Option[Boolean] = None,
+        failoverTimeout: Option[Double] = None,
+        frameworkId: Option[String] = None): SchedulerDriver = driver
       markRegistered()
     }
     backend.start()
@@ -80,6 +89,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
   test("mesos supports killing and limiting executors") {
     val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
 
@@ -87,7 +97,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     sparkConf.set("spark.driver.port", "1234")
 
     val backend = createSchedulerBackend(taskScheduler, driver)
-    val minMem = backend.calculateTotalMemory(sc).toInt
+    val minMem = backend.calculateTotalMemory(sc)
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]
@@ -130,11 +140,12 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
   test("mesos supports killing and relaunching tasks with executors") {
     val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
 
     val backend = createSchedulerBackend(taskScheduler, driver)
-    val minMem = backend.calculateTotalMemory(sc).toInt + 1024
+    val minMem = backend.calculateTotalMemory(sc) + 1024
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]

http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index d01837f..5ed30f6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Collections
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -60,14 +61,17 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
 
    val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
+    val resources = List(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
     // uri is null.
-    val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
     assert(executorInfo.getCommand.getValue ===
       s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
 
     // uri exists.
     conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
-    val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+    val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
     assert(executorInfo1.getCommand.getValue ===
       s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
   }
@@ -93,7 +97,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
 
     val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
-    val execInfo = backend.createExecutorInfo("mockExecutor")
+    val (execInfo, _) = backend.createExecutorInfo(
+      List(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
@@ -194,7 +199,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
     )
     verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
     verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
-    assert(capture.getValue.size() == 1)
+    assert(capture.getValue.size() === 1)
     val taskInfo = capture.getValue.iterator().next()
     assert(taskInfo.getName.equals("n1"))
     val cpus = taskInfo.getResourcesList.get(0)
@@ -214,4 +219,97 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
     backend.resourceOffers(driver, mesosOffers2)
     verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
   }
+
+  test("can handle multiple roles") {
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/path"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(new SparkConf)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val id = 1
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setRole("prod")
+      .setScalar(Scalar.newBuilder().setValue(500))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("prod")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(1))
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(600))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(2))
+    val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+      .setHostname(s"host${id.toString}").build()
+
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(offer)
+
+    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(0).getSlaveId.getValue,
+      mesosOffers.get(0).getHostname,
+      2 // Deducting 1 for executor
+    ))
+
+    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+    val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
+      )
+    ).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers)
+
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      capture.capture(),
+      any(classOf[Filters])
+    )
+
+    assert(capture.getValue.size() === 1)
+    val taskInfo = capture.getValue.iterator().next()
+    assert(taskInfo.getName.equals("n1"))
+    assert(taskInfo.getResourcesCount === 1)
+    val cpusDev = taskInfo.getResourcesList.get(0)
+    assert(cpusDev.getName.equals("cpus"))
+    assert(cpusDev.getScalar.getValue.equals(1.0))
+    assert(cpusDev.getRole.equals("dev"))
+    val executorResources = taskInfo.getExecutor.getResourcesList
+    assert(executorResources.exists { r =>
+      r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod")
+    })
+    assert(executorResources.exists { r =>
+      r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
+    })
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 1f915d8..debdd2a 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -307,6 +307,28 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.principal</code></td>
+  <td>Framework principal to authenticate to Mesos</td>
+  <td>
+    Set the principal that the Spark framework uses to authenticate with Mesos.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.secret</code></td>
+  <td>Framework secret to authenticate to Mesos</td>
+  <td>
+    Set the secret that the Spark framework uses to authenticate with Mesos.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.role</code></td>
+  <td>Role for the Spark framework</td>
+  <td>
+    Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations
+    and resource weight sharing.
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.constraints</code></td>
  <td>Attribute based constraints to be matched against when accepting resource offers.</td>
   <td>
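
For reference, the three properties documented above map directly onto the
Mesos FrameworkInfo and Credential protos inside createSchedulerDriver; a
minimal sketch of that mapping (hypothetical values, assuming the Mesos Java
bindings are on the classpath):

    import org.apache.mesos.Protos.{Credential, FrameworkInfo}
    import org.apache.mesos.protobuf.ByteString

    // Hypothetical role/principal/secret values.
    val fwInfo = FrameworkInfo.newBuilder()
      .setUser("spark")
      .setName("my-app")
      .setRole("dev")                  // spark.mesos.role
      .setPrincipal("spark-framework") // spark.mesos.principal
      .build()
    val credential = Credential.newBuilder()
      .setPrincipal("spark-framework")
      .setSecret(ByteString.copyFromUtf8("framework-secret")) // spark.mesos.secret
      .build()
    // Passed as: new MesosSchedulerDriver(scheduler, fwInfo, masterUrl, credential)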

