This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cbad616  [SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
cbad616 is described below

commit cbad616d4cb0c58993a88df14b5e30778c7f7e85
Author: wuyi <ngone_5...@163.com>
AuthorDate: Fri Aug 9 07:49:03 2019 -0500

    [SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
    
    ## What changes were proposed in this pull request?
    
    In this PR, we implement a complete process of GPU-aware resource scheduling in
    Standalone. The whole process works as follows: a Worker sets up isolated resources
    when it starts up and registers with the Master along with those resources. The
    Master then picks usable Workers according to the driver's/executor's resource
    requirements and launches the driver/executor on them. The Worker launches the
    driver/executor after preparing a resources file, created under the driver's/executor's
    working directory, with the assigned resource addresses (told by the Master). When
    the driver/executor finishes, its resources are recycled back to the Worker. Finally,
    if a Worker stops, it always releases its resources first.
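
    As a sketch of what such a resources file carries (not part of this patch; the
    local case classes below merely mirror `ResourceID`/`ResourceAllocation` from
    `org.apache.spark.resource`, and the GPU addresses are made-up examples):

    ```scala
    import org.json4s.{DefaultFormats, Extraction}
    import org.json4s.jackson.JsonMethods.{compact, render}

    // Mirrors org.apache.spark.resource.{ResourceID, ResourceAllocation} for illustration only.
    case class ResourceID(componentName: String, resourceName: String)
    case class ResourceAllocation(id: ResourceID, addresses: Seq[String])

    object ResourcesFileSketch {
      def main(args: Array[String]): Unit = {
        implicit val formats = DefaultFormats
        // e.g. an executor that was assigned GPU addresses "0" and "1" by the master
        val allocations = Seq(ResourceAllocation(ResourceID("spark.executor", "gpu"), Seq("0", "1")))
        // Prints JSON of the form:
        // [{"id":{"componentName":"spark.executor","resourceName":"gpu"},"addresses":["0","1"]}]
        println(compact(render(Extraction.decompose(allocations))))
      }
    }
    ```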
    
    For the case where Workers and Drivers in **client** mode run on the same host, we
    introduce a config option named `spark.resources.coordinate.enable` (default: true)
    to indicate whether Spark should coordinate resources for the user. If
    `spark.resources.coordinate.enable=false`, the user is responsible for configuring
    different resources for Workers and Drivers when using a resourcesFile or discovery
    script. If true, Spark helps the user assign different resources to Workers and
    Drivers.
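
    For illustration, a client-mode driver sharing a host with a Worker might be
    configured as below (a minimal sketch, not taken from this patch: the master URL
    and discovery-script path are placeholders, and the `spark.*.resource.gpu.*` keys
    follow the existing resource-request config pattern):

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    object GpuClientModeSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077")             // placeholder standalone master
          .setAppName("gpu-client-mode-sketch")
          // let Spark coordinate GPU addresses among Workers/Drivers on this host (default: true)
          .set("spark.resources.coordinate.enable", "true")
          .set("spark.driver.resource.gpu.amount", "1")
          .set("spark.driver.resource.gpu.discoveryScript", "/path/to/getGpus.sh")
          .set("spark.executor.resource.gpu.amount", "1")
          .set("spark.task.resource.gpu.amount", "1")
        // the driver acquires its GPU addresses here, synced through the shared allocation file
        val sc = new SparkContext(conf)
        try {
          // ... run jobs ...
        } finally {
          sc.stop()  // releases the driver's GPU addresses for other drivers/workers on this host
        }
      }
    }
    ```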
    
    The solution for Spark to coordinate resources among Workers and Drivers is:

    Generally, use a shared file named *____allocated_resources____.json* to sync
    allocated resources info among Workers and Drivers on the same host.
    
    After a Worker or Driver has found all of its resources using the configured
    resourcesFile and/or discovery script during launch, it filters out the available
    resources by excluding the resources already allocated in
    *____allocated_resources____.json*, and acquires resources from the available ones
    according to its own requirements. After that, it writes its allocated resources,
    along with its process id (pid), into *____allocated_resources____.json*. The pid
    (proposed by tgravescs) is used to check whether the allocated resources are still
    valid in case of a Worker or Driver crash [...]
    
    Note that we always take a file lock before any access to
    *____allocated_resources____.json* and release the lock at the end.
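
    A minimal sketch of that lock-then-update cycle (simplified from the
    `StandaloneResourceUtils` code added below; the merge step is left as a placeholder):

    ```scala
    import java.io.{File, RandomAccessFile}
    import java.nio.charset.StandardCharsets
    import java.nio.file.Files

    object AllocationFileLockSketch {
      def main(args: Array[String]): Unit = {
        val dir = new File("spark-resources")
        dir.mkdirs()
        // take an exclusive file lock shared by all Workers/Drivers on this host
        val channel = new RandomAccessFile(new File(dir, "__allocated_resources__.lock"), "rw").getChannel
        val lock = channel.lock()  // blocks until other processes release it
        try {
          val jsonFile = new File(dir, "__allocated_resources__.json")
          val current =
            if (jsonFile.exists()) {
              new String(Files.readAllBytes(jsonFile.toPath), StandardCharsets.UTF_8)
            } else {
              "[]"
            }
          // ... exclude already-allocated addresses, pick this process's share, append its pid ...
          val updated = current  // placeholder for the real merge logic
          Files.write(jsonFile.toPath, updated.getBytes(StandardCharsets.UTF_8))
        } finally {
          lock.release()  // always release the lock, even on failure
          channel.close()
        }
      }
    }
    ```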
    
    Furthermore, we append resources info to `WorkerSchedulerStateResponse` to work
    around the master-change behaviour in HA mode.
    
    ## How was this patch tested?
    
    Added unit tests in WorkerSuite, MasterSuite, SparkContextSuite.
    
    Manually tested in client/cluster mode (e.g. with multiple workers) on a
    single-node Standalone cluster.
    
    Closes #25047 from Ngone51/SPARK-27371.
    
    Authored-by: wuyi <ngone_5...@163.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .gitignore                                         |   1 +
 .../main/scala/org/apache/spark/SparkContext.scala |  34 +-
 .../spark/deploy/ApplicationDescription.scala      |   5 +-
 .../scala/org/apache/spark/deploy/Client.scala     |   7 +-
 .../org/apache/spark/deploy/DeployMessage.scala    |  28 +-
 .../apache/spark/deploy/DriverDescription.scala    |   5 +-
 .../apache/spark/deploy/LocalSparkCluster.scala    |   3 +-
 .../spark/deploy/StandaloneResourceUtils.scala     | 348 +++++++++++++++++++++
 .../spark/deploy/master/ApplicationInfo.scala      |   5 +-
 .../apache/spark/deploy/master/DriverInfo.scala    |   8 +
 .../apache/spark/deploy/master/ExecutorDesc.scala  |   6 +-
 .../org/apache/spark/deploy/master/Master.scala    | 108 +++++--
 .../apache/spark/deploy/master/WorkerInfo.scala    |  58 +++-
 .../spark/deploy/rest/StandaloneRestServer.scala   |   6 +-
 .../apache/spark/deploy/worker/DriverRunner.scala  |  14 +-
 .../spark/deploy/worker/ExecutorRunner.scala       |  11 +-
 .../org/apache/spark/deploy/worker/Worker.scala    |  77 +++--
 .../executor/CoarseGrainedExecutorBackend.scala    |   4 +-
 .../org/apache/spark/internal/config/package.scala |  17 +
 .../ResourceAllocator.scala}                       |  17 +-
 .../org/apache/spark/resource/ResourceUtils.scala  |  42 ++-
 .../spark/scheduler/ExecutorResourceInfo.scala     |  77 +----
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |   7 +-
 .../cluster/StandaloneSchedulerBackend.scala       |   6 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  42 +++
 .../scala/org/apache/spark/SparkConfSuite.scala    |   6 +-
 .../scala/org/apache/spark/SparkContextSuite.scala |  15 +-
 .../org/apache/spark/deploy/DeployTestUtils.scala  |   4 +-
 .../apache/spark/deploy/master/MasterSuite.scala   | 159 +++++++---
 .../deploy/master/PersistenceEngineSuite.scala     |   3 +-
 .../apache/spark/deploy/worker/WorkerSuite.scala   | 170 +++++++++-
 .../CoarseGrainedExecutorBackendSuite.scala        |   2 +-
 .../apache/spark/resource/ResourceUtilsSuite.scala |   2 +-
 .../apache/spark/resource/TestResourceIDs.scala    |   4 +
 docs/configuration.md                              |  23 +-
 docs/spark-standalone.md                           |  31 ++
 python/pyspark/tests/test_context.py               |   3 +-
 python/pyspark/tests/test_taskcontext.py           |   8 +-
 38 files changed, 1127 insertions(+), 239 deletions(-)

diff --git a/.gitignore b/.gitignore
index 4b1ba1c..ae20c85 100644
--- a/.gitignore
+++ b/.gitignore
@@ -71,6 +71,7 @@ scalastyle-on-compile.generated.xml
 scalastyle-output.xml
 scalastyle.txt
 spark-*-bin-*.tgz
+spark-resources/
 spark-tests.log
 src_managed/
 streaming-tests.log
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index aa71b21..396d712 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat 
=> NewFileInputFor
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.deploy.StandaloneResourceUtils._
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, 
PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
@@ -245,6 +246,15 @@ class SparkContext(config: SparkConf) extends Logging {
 
   def isLocal: Boolean = Utils.isLocalMaster(_conf)
 
+  private def isClientStandalone: Boolean = {
+    val isSparkCluster = master match {
+      case SparkMasterRegex.SPARK_REGEX(_) => true
+      case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true
+      case _ => false
+    }
+    deployMode == "client" && isSparkCluster
+  }
+
   /**
    * @return true if context is stopped or in the midst of stopping.
    */
@@ -380,7 +390,18 @@ class SparkContext(config: SparkConf) extends Logging {
     _driverLogger = DriverLogger(_conf)
 
     val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE)
-    _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, 
resourcesFileOpt)
+    val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, 
resourcesFileOpt)
+    _resources = {
+      // driver submitted in client mode under Standalone may have conflicting 
resources with
+      // other drivers/workers on this host. We should sync driver's resources 
info into
+      // SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collision.
+      if (isClientStandalone) {
+        acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources, 
Utils.getProcessId)
+      } else {
+        allResources
+      }
+    }
+    logResourceInfo(SPARK_DRIVER_PREFIX, _resources)
 
     // log out spark.app.name in the Spark driver logs
     logInfo(s"Submitted application: $appName")
@@ -1911,8 +1932,10 @@ class SparkContext(config: SparkConf) extends Logging {
       ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
     }
 
-    Utils.tryLogNonFatalError {
-      postApplicationEnd()
+    if (listenerBus != null) {
+      Utils.tryLogNonFatalError {
+        postApplicationEnd()
+      }
     }
     Utils.tryLogNonFatalError {
       _driverLogger.foreach(_.stop())
@@ -1960,6 +1983,9 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _progressBar.foreach(_.stop())
     }
+    if (isClientStandalone) {
+      releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources, 
Utils.getProcessId)
+    }
     _taskScheduler = null
     // TODO: Cache.stop()?
     if (_env != null) {
@@ -2726,7 +2752,7 @@ object SparkContext extends Logging {
 
       // Calculate the max slots each executor can provide based on resources 
available on each
       // executor and resources required by each task.
-      val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
+      val taskResourceRequirements = parseResourceRequirements(sc.conf, 
SPARK_TASK_PREFIX)
       val executorResourcesAndAmounts =
         parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
           .map(request => (request.id.resourceName, request.amount)).toMap
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala 
b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index c5c5c60..e11f497 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy
 
 import java.net.URI
 
+import org.apache.spark.resource.ResourceRequirement
+
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
@@ -32,7 +34,8 @@ private[spark] case class ApplicationDescription(
     // number of executors this application wants to start with,
     // only used if dynamic allocation is enabled
     initialExecutorLimit: Option[Int] = None,
-    user: String = System.getProperty("user.name", "<unknown>")) {
+    user: String = System.getProperty("user.name", "<unknown>"),
+    resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index ea7c902..648a8b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,6 +29,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
+import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
 import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}
 
@@ -92,13 +93,15 @@ private class ClientEndpoint(
         val command = new Command(mainClass,
           Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ 
driverArgs.driverOptions,
           sys.env, classPathEntries, libraryPathEntries, javaOpts)
-
+        val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
+          config.SPARK_DRIVER_PREFIX)
         val driverDescription = new DriverDescription(
           driverArgs.jarUrl,
           driverArgs.memory,
           driverArgs.cores,
           driverArgs.supervise,
-          command)
+          command,
+          driverResourceReqs)
         asyncSendToMasterAndForwardReply[SubmitDriverResponse](
           RequestSubmitDriver(driverDescription))
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 5723b0f..3f1d1ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -24,6 +24,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, 
DriverInfo, WorkerInfo}
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.RecoveryState.MasterState
 import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.util.Utils
 
@@ -31,7 +32,6 @@ private[deploy] sealed trait DeployMessage extends 
Serializable
 
 /** Contains messages sent between Scheduler endpoint nodes. */
 private[deploy] object DeployMessages {
-
   // Worker to Master
 
   /**
@@ -43,6 +43,7 @@ private[deploy] object DeployMessages {
    * @param memory the memory size of worker
    * @param workerWebUiUrl the worker Web UI address
    * @param masterAddress the master address used by the worker to connect
+   * @param resources the resources of worker
    */
   case class RegisterWorker(
       id: String,
@@ -52,7 +53,8 @@ private[deploy] object DeployMessages {
       cores: Int,
       memory: Int,
       workerWebUiUrl: String,
-      masterAddress: RpcAddress)
+      masterAddress: RpcAddress,
+      resources: Map[String, ResourceInformation] = Map.empty)
     extends DeployMessage {
     Utils.checkHost(host)
     assert (port > 0)
@@ -72,8 +74,18 @@ private[deploy] object DeployMessages {
       exception: Option[Exception])
     extends DeployMessage
 
-  case class WorkerSchedulerStateResponse(id: String, executors: 
List[ExecutorDescription],
-     driverIds: Seq[String])
+  case class WorkerExecutorStateResponse(
+      desc: ExecutorDescription,
+      resources: Map[String, ResourceInformation])
+
+  case class WorkerDriverStateResponse(
+      driverId: String,
+      resources: Map[String, ResourceInformation])
+
+  case class WorkerSchedulerStateResponse(
+      id: String,
+      execResponses: List[WorkerExecutorStateResponse],
+      driverResponses: Seq[WorkerDriverStateResponse])
 
   /**
    * A worker will send this message to the master when it registers with the 
master. Then the
@@ -118,10 +130,14 @@ private[deploy] object DeployMessages {
       execId: Int,
       appDesc: ApplicationDescription,
       cores: Int,
-      memory: Int)
+      memory: Int,
+      resources: Map[String, ResourceInformation] = Map.empty)
     extends DeployMessage
 
-  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) 
extends DeployMessage
+  case class LaunchDriver(
+      driverId: String,
+      driverDesc: DriverDescription,
+      resources: Map[String, ResourceInformation] = Map.empty) extends 
DeployMessage
 
   case class KillDriver(driverId: String) extends DeployMessage
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala 
b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 1f5626a..02c166b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.deploy
 
+import org.apache.spark.resource.ResourceRequirement
+
 private[deploy] case class DriverDescription(
     jarUrl: String,
     mem: Int,
     cores: Int,
     supervise: Boolean,
-    command: Command) {
+    command: Command,
+    resourceReqs: Seq[ResourceRequirement] = Seq.empty) {
 
   override def toString: String = s"DriverDescription (${command.mainClass})"
 }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala 
b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index c1866b4..f1b58eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -64,7 +64,8 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, 
coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum), _conf)
+        memoryPerWorker, masters, null, Some(workerNum), _conf,
+        conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE))
       workerRpcEnvs += workerEnv
     }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
new file mode 100644
index 0000000..b64a36f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{File, RandomAccessFile}
+import java.nio.channels.{FileLock, OverlappingFileLockException}
+import java.nio.file.Files
+
+import scala.collection.mutable
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.json4s.{DefaultFormats, Extraction}
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{SPARK_RESOURCES_COORDINATE, 
SPARK_RESOURCES_DIR}
+import org.apache.spark.resource.{ResourceAllocation, ResourceID, 
ResourceInformation, ResourceRequirement}
+import org.apache.spark.resource.ResourceUtils.{parseResourceRequirements, 
withResourcesJson}
+import org.apache.spark.util.Utils
+
+private[spark] object StandaloneResourceUtils extends Logging {
+  // These directory/files are used to coordinate the resources between
+  // the drivers/workers on the host in Spark Standalone.
+  val SPARK_RESOURCES_COORDINATE_DIR = "spark-resources"
+  val ALLOCATED_RESOURCES_FILE = "__allocated_resources__.json"
+  val RESOURCES_LOCK_FILE = "__allocated_resources__.lock"
+
+  /**
+   * Resource allocation used in Standalone only, which tracks assignments with
+   * worker/driver(client only) pid.
+   */
+  case class StandaloneResourceAllocation(pid: Int, allocations: 
Seq[ResourceAllocation]) {
+    // convert allocations to a resource information map
+    def toResourceInformationMap: Map[String, ResourceInformation] = {
+      allocations.map { allocation =>
+        allocation.id.resourceName -> allocation.toResourceInformation
+      }.toMap
+    }
+  }
+
+  /**
+   * Assigns (if coordinate needed) resources to workers/drivers from the same 
host to avoid
+   * address conflict.
+   *
+   * This function works in three steps. First, acquiring the lock on 
RESOURCES_LOCK_FILE
+   * to achieve synchronization among workers and drivers. Second, getting all 
allocated
+   * resources from ALLOCATED_RESOURCES_FILE and assigning isolated resources 
to the worker
+   * or driver after differentiating available resources in discovered 
resources from
+   * allocated resources. If available resources don't meet worker's or 
driver's requirement,
+   * try to update allocated resources by excluding the resource allocation if 
its related
+   * process has already terminated and do the assignment again. If still 
don't meet requirement,
+   * exception should be thrown. Third, updating ALLOCATED_RESOURCES_FILE with 
new allocated
+   * resources along with pid for the worker or driver. Then, return allocated 
resources
+   * information after releasing the lock.
+   *
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param resources the resources found by worker/driver on the host
+   * @param pid the process id of worker/driver to acquire resources.
+   * @return allocated resources for the worker/driver or throws exception if 
can't
+   *         meet worker/driver's requirement
+   */
+  def acquireResources(
+      conf: SparkConf,
+      componentName: String,
+      resources: Map[String, ResourceInformation],
+      pid: Int)
+    : Map[String, ResourceInformation] = {
+    if (!needCoordinate(conf)) {
+      return resources
+    }
+    val resourceRequirements = parseResourceRequirements(conf, componentName)
+    if (resourceRequirements.isEmpty) {
+      return Map.empty
+    }
+    val lock = acquireLock(conf)
+    try {
+      val resourcesFile = new File(getOrCreateResourcesDir(conf), 
ALLOCATED_RESOURCES_FILE)
+      // all allocated resources in ALLOCATED_RESOURCES_FILE, can be updated 
if any allocations'
+      // related processes detected to be terminated while checking pids below.
+      var origAllocation = Seq.empty[StandaloneResourceAllocation]
+      // Map[pid -> Map[resourceName -> Addresses[]]]
+      var allocated = {
+        if (resourcesFile.exists()) {
+          origAllocation = allocatedStandaloneResources(resourcesFile.getPath)
+          val allocations = origAllocation.map { resource =>
+            val resourceMap = {
+              resource.allocations.map { allocation =>
+                allocation.id.resourceName -> allocation.addresses.toArray
+              }.toMap
+            }
+            resource.pid -> resourceMap
+          }.toMap
+          allocations
+        } else {
+          Map.empty[Int, Map[String, Array[String]]]
+        }
+      }
+
+      // new allocated resources for worker or driver,
+      // map from resource name to its allocated addresses.
+      var newAssignments: Map[String, Array[String]] = null
+      // Whether we've checked process status and we'll only do the check at 
most once.
+      // Do the check iff the available resources can't meet the requirements 
at the first time.
+      var checked = false
+      // Whether we need to keep allocating for the worker/driver and we'll 
only go through
+      // the loop at most twice.
+      var keepAllocating = true
+      while (keepAllocating) {
+        keepAllocating = false
+        // store the pid whose related allocated resources conflict with
+        // discovered resources passed in.
+        val pidsToCheck = mutable.Set[Int]()
+        newAssignments = resourceRequirements.map { req =>
+          val rName = req.resourceName
+          val amount = req.amount
+          // initially, we must have available.length >= amount as we've done 
pre-check previously
+          var available = resources(rName).addresses
+          // gets available resource addresses by excluding all
+          // allocated resource addresses from discovered resources
+          allocated.foreach { a =>
+            val thePid = a._1
+            val resourceMap = a._2
+            val assigned = resourceMap.getOrElse(rName, Array.empty)
+            val retained = available.diff(assigned)
+            // if len(retained) < len(available) after diffing against assigned, then there must be
+            // some conflicting resource addresses between available and assigned. So, we should
+            // store its pid here to check whether it's alive in case we don't find enough
+            // resources after traversing all allocated resources.
+            if (retained.length < available.length && !checked) {
+              pidsToCheck += thePid
+            }
+            if (retained.length >= amount) {
+              available = retained
+            } else if (checked) {
+              keepAllocating = false
+              throw new SparkException(s"No more resources available since 
they've already" +
+                s" assigned to other workers/drivers.")
+            } else {
+              keepAllocating = true
+            }
+          }
+          val assigned = {
+            if (keepAllocating) { // can't meet the requirement
+              // excludes the allocation whose related process has already 
been terminated.
+              val (invalid, valid) = allocated.partition { a =>
+                pidsToCheck(a._1) && !(Utils.isTesting || 
Utils.isProcessRunning(a._1))}
+              allocated = valid
+              origAllocation = origAllocation.filter(
+                allocation => !invalid.contains(allocation.pid))
+              checked = true
+              // note this is a meaningless return value, just to avoid 
creating any new object
+              available
+            } else {
+              available.take(amount)
+            }
+          }
+          rName -> assigned
+        }.toMap
+      }
+      val newAllocation = {
+        val allocations = newAssignments.map { case (rName, addresses) =>
+          ResourceAllocation(ResourceID(componentName, rName), addresses)
+        }.toSeq
+        StandaloneResourceAllocation(pid, allocations)
+      }
+      writeResourceAllocationJson(
+        componentName, origAllocation ++ Seq(newAllocation), resourcesFile)
+      newAllocation.toResourceInformationMap
+    } finally {
+      releaseLock(lock)
+    }
+  }
+
+  /**
+   * Frees (if coordinate needed) all the resources a worker/driver (pid) has 
in one shot
+   * to make those resources be available for other workers/drivers on the 
same host.
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param toRelease the resources expected to release
+   * @param pid the process id of worker/driver to release resources.
+   */
+  def releaseResources(
+      conf: SparkConf,
+      componentName: String,
+      toRelease: Map[String, ResourceInformation],
+      pid: Int)
+    : Unit = {
+    if (!needCoordinate(conf)) {
+      return
+    }
+    if (toRelease != null && toRelease.nonEmpty) {
+      val lock = acquireLock(conf)
+      try {
+        val resourcesFile = new File(getOrCreateResourcesDir(conf), 
ALLOCATED_RESOURCES_FILE)
+        if (resourcesFile.exists()) {
+          val (target, others) =
+            
allocatedStandaloneResources(resourcesFile.getPath).partition(_.pid == pid)
+          if (target.nonEmpty) {
+            if (others.isEmpty) {
+              if (!resourcesFile.delete()) {
+                logError(s"Failed to delete $ALLOCATED_RESOURCES_FILE.")
+              }
+            } else {
+              writeResourceAllocationJson(componentName, others, resourcesFile)
+            }
+            logDebug(s"$componentName(pid=$pid) released resources: 
${toRelease.mkString("\n")}")
+          } else {
+            logWarning(s"$componentName(pid=$pid) has already released its 
resources.")
+          }
+        }
+      } finally {
+        releaseLock(lock)
+      }
+    }
+  }
+
+  private def acquireLock(conf: SparkConf): FileLock = {
+    val resourcesDir = getOrCreateResourcesDir(conf)
+    val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE)
+    val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel
+    var keepTry = true
+    var lock: FileLock = null
+    while (keepTry) {
+      try {
+        lock = lockFileChannel.lock()
+        logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.")
+        keepTry = false
+      } catch {
+        case e: OverlappingFileLockException =>
+          // This exception is thrown when we're in LocalSparkCluster mode. FileLock is designed
+          // to be used across JVMs, but LocalSparkCluster launches multiple workers in the
+          // same JVM. As a result, when a worker in LocalSparkCluster tries to acquire the lock
+          // on `resources.lock` which is already locked by another worker, we'll hit this
+          // exception. So, we handle it manually here.
+          keepTry = true
+          // there may be multiple workers race for the lock,
+          // so, sleep for a random time to avoid possible conflict
+          val duration = Random.nextInt(1000) + 1000
+          Thread.sleep(duration)
+      }
+    }
+    assert(lock != null, s"Acquired null lock on $RESOURCES_LOCK_FILE.")
+    lock
+  }
+
+  private def releaseLock(lock: FileLock): Unit = {
+    try {
+      lock.release()
+      lock.channel().close()
+      logInfo(s"Released lock on $RESOURCES_LOCK_FILE.")
+    } catch {
+      case e: Exception =>
+        logError(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e)
+    }
+  }
+
+  private def getOrCreateResourcesDir(conf: SparkConf): File = {
+    val coordinateDir = new File(conf.get(SPARK_RESOURCES_DIR).getOrElse {
+      val sparkHome = if (Utils.isTesting) {
+        assert(sys.props.contains("spark.test.home") ||
+          sys.env.contains("SPARK_HOME"), "spark.test.home or SPARK_HOME is 
not set.")
+        sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
+      } else {
+        sys.env.getOrElse("SPARK_HOME", ".")
+      }
+      sparkHome
+    })
+    val resourceDir = new File(coordinateDir, SPARK_RESOURCES_COORDINATE_DIR)
+    if (!resourceDir.exists()) {
+      Utils.createDirectory(resourceDir)
+    }
+    resourceDir
+  }
+
+  private def allocatedStandaloneResources(resourcesFile: String)
+  : Seq[StandaloneResourceAllocation] = {
+    withResourcesJson[StandaloneResourceAllocation](resourcesFile) { json =>
+      implicit val formats = DefaultFormats
+      parse(json).extract[Seq[StandaloneResourceAllocation]]
+    }
+  }
+
+  /**
+   * Save the allocated resources of driver(cluster only) or executor into a 
JSON formatted
+   * resources file. Used in Standalone only.
+   * @param componentName spark.driver / spark.executor
+   * @param resources allocated resources for driver(cluster only) or executor
+   * @param dir the target directory used to place the resources file
+   * @return None if resources is empty or Some(file) which represents the 
resources file
+   */
+  def prepareResourcesFile(
+      componentName: String,
+      resources: Map[String, ResourceInformation],
+      dir: File): Option[File] = {
+    if (resources.isEmpty) {
+      return None
+    }
+
+    val compShortName = componentName.substring(componentName.lastIndexOf(".") 
+ 1)
+    val tmpFile = Utils.tempFileWith(dir)
+    val allocations = resources.map { case (rName, rInfo) =>
+      ResourceAllocation(ResourceID(componentName, rName), rInfo.addresses)
+    }.toSeq
+    try {
+      writeResourceAllocationJson(componentName, allocations, tmpFile)
+    } catch {
+      case NonFatal(e) =>
+        val errMsg = s"Exception threw while preparing resource file for 
$compShortName"
+        logError(errMsg, e)
+        throw new SparkException(errMsg, e)
+    }
+    val resourcesFile = File.createTempFile(s"resource-$compShortName-", 
".json", dir)
+    tmpFile.renameTo(resourcesFile)
+    Some(resourcesFile)
+  }
+
+  private def writeResourceAllocationJson[T](
+      componentName: String,
+      allocations: Seq[T],
+      jsonFile: File): Unit = {
+    implicit val formats = DefaultFormats
+    val allocationJson = Extraction.decompose(allocations)
+    Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes())
+  }
+
+  /** Whether needs to coordinate resources among workers and drivers for user 
*/
+  def needCoordinate(conf: SparkConf): Boolean = {
+    conf.get(SPARK_RESOURCES_COORDINATE)
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 53564d0..6c56807 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.deploy.ApplicationDescription
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -82,8 +83,10 @@ private[spark] class ApplicationInfo(
   private[master] def addExecutor(
       worker: WorkerInfo,
       cores: Int,
+      resources: Map[String, ResourceInformation],
       useID: Option[Int] = None): ExecutorDesc = {
-    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, 
desc.memoryPerExecutorMB)
+    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores,
+      desc.memoryPerExecutorMB, resources)
     executors(exec.id) = exec
     coresGranted += cores
     exec
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 8d5edae..bf68ba8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 
 import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.util.Utils
 
 private[deploy] class DriverInfo(
@@ -34,6 +35,9 @@ private[deploy] class DriverInfo(
   @transient var exception: Option[Exception] = None
   /* Most recent worker assigned to this driver */
   @transient var worker: Option[WorkerInfo] = None
+  // resources (e.g. gpu/fpga) allocated to this driver
+  // map from resource name to ResourceInformation
+  private var _resources: Map[String, ResourceInformation] = _
 
   init()
 
@@ -47,4 +51,8 @@ private[deploy] class DriverInfo(
     worker = None
     exception = None
   }
+
+  def withResources(r: Map[String, ResourceInformation]): Unit = _resources = r
+
+  def resources: Map[String, ResourceInformation] = _resources
 }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
index fc62b09..a8f8492 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
@@ -18,13 +18,17 @@
 package org.apache.spark.deploy.master
 
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
+import org.apache.spark.resource.ResourceInformation
 
 private[master] class ExecutorDesc(
     val id: Int,
     val application: ApplicationInfo,
     val worker: WorkerInfo,
     val cores: Int,
-    val memory: Int) {
+    val memory: Int,
+    // resources (e.g. gpu/fpga) allocated to this executor
+    // map from resource name to ResourceInformation
+    val resources: Map[String, ResourceInformation]) {
 
   var state = ExecutorState.LAUNCHING
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 3c0a49e..6765519 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -25,8 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, 
HashSet}
 import scala.util.Random
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
-  ExecutorState, SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, 
ExecutorState, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
@@ -38,6 +37,7 @@ import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
+import org.apache.spark.resource.{ResourceRequirement, ResourceUtils}
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, 
Utils}
@@ -244,7 +244,8 @@ private[deploy] class Master(
       System.exit(0)
 
     case RegisterWorker(
-      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, 
masterAddress) =>
+      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
+      masterAddress, resources) =>
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         workerHost, workerPort, cores, Utils.megabytesToString(memory)))
       if (state == RecoveryState.STANDBY) {
@@ -252,8 +253,9 @@ private[deploy] class Master(
       } else if (idToWorker.contains(id)) {
         workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, 
true))
       } else {
+        val workerResources = resources.map(r => r._1 -> 
WorkerResourceInfo(r._1, r._2.addresses))
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerWebUiUrl)
+          workerRef, workerWebUiUrl, workerResources)
         if (registerWorker(worker)) {
           persistenceEngine.addWorker(worker)
           workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, 
false))
@@ -361,24 +363,31 @@ private[deploy] class Master(
 
       if (canCompleteRecovery) { completeRecovery() }
 
-    case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
+    case WorkerSchedulerStateResponse(workerId, execResponses, 
driverResponses) =>
       idToWorker.get(workerId) match {
         case Some(worker) =>
           logInfo("Worker has been re-registered: " + workerId)
           worker.state = WorkerState.ALIVE
 
-          val validExecutors = executors.filter(exec => 
idToApp.get(exec.appId).isDefined)
+          val validExecutors = execResponses.filter(
+            exec => idToApp.get(exec.desc.appId).isDefined)
           for (exec <- validExecutors) {
-            val app = idToApp(exec.appId)
-            val execInfo = app.addExecutor(worker, exec.cores, 
Some(exec.execId))
+            val (execDesc, execResources) = (exec.desc, exec.resources)
+            val app = idToApp(execDesc.appId)
+            val execInfo = app.addExecutor(
+              worker, execDesc.cores, execResources, Some(execDesc.execId))
             worker.addExecutor(execInfo)
-            execInfo.copyState(exec)
+            worker.recoverResources(execResources)
+            execInfo.copyState(execDesc)
           }
 
-          for (driverId <- driverIds) {
+          for (driver <- driverResponses) {
+            val (driverId, driverResource) = (driver.driverId, 
driver.resources)
             drivers.find(_.id == driverId).foreach { driver =>
               driver.worker = Some(worker)
               driver.state = DriverState.RUNNING
+              driver.withResources(driverResource)
+              worker.recoverResources(driverResource)
               worker.addDriver(driver)
             }
           }
@@ -614,24 +623,34 @@ private[deploy] class Master(
     val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
     val oneExecutorPerWorker = coresPerExecutor.isEmpty
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
     val numUsable = usableWorkers.length
     val assignedCores = new Array[Int](numUsable) // Number of cores to give 
to each worker
     val assignedExecutors = new Array[Int](numUsable) // Number of new 
executors on each worker
     var coresToAssign = math.min(app.coresLeft, 
usableWorkers.map(_.coresFree).sum)
 
     /** Return whether the specified worker can launch an executor for this 
app. */
-    def canLaunchExecutor(pos: Int): Boolean = {
+    def canLaunchExecutorForApp(pos: Int): Boolean = {
       val keepScheduling = coresToAssign >= minCoresPerExecutor
       val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= 
minCoresPerExecutor
+      val assignedExecutorNum = assignedExecutors(pos)
 
       // If we allow multiple executors per worker, then we can always launch 
new executors.
       // Otherwise, if there is already an executor on this worker, just give 
it more cores.
-      val launchingNewExecutor = !oneExecutorPerWorker || 
assignedExecutors(pos) == 0
+      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum 
== 0
       if (launchingNewExecutor) {
-        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+        val assignedMemory = assignedExecutorNum * memoryPerExecutor
         val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= 
memoryPerExecutor
+        val assignedResources = resourceReqsPerExecutor.map {
+          req => req.resourceName -> req.amount * assignedExecutorNum
+        }.toMap
+        val resourcesFree = usableWorkers(pos).resourcesFree.map {
+          case (rName, free) => rName -> (free - 
assignedResources.getOrElse(rName, 0))
+        }
+        val enoughResources = ResourceUtils.resourcesMeetRequirements(
+          resourcesFree, resourceReqsPerExecutor)
         val underLimit = assignedExecutors.sum + app.executors.size < 
app.executorLimit
-        keepScheduling && enoughCores && enoughMemory && underLimit
+        keepScheduling && enoughCores && enoughMemory && enoughResources && 
underLimit
       } else {
         // We're adding cores to an existing executor, so no need
         // to check memory and executor limits
@@ -641,11 +660,11 @@ private[deploy] class Master(
 
     // Keep launching executors until no more workers can accommodate any
     // more executors, or if we have reached this application's limits
-    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
+    var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
     while (freeWorkers.nonEmpty) {
       freeWorkers.foreach { pos =>
         var keepScheduling = true
-        while (keepScheduling && canLaunchExecutor(pos)) {
+        while (keepScheduling && canLaunchExecutorForApp(pos)) {
           coresToAssign -= minCoresPerExecutor
           assignedCores(pos) += minCoresPerExecutor
 
@@ -666,7 +685,7 @@ private[deploy] class Master(
           }
         }
       }
-      freeWorkers = freeWorkers.filter(canLaunchExecutor)
+      freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
     }
     assignedCores
   }
@@ -683,9 +702,11 @@ private[deploy] class Master(
       if (app.coresLeft >= coresPerExecutor) {
         // Filter out workers that don't have enough resources to launch an 
executor
         val usableWorkers = workers.toArray.filter(_.state == 
WorkerState.ALIVE)
-          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB 
&&
-            worker.coresFree >= coresPerExecutor)
+          .filter(canLaunchExecutor(_, app.desc))
           .sortBy(_.coresFree).reverse
+        if (waitingApps.length == 1 && usableWorkers.isEmpty) {
+          logWarning(s"App ${app.id} requires more resource than any of 
Workers could have.")
+        }
         val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, 
spreadOutApps)
 
         // Now that we've decided how many cores to allocate on each worker, 
let's allocate them
@@ -715,12 +736,44 @@ private[deploy] class Master(
     val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
     val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
     for (i <- 1 to numExecutors) {
-      val exec = app.addExecutor(worker, coresToAssign)
+      val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
+      val exec = app.addExecutor(worker, coresToAssign, allocated)
       launchExecutor(worker, exec)
       app.state = ApplicationState.RUNNING
     }
   }
 
+  private def canLaunch(
+      worker: WorkerInfo,
+      memoryReq: Int,
+      coresReq: Int,
+      resourceRequirements: Seq[ResourceRequirement])
+    : Boolean = {
+    val enoughMem = worker.memoryFree >= memoryReq
+    val enoughCores = worker.coresFree >= coresReq
+    val enoughResources = ResourceUtils.resourcesMeetRequirements(
+      worker.resourcesFree, resourceRequirements)
+    enoughMem && enoughCores && enoughResources
+  }
+
+  /**
+   * @return whether the worker could launch the driver represented by 
DriverDescription
+   */
+  private def canLaunchDriver(worker: WorkerInfo, desc: DriverDescription): 
Boolean = {
+    canLaunch(worker, desc.mem, desc.cores, desc.resourceReqs)
+  }
+
+  /**
+   * @return whether the worker could launch the executor according to 
application's requirement
+   */
+  private def canLaunchExecutor(worker: WorkerInfo, desc: 
ApplicationDescription): Boolean = {
+    canLaunch(
+      worker,
+      desc.memoryPerExecutorMB,
+      desc.coresPerExecutor.getOrElse(1),
+      desc.resourceReqsPerExecutor)
+  }
+
   /**
    * Schedule the currently available resources among waiting apps. This 
method will be called
    * every time a new app joins or resource availability changes.
@@ -738,17 +791,24 @@ private[deploy] class Master(
       // start from the last worker that was assigned a driver, and continue 
onwards until we have
       // explored all alive workers.
       var launched = false
+      var isClusterIdle = true
       var numWorkersVisited = 0
       while (numWorkersVisited < numWorkersAlive && !launched) {
         val worker = shuffledAliveWorkers(curPos)
+        isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
         numWorkersVisited += 1
-        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= 
driver.desc.cores) {
+        if (canLaunchDriver(worker, driver.desc)) {
+          val allocated = worker.acquireResources(driver.desc.resourceReqs)
+          driver.withResources(allocated)
           launchDriver(worker, driver)
           waitingDrivers -= driver
           launched = true
         }
         curPos = (curPos + 1) % numWorkersAlive
       }
+      if (!launched && isClusterIdle) {
+        logWarning(s"Driver ${driver.id} requires more resource than any of 
Workers could have.")
+      }
     }
     startExecutorsOnWorkers()
   }
@@ -756,8 +816,8 @@ private[deploy] class Master(
   private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
-    worker.endpoint.send(LaunchExecutor(masterUrl,
-      exec.application.id, exec.id, exec.application.desc, exec.cores, 
exec.memory))
+    worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, 
exec.id,
+      exec.application.desc, exec.cores, exec.memory, exec.resources))
     exec.application.driver.send(
       ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, 
exec.memory))
   }
@@ -1021,7 +1081,7 @@ private[deploy] class Master(
     logInfo("Launching driver " + driver.id + " on worker " + worker.id)
     worker.addDriver(driver)
     driver.worker = Some(worker)
-    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
+    worker.endpoint.send(LaunchDriver(driver.id, driver.desc, 
driver.resources))
     driver.state = DriverState.RUNNING
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index c87d6e2..d485db4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -19,9 +19,24 @@ package org.apache.spark.deploy.master
 
 import scala.collection.mutable
 
+import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, 
ResourceRequirement}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
+private[spark] case class WorkerResourceInfo(name: String, addresses: 
Seq[String])
+  extends ResourceAllocator(name, addresses) {
+
+  def toResourceInformation(): ResourceInformation = {
+    new ResourceInformation(name, addresses.toArray)
+  }
+
+  def acquire(amount: Int): ResourceInformation = {
+    val allocated = availableAddrs.take(amount)
+    acquire(allocated)
+    new ResourceInformation(name, allocated.toArray)
+  }
+}
+
 private[spark] class WorkerInfo(
     val id: String,
     val host: String,
@@ -29,7 +44,9 @@ private[spark] class WorkerInfo(
     val cores: Int,
     val memory: Int,
     val endpoint: RpcEndpointRef,
-    val webUiAddress: String)
+    val webUiAddress: String,
+    val resources: Map[String, WorkerResourceInfo],
+    val pid: Int = 0)
   extends Serializable {
 
   Utils.checkHost(host)
@@ -47,6 +64,11 @@ private[spark] class WorkerInfo(
 
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
+  def resourcesFree: Map[String, Int] = {
+    resources.map { case (rName, rInfo) =>
+      rName -> rInfo.availableAddrs.length
+    }
+  }
 
   private def readObject(in: java.io.ObjectInputStream): Unit = 
Utils.tryOrIOException {
     in.defaultReadObject()
@@ -78,6 +100,7 @@ private[spark] class WorkerInfo(
       executors -= exec.fullId
       coresUsed -= exec.cores
       memoryUsed -= exec.memory
+      releaseResources(exec.resources)
     }
   }
 
@@ -95,6 +118,7 @@ private[spark] class WorkerInfo(
     drivers -= driver.id
     memoryUsed -= driver.desc.mem
     coresUsed -= driver.desc.cores
+    releaseResources(driver.resources)
   }
 
   def setState(state: WorkerState.Value): Unit = {
@@ -102,4 +126,36 @@ private[spark] class WorkerInfo(
   }
 
   def isAlive(): Boolean = this.state == WorkerState.ALIVE
+
+  /**
+   * acquire specified amount resources for driver/executor from the worker
+   * @param resourceReqs the resources requirement from driver/executor
+   */
+  def acquireResources(resourceReqs: Seq[ResourceRequirement])
+    : Map[String, ResourceInformation] = {
+    resourceReqs.map { req =>
+      val rName = req.resourceName
+      val amount = req.amount
+      rName -> resources(rName).acquire(amount)
+    }.toMap
+  }
+
+  /**
+   * used during master recovery
+   */
+  def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
+    expected.foreach { case (rName, rInfo) =>
+      resources(rName).acquire(rInfo.addresses)
+    }
+  }
+
+  /**
+   * release resources to worker from the driver/executor
+   * @param allocated the resources which allocated to driver/executor 
previously
+   */
+  private def releaseResources(allocated: Map[String, ResourceInformation]): 
Unit = {
+    allocated.foreach { case (rName, rInfo) =>
+      resources(rName).release(rInfo.addresses)
+    }
+  }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index f912ed6..c060ef9 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.{Command, DeployMessages, 
DriverDescription}
 import org.apache.spark.deploy.ClientArguments._
 import org.apache.spark.internal.config
 import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -174,8 +175,11 @@ private[rest] class StandaloneSubmitRequestServlet(
     val actualDriverMemory = 
driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
     val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
     val actualSuperviseDriver = 
superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
+    val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
+      config.SPARK_DRIVER_PREFIX)
     new DriverDescription(
-      appResource, actualDriverMemory, actualDriverCores, 
actualSuperviseDriver, command)
+      appResource, actualDriverMemory, actualDriverCores, 
actualSuperviseDriver, command,
+      driverResourceReqs)
   }
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 0c88119..4934722 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -28,10 +28,13 @@ import com.google.common.io.Files
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
+import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, 
SPARK_DRIVER_PREFIX}
 import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}
 
@@ -47,7 +50,8 @@ private[deploy] class DriverRunner(
     val driverDesc: DriverDescription,
     val worker: RpcEndpointRef,
     val workerUrl: String,
-    val securityManager: SecurityManager)
+    val securityManager: SecurityManager,
+    val resources: Map[String, ResourceInformation] = Map.empty)
   extends Logging {
 
   @volatile private var process: Option[Process] = None
@@ -171,6 +175,7 @@ private[deploy] class DriverRunner(
   private[worker] def prepareAndRunDriver(): Int = {
     val driverDir = createWorkingDirectory()
     val localJarFilename = downloadUserJar(driverDir)
+    val resourceFileOpt = prepareResourcesFile(SPARK_DRIVER_PREFIX, resources, 
driverDir)
 
     def substituteVariables(argument: String): String = argument match {
       case "{{WORKER_URL}}" => workerUrl
@@ -178,9 +183,12 @@ private[deploy] class DriverRunner(
       case other => other
     }
 
+    // config resource file for driver, which would be used to load resources 
when driver starts up
+    val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
+      
Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
     // TODO: If we add ability to submit multiple jars they should also be 
added here
-    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, 
securityManager,
-      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
+    val builder = 
CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
+      securityManager, driverDesc.mem, sparkHome.getAbsolutePath, 
substituteVariables)
 
     runDriver(builder, driverDir, driverDesc.supervise)
   }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 6f1484c..9793910 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -27,8 +27,11 @@ import com.google.common.io.Files
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_EXECUTOR_PREFIX
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.resource.{ResourceInformation, ResourceUtils}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.util.logging.FileAppender
@@ -54,7 +57,8 @@ private[deploy] class ExecutorRunner(
     val workerUrl: String,
     conf: SparkConf,
     val appLocalDirs: Seq[String],
-    @volatile var state: ExecutorState.Value)
+    @volatile var state: ExecutorState.Value,
+    val resources: Map[String, ResourceInformation] = Map.empty)
   extends Logging {
 
   private val fullId = appId + "/" + execId
@@ -143,11 +147,14 @@ private[deploy] class ExecutorRunner(
    */
   private def fetchAndRunExecutor() {
     try {
+      val resourceFileOpt = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, 
resources, executorDir)
       // Launch the process
+      val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>
+        Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)
       val subsOpts = appDesc.command.javaOpts.map {
         Utils.substituteAppNExecIds(_, appId, execId.toString)
       }
-      val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
+      val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = 
subsOpts)
       val builder = CommandUtils.buildProcessBuilder(subsCommand, new 
SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
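
The only change to the executor launch command is that "--resourcesFile <path>"
is appended when a resources file was prepared. A small sketch of that argument
handling, assuming the command arguments are a plain Seq[String] (object and
method names below are illustrative):

    import java.io.File

    object ExecutorArgsSketch {
      // Append "--resourcesFile <path>" only when a resources file exists.
      def withResourcesFile(arguments: Seq[String], resourceFileOpt: Option[File]): Seq[String] =
        arguments ++ resourceFileOpt
          .map(f => Seq("--resourcesFile", f.getAbsolutePath))
          .getOrElse(Seq.empty)

      def main(args: Array[String]): Unit = {
        val base = Seq("--driver-url", "spark://CoarseGrainedScheduler@host:7077")
        println(withResourcesFile(base, Some(new File("/tmp/resources.json"))))
        println(withResourcesFile(base, None))
      }
    }
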
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ac7a1b9..899593d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy.worker
 
-import java.io.File
-import java.io.IOException
+import java.io.{File, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, UUID}
 import java.util.concurrent._
@@ -34,6 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.StandaloneResourceUtils._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.{config, Logging}
@@ -44,7 +44,7 @@ import org.apache.spark.metrics.{MetricsSystem, 
MetricsSystemInstances}
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
-import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, 
Utils}
+import org.apache.spark.util.{SignalUtils, SparkUncaughtExceptionHandler, 
ThreadUtils, Utils}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -57,7 +57,8 @@ private[deploy] class Worker(
     val conf: SparkConf,
     val securityMgr: SecurityManager,
     resourceFileOpt: Option[String] = None,
-    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
+    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
+    pid: Int = Utils.getProcessId)
   extends ThreadSafeRpcEndpoint with Logging {
 
   private val host = rpcEnv.address.host
@@ -180,7 +181,7 @@ private[deploy] class Worker(
   )
 
   // visible for tests
-  private[deploy] var resources: Map[String, ResourceInformation] = _
+  private[deploy] var resources: Map[String, ResourceInformation] = Map.empty
 
   var coresUsed = 0
   var memoryUsed = 0
@@ -190,19 +191,8 @@ private[deploy] class Worker(
 
   private def createWorkDir() {
     workDir = Option(workDirPath).map(new File(_)).getOrElse(new 
File(sparkHome, "work"))
-    try {
-      // This sporadically fails - not sure why ... !workDir.exists() && 
!workDir.mkdirs()
-      // So attempting to create and then check if directory was created or 
not.
-      workDir.mkdirs()
-      if ( !workDir.exists() || !workDir.isDirectory) {
-        logError("Failed to create work directory " + workDir)
-        System.exit(1)
-      }
-      assert (workDir.isDirectory)
-    } catch {
-      case e: Exception =>
-        logError("Failed to create work directory " + workDir, e)
-        System.exit(1)
+    if (!Utils.createDirectory(workDir)) {
+      System.exit(1)
     }
   }
 
@@ -214,6 +204,7 @@ private[deploy] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     startExternalShuffleService()
+    releaseResourcesOnInterrupt()
     setupWorkerResources()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
@@ -227,13 +218,29 @@ private[deploy] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
+  /**
+   * Used to catch the TERM signal from sbin/stop-slave.sh and
+   * release resources before the Worker exits.
+   */
+  private def releaseResourcesOnInterrupt(): Unit = {
+    SignalUtils.register("TERM") {
+      releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
+      false
+    }
+  }
+
   private def setupWorkerResources(): Unit = {
     try {
-      resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, 
resourceFileOpt)
+      val allResources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, 
resourceFileOpt)
+      resources = acquireResources(conf, SPARK_WORKER_PREFIX, allResources, 
pid)
+      logResourceInfo(SPARK_WORKER_PREFIX, resources)
     } catch {
       case e: Exception =>
         logError("Failed to setup worker resources: ", e)
-        System.exit(1)
+        releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
+        if (!Utils.isTesting) {
+          System.exit(1)
+        }
     }
   }
 
@@ -349,6 +356,7 @@ private[deploy] class Worker(
               TimeUnit.SECONDS))
         }
       } else {
+        releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
         logError("All masters are unresponsive! Giving up.")
         System.exit(1)
       }
@@ -405,7 +413,8 @@ private[deploy] class Worker(
       cores,
       memory,
       workerWebUiUrl,
-      masterEndpoint.address))
+      masterEndpoint.address,
+      resources))
   }
 
   private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = 
synchronized {
@@ -446,6 +455,7 @@ private[deploy] class Worker(
       case RegisterWorkerFailed(message) =>
         if (!registered) {
           logError("Worker registration failed: " + message)
+          releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
           System.exit(1)
         }
 
@@ -506,15 +516,20 @@ private[deploy] class Worker(
       logInfo("Master has changed, new master is at " + 
masterRef.address.toSparkURL)
       changeMaster(masterRef, masterWebUiUrl, masterRef.address)
 
-      val execs = executors.values.
-        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
-      masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, 
drivers.keys.toSeq))
+      val executorResponses = executors.values.map { e =>
+        WorkerExecutorStateResponse(new ExecutorDescription(
+          e.appId, e.execId, e.cores, e.state), e.resources)
+      }
+      val driverResponses = drivers.keys.map { id =>
+        WorkerDriverStateResponse(id, drivers(id).resources)}
+      masterRef.send(WorkerSchedulerStateResponse(
+        workerId, executorResponses.toList, driverResponses.toSeq))
 
     case ReconnectWorker(masterUrl) =>
       logInfo(s"Master with url $masterUrl requested this worker to 
reconnect.")
       registerWithMaster()
 
-    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
+    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, 
resources_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch 
executor.")
       } else {
@@ -567,7 +582,8 @@ private[deploy] class Worker(
             workerUri,
             conf,
             appLocalDirs,
-            ExecutorState.LAUNCHING)
+            ExecutorState.LAUNCHING,
+            resources_)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -601,7 +617,7 @@ private[deploy] class Worker(
         }
       }
 
-    case LaunchDriver(driverId, driverDesc) =>
+    case LaunchDriver(driverId, driverDesc, resources_) =>
       logInfo(s"Asked to launch driver $driverId")
       val driver = new DriverRunner(
         conf,
@@ -611,7 +627,8 @@ private[deploy] class Worker(
         driverDesc.copy(command = 
Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
         self,
         workerUri,
-        securityMgr)
+        securityMgr,
+        resources_)
       drivers(driverId) = driver
       driver.start()
 
@@ -701,6 +718,7 @@ private[deploy] class Worker(
   }
 
   override def onStop() {
+    releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
     cleanupThreadExecutor.shutdownNow()
     metricsSystem.report()
     cancelLastRegistrationRetry()
@@ -835,8 +853,9 @@ private[deploy] object Worker extends Logging {
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
     val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
+    val pid = if (Utils.isTesting) workerNumber.get else Utils.getProcessId
     rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, 
memory,
-      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, 
resourceFileOpt))
+      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, 
resourceFileOpt, pid = pid))
     rpcEnv
   }
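
The worker-side flow boils down to: claim addresses under this worker's pid that
no live process already holds, and release them again on TERM, registration
failure, or shutdown. A much-simplified, single-JVM sketch of that bookkeeping
(the real acquireResources/releaseResources go through the shared
____allocated_resources____.json guarded by a file lock; all names below are
illustrative):

    object ResourceCoordinationSketch {
      // pid -> (resourceName -> claimed addresses)
      private val allocated =
        scala.collection.mutable.Map.empty[Int, Map[String, Set[String]]]

      def acquire(pid: Int, resourceName: String, discovered: Seq[String], amount: Int): Seq[String] = {
        val taken = allocated.values.flatMap(_.getOrElse(resourceName, Set.empty[String])).toSet
        val free = discovered.filterNot(taken)
        val assigned = free.take(amount)
        require(assigned.size == amount, s"Not enough $resourceName addresses left")
        allocated(pid) = allocated.getOrElse(pid, Map.empty[String, Set[String]])
          .updated(resourceName, assigned.toSet)
        assigned
      }

      def release(pid: Int): Unit = allocated.remove(pid)

      def main(args: Array[String]): Unit = {
        val discovered = Seq("f1", "f2", "f3", "f4", "f5")
        println(acquire(1, "fpga", discovered, 2))  // List(f1, f2)
        println(acquire(2, "fpga", discovered, 2))  // List(f3, f4)
        release(1)
        println(acquire(3, "fpga", discovered, 2))  // List(f1, f2) again
      }
    }
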
 
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 98e5aa6..a42a928 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -90,11 +90,13 @@ private[spark] class CoarseGrainedExecutorBackend(
   // visible for testing
   def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, 
ResourceInformation] = {
     // only parse the resources if a task requires them
-    val resourceInfo = if (parseTaskResourceRequirements(env.conf).nonEmpty) {
+    val resourceInfo = if (parseResourceRequirements(env.conf, 
SPARK_TASK_PREFIX).nonEmpty) {
       val resources = getOrDiscoverAllResources(env.conf, 
SPARK_EXECUTOR_PREFIX, resourcesFileOpt)
       if (resources.isEmpty) {
         throw new SparkException("User specified resources per task via: " +
           s"$SPARK_TASK_PREFIX, but can't find any resources available on the 
executor.")
+      } else {
+        logResourceInfo(SPARK_EXECUTOR_PREFIX, resources)
       }
       resources
     } else {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e014721..214675b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -36,6 +36,23 @@ package object config {
   private[spark] val SPARK_EXECUTOR_PREFIX = "spark.executor"
   private[spark] val SPARK_TASK_PREFIX = "spark.task"
 
+  private[spark] val SPARK_RESOURCES_COORDINATE =
+    ConfigBuilder("spark.resources.coordinate.enable")
+      .doc("Whether to coordinate resources automatically among 
workers/drivers(client only) " +
+        "in Standalone. If false, the user is responsible for configuring 
different resources " +
+        "for workers/drivers that run on the same host.")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPARK_RESOURCES_DIR =
+    ConfigBuilder("spark.resources.dir")
+      .doc("Directory used to coordinate resources among 
workers/drivers(client only) in " +
+        "Standalone. Default is SPARK_HOME. Make sure to use the same 
directory for worker " +
+        "and drivers in client mode that run on the same host. Don't clean up 
this directory " +
+        "while workers/drivers are still alive to avoid the most likely 
resources conflict. ")
+      .stringConf
+      .createOptional
+
   private[spark] val DRIVER_RESOURCES_FILE =
     ConfigBuilder("spark.driver.resourcesFile")
       .internal()
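
As a usage sketch, the two new settings can be combined with the worker resource
configs introduced by this patch; the directory and discovery-script paths below
are placeholders:

    import org.apache.spark.SparkConf

    object ResourceCoordinationConfExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.resources.coordinate.enable", "true")         // default
          .set("spark.resources.dir", "/opt/spark/resources-sync")  // shared per host
          .set("spark.worker.resource.gpu.amount", "2")
          .set("spark.worker.resource.gpu.discoveryScript", "/opt/spark/bin/getGpus.sh")
        println(conf.getAll.toSeq.sorted.mkString("\n"))
      }
    }
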
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
similarity index 86%
copy from 
core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
copy to core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index c75931d..719f34db 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler
+package org.apache.spark.resource
 
 import scala.collection.mutable
 
@@ -23,17 +23,12 @@ import org.apache.spark.SparkException
 import org.apache.spark.util.collection.OpenHashMap
 
 /**
- * Class to hold information about a type of Resource on an Executor. This 
information is managed
- * by SchedulerBackend, and TaskScheduler shall schedule tasks on idle 
Executors based on the
- * information.
+ * Class used to help an executor/worker allocate resources.
  * Please note that this class is intended to be used in a single thread.
- * @param name Resource name
- * @param addresses Resource addresses provided by the executor
+ * @param name Resource name, e.g. gpu/fpga
+ * @param addresses Resource addresses provided by the executor/worker
  */
-private[spark] class ExecutorResourceInfo(
-    val name: String,
-    addresses: Seq[String]) extends Serializable {
-
+class ResourceAllocator(name: String, addresses: Seq[String]) extends 
Serializable {
   /**
    * Map from an address to its availability, the value `true` means the 
address is available,
    * while value `false` means the address is assigned.
@@ -52,7 +47,7 @@ private[spark] class ExecutorResourceInfo(
    * Sequence of currently assigned resource addresses.
    * Exposed for testing only.
    */
-  private[scheduler] def assignedAddrs: Seq[String] = addressAvailabilityMap
+  private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap
     .flatMap { case (addr, available) =>
       if (!available) Some(addr) else None
     }.toSeq
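
The availability map behind ResourceAllocator simply flips an address between
available and assigned, failing fast on unknown or mismatched addresses. A
self-contained sketch of that acquire/release bookkeeping (class and method
names are local stand-ins, not the Spark API):

    import scala.collection.mutable

    class AddressBook(name: String, addresses: Seq[String]) {
      // address -> still available?
      private val available = mutable.HashMap(addresses.map(_ -> true): _*)

      def availableAddrs: Seq[String] = available.collect { case (a, true) => a }.toSeq

      def acquire(addrs: Seq[String]): Unit = addrs.foreach { a =>
        require(available.getOrElse(a, false), s"$name address $a is not available")
        available(a) = false
      }

      def release(addrs: Seq[String]): Unit = addrs.foreach { a =>
        require(available.contains(a) && !available(a), s"$name address $a is not assigned")
        available(a) = true
      }
    }

    object AddressBookDemo {
      def main(args: Array[String]): Unit = {
        val gpus = new AddressBook("gpu", Seq("0", "1", "2"))
        gpus.acquire(Seq("0", "1"))
        println(gpus.availableAddrs)         // List(2)
        gpus.release(Seq("0"))
        println(gpus.availableAddrs.sorted)  // List(0, 2)
      }
    }
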
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 6926586..150ba09 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -27,7 +27,6 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils.executeAndGetOutput
 
 /**
@@ -48,7 +47,7 @@ private[spark] case class ResourceRequest(
     discoveryScript: Option[String],
     vendor: Option[String])
 
-private[spark] case class TaskResourceRequirement(resourceName: String, 
amount: Int)
+private[spark] case class ResourceRequirement(resourceName: String, amount: 
Int)
 
 /**
  * Case class representing allocated resource addresses for a specific 
resource.
@@ -62,7 +61,6 @@ private[spark] case class ResourceAllocation(id: ResourceID, 
addresses: Seq[Stri
 }
 
 private[spark] object ResourceUtils extends Logging {
-
   // config suffixes
   val DISCOVERY_SCRIPT = "discoveryScript"
   val VENDOR = "vendor"
@@ -94,23 +92,39 @@ private[spark] object ResourceUtils extends Logging {
     }
   }
 
-  def parseTaskResourceRequirements(sparkConf: SparkConf): 
Seq[TaskResourceRequirement] = {
-    parseAllResourceRequests(sparkConf, SPARK_TASK_PREFIX).map { request =>
-      TaskResourceRequirement(request.id.resourceName, request.amount)
+  def parseResourceRequirements(sparkConf: SparkConf, componentName: String)
+    : Seq[ResourceRequirement] = {
+    parseAllResourceRequests(sparkConf, componentName).map { request =>
+      ResourceRequirement(request.id.resourceName, request.amount)
+    }
+  }
+
+  def resourcesMeetRequirements(
+      resourcesFree: Map[String, Int],
+      resourceRequirements: Seq[ResourceRequirement])
+    : Boolean = {
+    resourceRequirements.forall { req =>
+      resourcesFree.getOrElse(req.resourceName, 0) >= req.amount
     }
   }
 
-  private def parseAllocatedFromJsonFile(resourcesFile: String): 
Seq[ResourceAllocation] = {
-    implicit val formats = DefaultFormats
+  def withResourcesJson[T](resourcesFile: String)(extract: String => Seq[T]): 
Seq[T] = {
     val json = new String(Files.readAllBytes(Paths.get(resourcesFile)))
     try {
-      parse(json).extract[Seq[ResourceAllocation]]
+      extract(json)
     } catch {
       case NonFatal(e) =>
         throw new SparkException(s"Error parsing resources file 
$resourcesFile", e)
     }
   }
 
+  def parseAllocatedFromJsonFile(resourcesFile: String): 
Seq[ResourceAllocation] = {
+    withResourcesJson[ResourceAllocation](resourcesFile) { json =>
+      implicit val formats = DefaultFormats
+      parse(json).extract[Seq[ResourceAllocation]]
+    }
+  }
+
   private def parseAllocatedOrDiscoverResources(
       sparkConf: SparkConf,
       componentName: String,
@@ -154,10 +168,14 @@ private[spark] object ResourceUtils extends Logging {
     val allocations = parseAllocatedOrDiscoverResources(sparkConf, 
componentName, resourcesFileOpt)
     assertAllResourceAllocationsMeetRequests(allocations, requests)
     val resourceInfoMap = allocations.map(a => (a.id.resourceName, 
a.toResourceInformation)).toMap
+    resourceInfoMap
+  }
+
+  def logResourceInfo(componentName: String, resources: Map[String, 
ResourceInformation])
+    : Unit = {
     logInfo("==============================================================")
-    logInfo(s"Resources for 
$componentName:\n${resourceInfoMap.mkString("\n")}")
+    logInfo(s"Resources for $componentName:\n${resources.mkString("\n")}")
     logInfo("==============================================================")
-    resourceInfoMap
   }
 
   // visible for test
@@ -175,7 +193,7 @@ private[spark] object ResourceUtils extends Logging {
           "doesn't exist!")
       }
     } else {
-      throw new SparkException(s"User is expecting to use resource: 
$resourceName but " +
+      throw new SparkException(s"User is expecting to use resource: 
$resourceName, but " +
         "didn't specify a discovery script!")
     }
     if (!result.name.equals(resourceName)) {
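
The new resourcesMeetRequirements helper is a pure free-versus-required check.
A minimal sketch, with a local Requirement case class standing in for
ResourceRequirement:

    object RequirementCheckSketch {
      case class Requirement(resourceName: String, amount: Int)

      def meets(resourcesFree: Map[String, Int], reqs: Seq[Requirement]): Boolean =
        reqs.forall(r => resourcesFree.getOrElse(r.resourceName, 0) >= r.amount)

      def main(args: Array[String]): Unit = {
        val free = Map("gpu" -> 2, "fpga" -> 1)
        println(meets(free, Seq(Requirement("gpu", 2))))                         // true
        println(meets(free, Seq(Requirement("gpu", 2), Requirement("fpga", 3)))) // false
      }
    }
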
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index c75931d..f05281e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -17,85 +17,14 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable
-
-import org.apache.spark.SparkException
-import org.apache.spark.util.collection.OpenHashMap
+import org.apache.spark.resource.ResourceAllocator
 
 /**
  * Class to hold information about a type of Resource on an Executor. This 
information is managed
  * by SchedulerBackend, and TaskScheduler shall schedule tasks on idle 
Executors based on the
  * information.
- * Please note that this class is intended to be used in a single thread.
  * @param name Resource name
  * @param addresses Resource addresses provided by the executor
  */
-private[spark] class ExecutorResourceInfo(
-    val name: String,
-    addresses: Seq[String]) extends Serializable {
-
-  /**
-   * Map from an address to its availability, the value `true` means the 
address is available,
-   * while value `false` means the address is assigned.
-   * TODO Use [[OpenHashMap]] instead to gain better performance.
-   */
-  private val addressAvailabilityMap = mutable.HashMap(addresses.map(_ -> 
true): _*)
-
-  /**
-   * Sequence of currently available resource addresses.
-   */
-  def availableAddrs: Seq[String] = addressAvailabilityMap.flatMap { case 
(addr, available) =>
-    if (available) Some(addr) else None
-  }.toSeq
-
-  /**
-   * Sequence of currently assigned resource addresses.
-   * Exposed for testing only.
-   */
-  private[scheduler] def assignedAddrs: Seq[String] = addressAvailabilityMap
-    .flatMap { case (addr, available) =>
-      if (!available) Some(addr) else None
-    }.toSeq
-
-  /**
-   * Acquire a sequence of resource addresses (to a launched task), these 
addresses must be
-   * available. When the task finishes, it will return the acquired resource 
addresses.
-   * Throw an Exception if an address is not available or doesn't exist.
-   */
-  def acquire(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      if (!addressAvailabilityMap.contains(address)) {
-        throw new SparkException(s"Try to acquire an address that doesn't 
exist. $name address " +
-          s"$address doesn't exist.")
-      }
-      val isAvailable = addressAvailabilityMap(address)
-      if (isAvailable) {
-        addressAvailabilityMap(address) = false
-      } else {
-        throw new SparkException(s"Try to acquire an address that is not 
available. $name " +
-          s"address $address is not available.")
-      }
-    }
-  }
-
-  /**
-   * Release a sequence of resource addresses, these addresses must have been 
assigned. Resource
-   * addresses are released when a task has finished.
-   * Throw an Exception if an address is not assigned or doesn't exist.
-   */
-  def release(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      if (!addressAvailabilityMap.contains(address)) {
-        throw new SparkException(s"Try to release an address that doesn't 
exist. $name address " +
-          s"$address doesn't exist.")
-      }
-      val isAvailable = addressAvailabilityMap(address)
-      if (!isAvailable) {
-        addressAvailabilityMap(address) = true
-      } else {
-        throw new SparkException(s"Try to release an address that is not 
assigned. $name " +
-          s"address $address is not assigned.")
-      }
-    }
-  }
-}
+private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String])
+  extends ResourceAllocator(name, addresses)
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2e3e0a2..1496dff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -94,7 +94,7 @@ private[spark] class TaskSchedulerImpl(
   val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
   // Resources to request per task
-  val resourcesReqsPerTask = 
ResourceUtils.parseTaskResourceRequirements(sc.conf)
+  val resourcesReqsPerTask = ResourceUtils.parseResourceRequirements(sc.conf, 
SPARK_TASK_PREFIX)
 
   // TaskSetManagers are not thread safe, so any access to one should be 
synchronized
   // on this class.  Protected by `this`
@@ -383,9 +383,8 @@ private[spark] class TaskSchedulerImpl(
    * Check whether the resources from the WorkerOffer are enough to run at 
least one task.
    */
   private def resourcesMeetTaskRequirements(resources: Map[String, 
Buffer[String]]): Boolean = {
-    resourcesReqsPerTask.forall { req =>
-      resources.contains(req.resourceName) && resources(req.resourceName).size 
>= req.amount
-    }
+    val resourcesFree = resources.map(r => r._1 -> r._2.length)
+    ResourceUtils.resourcesMeetRequirements(resourcesFree, 
resourcesReqsPerTask)
   }
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index e0605fe..2025a7d 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.client.{StandaloneAppClient, 
StandaloneAppClientL
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
+import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -112,8 +113,11 @@ private[spark] class StandaloneSchedulerBackend(
       } else {
         None
       }
+    val executorResourceReqs = ResourceUtils.parseResourceRequirements(conf,
+      config.SPARK_EXECUTOR_PREFIX)
     val appDesc = ApplicationDescription(sc.appName, maxCores, 
sc.executorMemory, command,
-      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, 
initialExecutorLimit)
+      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, 
initialExecutorLimit,
+      resourceReqsPerExecutor = executorResourceReqs)
     client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, 
conf)
     client.start()
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3ad67f4..9c1f21f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -270,6 +270,26 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Create a directory given the abstract pathname.
+   * @return true if the directory is successfully created; otherwise, false.
+   */
+  def createDirectory(dir: File): Boolean = {
+    try {
+      // This sporadically fails - not sure why ... !dir.exists() && 
!dir.mkdirs()
+      // So attempting to create and then check if directory was created or 
not.
+      dir.mkdirs()
+      if (!dir.exists() || !dir.isDirectory) {
+        logError(s"Failed to create directory $dir")
+      }
+      dir.isDirectory
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to create directory " + dir, e)
+        false
+    }
+  }
+
+  /**
    * Create a directory inside the given parent directory. The directory is 
guaranteed to be
    * newly created, and is not marked for automatic deletion.
    */
@@ -2555,6 +2575,28 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Given a process id, return true if the process is still running.
+   */
+  def isProcessRunning(pid: Int): Boolean = {
+    val process = executeCommand(Seq("kill", "-0", pid.toString))
+    process.waitFor(10, TimeUnit.SECONDS)
+    process.exitValue() == 0
+  }
+
+  /**
+   * Returns the pid of this JVM process.
+   */
+  def getProcessId: Int = {
+    val PROCESS = "(\\d+)@(.*)".r
+    val name = getProcessName()
+    name match {
+      case PROCESS(pid, _) => pid.toInt
+      case _ =>
+        throw new SparkException(s"Unexpected process name: $name, expected to 
be PID@hostname.")
+    }
+  }
+
+  /**
    * Returns the name of this JVM process. This is OS dependent but typically 
(OSX, Linux, Windows),
    * this is formatted as PID@hostname.
    */
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 202b85d..9f00131 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -440,7 +440,8 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(TASK_FPGA_ID.amountConf, "1")
     var taskResourceRequirement =
-      parseTaskResourceRequirements(conf).map(req => (req.resourceName, 
req.amount)).toMap
+      parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+        .map(req => (req.resourceName, req.amount)).toMap
 
     assert(taskResourceRequirement.size == 2)
     assert(taskResourceRequirement(GPU) == 2)
@@ -450,7 +451,8 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     // Ignore invalid prefix
     conf.set(ResourceID("spark.invalid.prefix", FPGA).amountConf, "1")
     taskResourceRequirement =
-      parseTaskResourceRequirements(conf).map(req => (req.resourceName, 
req.amount)).toMap
+      parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+        .map(req => (req.resourceName, req.amount)).toMap
     assert(taskResourceRequirement.size == 1)
     assert(taskResourceRequirement.get(FPGA).isEmpty)
   }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index fed3ae3..c1402bd 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -756,7 +756,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       val conf = new SparkConf()
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
-      conf.set(DRIVER_GPU_ID.amountConf, "1")
+      conf.set(DRIVER_GPU_ID.amountConf, "2")
       conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
       sc = new SparkContext(conf)
 
@@ -783,7 +783,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
         .set(DRIVER_RESOURCES_FILE, resourcesFile)
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
-      conf.set(DRIVER_GPU_ID.amountConf, "1")
+      conf.set(DRIVER_GPU_ID.amountConf, "3")
       conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
 
       sc = new SparkContext(conf)
@@ -850,26 +850,27 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val discoveryScript = createTempScriptWithExpectedOutput(dir, 
"resourceDiscoveryScript",
-        """{"name": "gpu","addresses":["0", "1", "2"]}""")
+        """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7", 
"8"]}""")
 
       val conf = new SparkConf()
         .setMaster("local-cluster[3, 3, 1024]")
         .setAppName("test-cluster")
-      conf.set(TASK_GPU_ID.amountConf, "1")
+      conf.set(WORKER_GPU_ID.amountConf, "3")
+      conf.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+      conf.set(TASK_GPU_ID.amountConf, "3")
       conf.set(EXECUTOR_GPU_ID.amountConf, "3")
-      conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, discoveryScript)
 
       sc = new SparkContext(conf)
 
       // Ensure all executors has started
       TestUtils.waitUntilExecutorsUp(sc, 3, 60000)
 
-      val rdd = sc.makeRDD(1 to 10, 9).mapPartitions { it =>
+      val rdd = sc.makeRDD(1 to 10, 3).mapPartitions { it =>
         val context = TaskContext.get()
         context.resources().get(GPU).get.addresses.iterator
       }
       val gpus = rdd.collect()
-      assert(gpus.sorted === Seq("0", "0", "0", "1", "1", "1", "2", "2", "2"))
+      assert(gpus.sorted === Seq("0", "1", "2", "3", "4", "5", "6", "7", "8"))
 
       eventually(timeout(10.seconds)) {
         assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum 
== 0)
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala 
b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 784981e..a2c4669 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy
 
 import java.io.File
-import java.util.Date
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
@@ -50,7 +49,8 @@ private[deploy] object DeployTestUtils {
     createDriverDesc(), JsonConstants.submitDate)
 
   def createWorkerInfo(): WorkerInfo = {
-    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 
"http://publicAddress:80";)
+    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null,
+      "http://publicAddress:80";, Map.empty)
     workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
     workerInfo
   }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index f19e998..9ce046a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -42,6 +42,8 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
+import org.apache.spark.resource.{ResourceInformation, ResourceRequirement}
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer
 
@@ -68,17 +70,23 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = 
new SparkConf) extend
     })
   }
 
-  val appDesc = DeployTestUtils.createAppDesc()
+  var appDesc = DeployTestUtils.createAppDesc()
   val drivers = mutable.HashSet[String]()
+  val driverResources = new mutable.HashMap[String, Map[String, Set[String]]]
+  val execResources = new mutable.HashMap[String, Map[String, Set[String]]]
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredWorker(masterRef, _, _, _) =>
       masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
-    case LaunchDriver(driverId, desc) =>
+    case LaunchExecutor(_, appId, execId, _, _, _, resources_) =>
+      execResources(appId + "/" + execId) = resources_.map(r => (r._1, 
r._2.addresses.toSet))
+    case LaunchDriver(driverId, desc, resources_) =>
       drivers += driverId
+      driverResources(driverId) = resources_.map(r => (r._1, 
r._2.addresses.toSet))
       master.send(RegisterApplication(appDesc, newDriver(driverId)))
     case KillDriver(driverId) =>
       master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
       drivers -= driverId
+      driverResources.remove(driverId)
       driverIdToAppId.get(driverId) match {
         case Some(appId) =>
           apps.remove(appId)
@@ -93,7 +101,7 @@ class MockExecutorLaunchFailWorker(master: RpcEndpointRef, 
conf: SparkConf = new
   extends MockWorker(master, conf) {
   var failedCnt = 0
   override def receive: PartialFunction[Any, Unit] = {
-    case LaunchExecutor(_, appId, execId, _, _, _) =>
+    case LaunchExecutor(_, appId, execId, _, _, _, _) =>
       failedCnt += 1
       master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, 
None, None))
     case otherMsg => super.receive(otherMsg)
@@ -167,7 +175,8 @@ class MasterSuite extends SparkFunSuite
       cores = 0,
       memory = 0,
       endpoint = null,
-      webUiAddress = "http://localhost:80";
+      webUiAddress = "http://localhost:80";,
+      Map.empty
     )
 
     val (rpcEnv, _, _) =
@@ -248,9 +257,12 @@ class MasterSuite extends SparkFunSuite
         // Application state should be WAITING when "MasterChangeAcknowledged" 
event executed.
         fakeAppInfo.state should be(ApplicationState.WAITING)
       }
-
-      master.self.send(
-        WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, 
Seq(fakeDriverInfo.id)))
+      val execResponse = fakeExecutors.map(exec =>
+        WorkerExecutorStateResponse(exec, Map.empty[String, 
ResourceInformation]))
+      val driverResponse = WorkerDriverStateResponse(
+        fakeDriverInfo.id, Map.empty[String, ResourceInformation])
+      master.self.send(WorkerSchedulerStateResponse(
+        fakeWorkerInfo.id, execResponse, Seq(driverResponse)))
 
       eventually(timeout(5.seconds), interval(100.milliseconds)) {
         getState(master) should be(RecoveryState.ALIVE)
@@ -545,6 +557,16 @@ class MasterSuite extends SparkFunSuite
     _master
   }
 
+  def makeAliveMaster(conf: SparkConf = new SparkConf): Master = {
+    val master = makeMaster(conf)
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+    master
+  }
+
   private def makeAppInfo(
       memoryPerExecutorMb: Int,
       coresPerExecutor: Option[Int] = None,
@@ -563,7 +585,8 @@ class MasterSuite extends SparkFunSuite
     val endpointRef = mock(classOf[RpcEndpointRef])
     val mockAddress = mock(classOf[RpcAddress])
     when(endpointRef.address).thenReturn(mockAddress)
-    new WorkerInfo(workerId, "host", 100, cores, memoryMb, endpointRef, 
"http://localhost:80";)
+    new WorkerInfo(workerId, "host", 100, cores, memoryMb,
+      endpointRef, "http://localhost:80";, Map.empty)
   }
 
   private def scheduleExecutorsOnWorkers(
@@ -575,13 +598,7 @@ class MasterSuite extends SparkFunSuite
   }
 
   test("SPARK-13604: Master should ask Worker kill unknown executors and 
drivers") {
-    val master = makeMaster()
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
-
+    val master = makeAliveMaster()
     val killedExecutors = new ConcurrentLinkedQueue[(String, Int)]()
     val killedDrivers = new ConcurrentLinkedQueue[String]()
     val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
@@ -614,13 +631,7 @@ class MasterSuite extends SparkFunSuite
   }
 
   test("SPARK-20529: Master should reply the address received from worker") {
-    val master = makeMaster()
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
-
+    val master = makeAliveMaster()
     @volatile var receivedMasterAddress: RpcAddress = null
     val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
       override val rpcEnv: RpcEnv = master.rpcEnv
@@ -647,13 +658,7 @@ class MasterSuite extends SparkFunSuite
   }
 
   test("SPARK-27510: Master should avoid dead loop while launching executor 
failed in Worker") {
-    val master = makeMaster()
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
-
+    val master = makeAliveMaster()
     var worker: MockExecutorLaunchFailWorker = null
     try {
       worker = new MockExecutorLaunchFailWorker(master.self)
@@ -697,12 +702,7 @@ class MasterSuite extends SparkFunSuite
 
   test("SPARK-19900: there should be a corresponding driver for the app after 
relaunching driver") {
     val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
-    val master = makeMaster(conf)
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
+    val master = makeAliveMaster(conf)
     var worker1: MockWorker = null
     var worker2: MockWorker = null
     try {
@@ -770,6 +770,95 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("assign/recycle resources to/from driver") {
+    val master = makeAliveMaster()
+    val masterRef = master.self
+    val resourceReqs = Seq(ResourceRequirement(GPU, 3), 
ResourceRequirement(FPGA, 3))
+    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = 
resourceReqs)
+    val driverId = masterRef.askSync[SubmitDriverResponse](
+      RequestSubmitDriver(driver)).driverId.get
+    var status = 
masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+    assert(status.state === Some(DriverState.SUBMITTED))
+    val worker = new MockWorker(masterRef)
+    worker.rpcEnv.setupEndpoint(s"worker", worker)
+    val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", 
"2")),
+      FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3")))
+    val regMsg = RegisterWorker(worker.id, "localhost", 7077, worker.self, 10, 
1024,
+      "http://localhost:8080";, RpcAddress("localhost", 10000), resources)
+    masterRef.send(regMsg)
+    eventually(timeout(10.seconds)) {
+      status = 
masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+      assert(status.state === Some(DriverState.RUNNING))
+      assert(worker.drivers.head === driverId)
+      assert(worker.driverResources(driverId) === Map(GPU -> Set("0", "1", 
"2"),
+        FPGA -> Set("f1", "f2", "f3")))
+      val workerResources = master.workers.head.resources
+      assert(workerResources(GPU).availableAddrs.length === 0)
+      assert(workerResources(GPU).assignedAddrs.toSet === Set("0", "1", "2"))
+      assert(workerResources(FPGA).availableAddrs.length === 0)
+      assert(workerResources(FPGA).assignedAddrs.toSet === Set("f1", "f2", 
"f3"))
+    }
+    val driverFinished = DriverStateChanged(driverId, DriverState.FINISHED, 
None)
+    masterRef.send(driverFinished)
+    eventually(timeout(10.seconds)) {
+      val workerResources = master.workers.head.resources
+      assert(workerResources(GPU).availableAddrs.length === 3)
+      assert(workerResources(GPU).assignedAddrs.toSet === Set())
+      assert(workerResources(FPGA).availableAddrs.length === 3)
+      assert(workerResources(FPGA).assignedAddrs.toSet === Set())
+    }
+  }
+
+  test("assign/recycle resources to/from executor") {
+
+    def makeWorkerAndRegister(
+        master: RpcEndpointRef,
+        workerResourceReqs: Map[String, Int] = Map.empty)
+    : MockWorker = {
+      val worker = new MockWorker(master)
+      worker.rpcEnv.setupEndpoint(s"worker", worker)
+      val resources = workerResourceReqs.map { case (rName, amount) =>
+        val shortName = rName.charAt(0)
+        val addresses = (0 until amount).map(i => s"$shortName$i").toArray
+        rName -> new ResourceInformation(rName, addresses)
+      }
+      val reg = RegisterWorker(worker.id, "localhost", 8077, worker.self, 10, 
2048,
+        "http://localhost:8080";, RpcAddress("localhost", 10000), resources)
+      master.send(reg)
+      worker
+    }
+
+    val master = makeAliveMaster()
+    val masterRef = master.self
+    val resourceReqs = Seq(ResourceRequirement(GPU, 3), 
ResourceRequirement(FPGA, 3))
+    val worker = makeWorkerAndRegister(masterRef, Map(GPU -> 6, FPGA -> 6))
+    worker.appDesc = worker.appDesc.copy(resourceReqsPerExecutor = 
resourceReqs)
+    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = 
resourceReqs)
+    val driverId = 
masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(driver)).driverId
+    val status = 
masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId.get))
+    assert(status.state === Some(DriverState.RUNNING))
+    val workerResources = master.workers.head.resources
+    eventually(timeout(10.seconds)) {
+      assert(workerResources(GPU).availableAddrs.length === 0)
+      assert(workerResources(FPGA).availableAddrs.length === 0)
+      assert(worker.driverResources.size === 1)
+      assert(worker.execResources.size === 1)
+      val driverResources = worker.driverResources.head._2
+      val execResources = worker.execResources.head._2
+      val gpuAddrs = driverResources(GPU).union(execResources(GPU))
+      val fpgaAddrs = driverResources(FPGA).union(execResources(FPGA))
+      assert(gpuAddrs === Set("g0", "g1", "g2", "g3", "g4", "g5"))
+      assert(fpgaAddrs === Set("f0", "f1", "f2", "f3", "f4", "f5"))
+    }
+    val appId = worker.apps.head._1
+    masterRef.send(UnregisterApplication(appId))
+    masterRef.send(DriverStateChanged(driverId.get, DriverState.FINISHED, 
None))
+    eventually(timeout(10.seconds)) {
+      assert(workerResources(GPU).availableAddrs.length === 6)
+      assert(workerResources(FPGA).availableAddrs.length === 6)
+    }
+  }
+
   private def getDrivers(master: Master): HashSet[DriverInfo] = {
     master.invokePrivate(_drivers())
   }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 3d8a46b..3960762 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -86,7 +86,8 @@ class PersistenceEngineSuite extends SparkFunSuite {
           cores = 0,
           memory = 0,
           endpoint = workerEndpoint,
-          webUiAddress = "http://localhost:80";)
+          webUiAddress = "http://localhost:80";,
+          Map.empty)
 
         persistenceEngine.addWorker(workerToPersist)
 
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 37e5fbc..bb541b4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -23,6 +23,7 @@ import java.util.function.Supplier
 
 import scala.concurrent.duration._
 
+import org.json4s.{DefaultFormats, Extraction}
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
 import org.mockito.ArgumentMatchers.any
@@ -32,11 +33,16 @@ import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.TestUtils.{createTempJsonFile, 
createTempScriptWithExpectedOutput}
 import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
 import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, 
ExecutorStateChanged, WorkDirCleanup}
+import 
org.apache.spark.deploy.StandaloneResourceUtils.{ALLOCATED_RESOURCES_FILE, 
SPARK_RESOURCES_COORDINATE_DIR}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Worker._
+import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, 
WORKER_GPU_ID}
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.util.Utils
 
@@ -51,17 +57,36 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
   }
   def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = 
false).setAll(opts)
 
+  implicit val formats = DefaultFormats
+
   private var _worker: Worker = _
 
   private def makeWorker(
-      conf: SparkConf,
-      shuffleServiceSupplier: Supplier[ExternalShuffleService] = null): Worker 
= {
+      conf: SparkConf = new SparkConf(),
+      shuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
+      pid: Int = Utils.getProcessId,
+      local: Boolean = false): Worker = {
     assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
-    _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, 
Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, securityMgr, None, shuffleServiceSupplier)
-    _worker
+    val resourcesFile = conf.get(SPARK_WORKER_RESOURCE_FILE)
+    val localWorker = new Worker(rpcEnv, 50000, 20, 1234 * 5,
+      Array.fill(1)(RpcAddress("1.2.3.4", 1234)), "Worker", "/tmp",
+      conf, securityMgr, resourcesFile, shuffleServiceSupplier, pid)
+    if (local) {
+      localWorker
+    } else {
+      _worker = localWorker
+      _worker
+    }
+  }
+
+  private def assertResourcesFileDeleted(): Unit = {
+    assert(sys.props.contains("spark.test.home"))
+    val sparkHome = sys.props("spark.test.home")
+    val resourceFile = new File(sparkHome + "/" + 
SPARK_RESOURCES_COORDINATE_DIR,
+      ALLOCATED_RESOURCES_FILE)
+    assert(!resourceFile.exists())
   }
 
   before {
@@ -218,6 +243,141 @@ class WorkerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfter {
     }
   }
 
+  test("worker could be launched without any resources") {
+    val worker = makeWorker()
+    worker.rpcEnv.setupEndpoint("worker", worker)
+    eventually(timeout(10.seconds)) {
+      assert(worker.resources === Map.empty)
+      worker.rpcEnv.shutdown()
+      worker.rpcEnv.awaitTermination()
+    }
+    assertResourcesFileDeleted()
+  }
+
+  test("worker could load resources from resources file while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1"))
+      val fpgaArgs =
+        ResourceAllocation(WORKER_FPGA_ID, Seq("f1", "f2", "f3"))
+      val ja = Extraction.decompose(Seq(gpuArgs, fpgaArgs))
+      val f1 = createTempJsonFile(dir, "resources", ja)
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, f1)
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
+          FPGA -> fpgaArgs.toResourceInformation))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("worker could load resources from discovery script while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(FPGA ->
+          new ResourceInformation(FPGA, Array("f1", "f2", "f3"))))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("worker could load resources from resources file and discovery script 
while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val resourcesPath = createTempJsonFile(dir, "resources", ja)
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
+          FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3"))))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("Workers run on the same host should avoid resources conflict when 
coordinate is on") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "2")
+      val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = 
true))
+      workers.zipWithIndex.foreach{case (w, i) => 
w.rpcEnv.setupEndpoint(s"worker$i", w)}
+      eventually(timeout(20.seconds)) {
+        val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
+        assert(empty.length === 1)
+        assert(nonEmpty.length === 2)
+        val totalResources = 
nonEmpty.flatMap(_.resources(FPGA).addresses).toSet.toSeq.sorted
+        assert(totalResources === Seq("f1", "f2", "f3", "f4"))
+        workers.foreach(_.rpcEnv.shutdown())
+        workers.foreach(_.rpcEnv.awaitTermination())
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("Workers run on the same host should load resources naively when 
coordinate is off") {
+    val conf = new SparkConf()
+    // disable coordination
+    conf.set(config.SPARK_RESOURCES_COORDINATE, false)
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("g0", "g1"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val resourcesPath = createTempJsonFile(dir, "resources", ja)
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "2")
+      val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = 
true))
+      workers.zipWithIndex.foreach{case (w, i) => 
w.rpcEnv.setupEndpoint(s"worker$i", w)}
+      eventually(timeout(20.seconds)) {
+        val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
+        assert(empty.length === 0)
+        assert(nonEmpty.length === 3)
+        // Without coordination, each Worker gets the same resources from the
+        // resources file and discovery script. Normally, users must configure
+        // different resources for workers running on the same host when the
+        // coordinate config is off. This test validates the behaviour compared
+        // to the test above where coordination is on, so the resource collision
+        // here is expected.
+        nonEmpty.foreach { worker =>
+          assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
+            FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3", 
"f4", "f5"))))
+        }
+        workers.foreach(_.rpcEnv.shutdown())
+        workers.foreach(_.rpcEnv.awaitTermination())
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
   test("cleanup non-shuffle files after executor exits when config " +
       "spark.storage.cleanupFilesAfterExecutorExit=true") {
     testCleanupFilesWithConfig(true)
diff --git 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 693b0ee..64d99a5 100644
--- 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -157,7 +157,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("User is expecting to use resource: gpu but didn't 
specify a " +
+      assert(error.contains("User is expecting to use resource: gpu, but 
didn't specify a " +
         "discovery script!"))
     }
   }
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
index 51a92e0..c2ecc96 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
@@ -253,7 +253,7 @@ class ResourceUtilsSuite extends SparkFunSuite
       discoverResource(request)
     }.getMessage()
 
-    assert(error.contains("User is expecting to use resource: gpu but " +
+    assert(error.contains("User is expecting to use resource: gpu, but " +
       "didn't specify a discovery script!"))
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala
index 6d2c07d..c4509e9 100644
--- a/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala
+++ b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala
@@ -18,14 +18,18 @@
 package org.apache.spark.resource
 
 import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX, SPARK_TASK_PREFIX}
+import org.apache.spark.internal.config.Worker.SPARK_WORKER_PREFIX
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 
 object TestResourceIDs {
   val DRIVER_GPU_ID = ResourceID(SPARK_DRIVER_PREFIX, GPU)
   val EXECUTOR_GPU_ID = ResourceID(SPARK_EXECUTOR_PREFIX, GPU)
   val TASK_GPU_ID = ResourceID(SPARK_TASK_PREFIX, GPU)
+  val WORKER_GPU_ID = ResourceID(SPARK_WORKER_PREFIX, GPU)
 
   val DRIVER_FPGA_ID = ResourceID(SPARK_DRIVER_PREFIX, FPGA)
   val EXECUTOR_FPGA_ID = ResourceID(SPARK_EXECUTOR_PREFIX, FPGA)
   val TASK_FPGA_ID = ResourceID(SPARK_TASK_PREFIX, FPGA)
+  val WORKER_FPGA_ID = ResourceID(SPARK_WORKER_PREFIX, FPGA)
+
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 57a5321..8454547 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -194,6 +194,25 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
+ <td><code>spark.resources.coordinate.enable</code></td>
+  <td>true</td>
+  <td>
+    Whether to coordinate resources automatically among workers/drivers (client mode only)
+    in Standalone. If false, the user is responsible for configuring different resources
+    for workers/drivers that run on the same host.
+  </td>
+</tr>
+<tr>
+ <td><code>spark.resources.dir</code></td>
+  <td>SPARK_HOME</td>
+  <td>
+    Directory used to coordinate resources among workers/drivers (client mode only) in Standalone.
+    Default is SPARK_HOME. Make sure to use the same directory for workers and drivers in
+    client mode that run on the same host. Don't clean up this directory while workers/drivers
+    are still alive, otherwise resource conflicts are very likely.
+  </td>
+</tr>
+<tr>
  <td><code>spark.driver.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
@@ -209,7 +228,9 @@ of the most common options to set are:
   <td>
     A script for the driver to run to discover a particular resource type. This should
     write to STDOUT a JSON string in the format of the ResourceInformation class. This has a
-    name and an array of addresses.
+    name and an array of addresses. For a client-submitted driver in Standalone, the discovery
+    script must assign this driver resource addresses that differ from the workers' and other
+    drivers' when <code>spark.resources.coordinate.enable</code> is off.
   </td>
 </tr>
 <tr>
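
As a hedged illustration of the discovery-script contract documented in the hunk above (a script that writes a ResourceInformation-style JSON string, i.e. a name plus an array of addresses, to STDOUT), here is a minimal Python sketch; the file name, the resource name and the hard-coded addresses are assumptions for illustration only, not part of this patch:

#!/usr/bin/env python
# Hypothetical discovery script (illustration only, not shipped with this
# commit): print a ResourceInformation-style JSON object to STDOUT, as the
# *.resource.{resourceName}.discoveryScript options above expect.
import json

if __name__ == "__main__":
    # Addresses are hard-coded here; a real script would probe the host to
    # find the devices this worker or driver is allowed to use.
    print(json.dumps({"name": "gpu", "addresses": ["0", "1"]}))
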
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 2ca3ee6..bc77469 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -243,6 +243,37 @@ SPARK_MASTER_OPTS supports the following system properties:
     receives no heartbeats.
   </td>
 </tr>
+<tr>
+  <td><code>spark.worker.resource.{resourceName}.amount</code></td>
+  <td>(none)</td>
+  <td>
+    Amount of a particular resource to use on the worker.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.worker.resource.{resourceName}.discoveryScript</code></td>
+  <td>(none)</td>
+  <td>
+    Path to the resource discovery script, which is used to find a particular resource while the worker is starting up.
+    The output of the script should be formatted like the <code>ResourceInformation</code> class.
+    When <code>spark.resources.coordinate.enable</code> is off, the discovery script must assign different
+    resources to workers and drivers in client mode that run on the same host to avoid resource conflicts.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.worker.resourcesFile</code></td>
+  <td>(none)</td>
+  <td>
+    Path to the resources file which is used to find various resources while the worker is starting up.
+    The content of the resources file should be formatted like <code>
+    [[{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]]</code>.
+    When <code>spark.resources.coordinate.enable</code> is off, the resources file must assign different
+    resources to workers and drivers in client mode that run on the same host to avoid resource conflicts.
+    If a particular resource is not found in the resources file, the discovery script would be used to
+    find that resource. If the discovery script also does not find the resource, the worker will fail
+    to start up.
+  </td>
+</tr>
 </table>
 
 SPARK_WORKER_OPTS supports the following system properties:
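
For orientation only, the worker-side options documented above could be wired together as in the following PySpark sketch; the paths are placeholders and the snippet is an assumption about usage rather than code taken from this patch (the same keys can equally be placed in the worker's Spark configuration):

# Hedged sketch: set the Standalone worker resource options on a SparkConf.
# "/path/to/gpu_discovery.py" and "/path/to/worker_resources.json" are
# placeholder paths, not files provided by this commit.
from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.worker.resource.gpu.amount", "2")
        .set("spark.worker.resource.gpu.discoveryScript", "/path/to/gpu_discovery.py")
        .set("spark.worker.resourcesFile", "/path/to/worker_resources.json"))
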
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index bcd5d06..3f3150b 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -273,7 +273,8 @@ class ContextTestsWithResources(unittest.TestCase):
         self.tempFile.close()
         os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
                  stat.S_IROTH | stat.S_IXOTH)
-        conf = SparkConf().set("spark.driver.resource.gpu.amount", "1")
+        conf = SparkConf().set("spark.test.home", SPARK_HOME)
+        conf = conf.set("spark.driver.resource.gpu.amount", "1")
         conf = conf.set("spark.driver.resource.gpu.discoveryScript", 
self.tempFile.name)
         self.sc = SparkContext('local-cluster[2,1,1024]', class_name, 
conf=conf)
 
diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py
index 66357b6..66c5f9f 100644
--- a/python/pyspark/tests/test_taskcontext.py
+++ b/python/pyspark/tests/test_taskcontext.py
@@ -23,7 +23,7 @@ import time
 import unittest
 
 from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext
-from pyspark.testing.utils import PySparkTestCase
+from pyspark.testing.utils import PySparkTestCase, SPARK_HOME
 
 
 class TaskContextTests(PySparkTestCase):
@@ -194,9 +194,11 @@ class TaskContextTestsWithResources(unittest.TestCase):
         self.tempFile.close()
         os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
                  stat.S_IROTH | stat.S_IXOTH)
-        conf = SparkConf().set("spark.task.resource.gpu.amount", "1")
+        conf = SparkConf().set("spark.test.home", SPARK_HOME)
+        conf = conf.set("spark.worker.resource.gpu.discoveryScript", 
self.tempFile.name)
+        conf = conf.set("spark.worker.resource.gpu.amount", 1)
+        conf = conf.set("spark.task.resource.gpu.amount", "1")
         conf = conf.set("spark.executor.resource.gpu.amount", "1")
-        conf = conf.set("spark.executor.resource.gpu.discoveryScript", 
self.tempFile.name)
         self.sc = SparkContext('local-cluster[2,1,1024]', class_name, 
conf=conf)
 
     def test_resources(self):
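
For context, here is a hedged sketch of the kind of task-side lookup a test like test_resources performs; this is not the actual test body from the diff, and it assumes the TaskContext.resources() API exercised by these suites:

# Hedged sketch: read the gpu addresses assigned to a running task.
from pyspark import SparkContext, TaskContext

def gpu_addresses(_):
    # resources() maps resource name -> ResourceInformation(name, addresses).
    return TaskContext.get().resources()["gpu"].addresses

# Example usage with a context configured like the fixture above:
# sc = SparkContext('local-cluster[2,1,1024]', 'resource-demo', conf=conf)
# print(sc.parallelize(range(2), 2).map(gpu_addresses).collect())
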


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
