This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 74e5e41  [SPARK-27488][CORE] Driver interface to support GPU resources
74e5e41 is described below

commit 74e5e41eebf9ed596b48e6db52a2a9c642e5cbc3
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Thu May 23 11:46:13 2019 -0700

    [SPARK-27488][CORE] Driver interface to support GPU resources
    
    ## What changes were proposed in this pull request?
    
    Added the driver-side functionality to discover the resources allocated
    to it and expose them to the user.
    
    The user interface is SparkContext.resources. I called it this to match
    the TaskContext.resources API proposed in the other PR. Originally it was
    going to be called SparkContext.getResources, but I changed it to be
    consistent; if people have strong feelings I can change it back.
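
    As a rough usage sketch (not part of this commit's tests; it assumes sc is
    a SparkContext whose driver resources were configured as described below,
    and the values shown in the comments are made up):

        // sc.resources is a Map[String, ResourceInformation].
        sc.resources.foreach { case (rName, rInfo) =>
          // ResourceInformation holds the resource name and its addresses,
          // e.g. "gpu" -> Array("0", "1").
          println(s"$rName -> ${rInfo.addresses.mkString(",")}")
        }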
    
    There are two ways the driver can discover what resources it has (a
    configuration sketch follows this list):
      1) The user specifies a discoveryScript. This is similar to the executor
         side and is meant for YARN and Kubernetes, where the cluster manager
         does not tell you what you were allocated but you are running in an
         isolated environment.
      2) The driver reads the config
         spark.driver.resource.{resourceName}.addresses. This config is meant
         for standalone mode, where the Worker assigns which GPU addresses the
         driver is allowed to use by setting it.
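
    A minimal sketch of both routes (the script path and the address list are
    made-up examples; normally only one route is used per resource, and in
    standalone mode the Worker, not the user, sets the addresses config):

        import org.apache.spark.SparkConf

        // Route 1 (YARN / Kubernetes): point the driver at a discovery script.
        val yarnStyleConf = new SparkConf()
          .set("spark.driver.resource.gpu.count", "1")
          .set("spark.driver.resource.gpu.discoveryScript", "/opt/spark/getGpus.sh")

        // Route 2 (standalone): the Worker hands the driver its allowed addresses.
        val standaloneStyleConf = new SparkConf()
          .set("spark.driver.resource.gpu.count", "2")
          .set("spark.driver.resource.gpu.addresses", "0,1")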
    
    When the user runs a Spark application and wants the driver to have GPUs,
    they specify the conf spark.driver.resource.gpu.count=X, where X is the
    number they want. On YARN or Kubernetes they also have to specify the
    discoveryScript as described above; on standalone mode, with the cluster
    set up properly, they shouldn't have to specify anything else. We could
    potentially get rid of the spark.driver.resource.gpu.addresses config,
    which is really meant [...]
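
    For reference, the discovery script is expected to print a single JSON
    object matching the ResourceInformation class: a name plus an array of
    addresses. A small sketch of the json4s parsing that ResourceDiscoverer
    performs on that output (the sample string is only an example of what a
    gpu script might print; json4s is the library this code already uses):

        import org.json4s._
        import org.json4s.jackson.JsonMethods._

        implicit val formats = DefaultFormats
        // Example of what a gpu discovery script could write to STDOUT:
        val scriptOutput = """{"name": "gpu", "addresses": ["0", "1"]}"""
        val parsedJson = parse(scriptOutput)
        val name = (parsedJson \ "name").extract[String]
        val addresses = (parsedJson \ "addresses").extract[Array[String]]
        // This becomes a ResourceInformation(name, addresses) entry keyed by
        // name, and the name must match the resourceName in the config.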
    
    - This PR also makes the code consistent about using resourceName
      everywhere.
    - Renamed the config constants from POSTFIX to SUFFIX to be more
      consistent with other areas in Spark.
    - Moved the config checks around a bit since they are now used by both the
      executor and the driver (an illustrative failure case follows this
      list). Note these might overlap a bit with
      https://github.com/apache/spark/pull/24374, so we will have to figure
      out which one should go in first.
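
    As a quick, made-up illustration of the relocated validation, creating a
    SparkContext with a per-task requirement larger than the per-executor
    count now fails fast (this mirrors the new SparkContextSuite test rather
    than adding new behavior):

        import org.apache.spark.{SparkConf, SparkContext}

        val badConf = new SparkConf()
          .setMaster("local-cluster[1, 1, 1024]")
          .setAppName("resource-validation-example")
          .set("spark.task.resource.gpu.count", "2")      // each task wants 2 GPUs
          .set("spark.executor.resource.gpu.count", "1")  // each executor only offers 1

        // Throws SparkException: "The executor resource config:
        // spark.executor.resource.gpu.count = 1 has to be >= the task config:
        // spark.task.resource.gpu.count = 2"
        // val sc = new SparkContext(badConf)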
    
    ## How was this patch tested?
    
    Unit tests and manual testing of the interface.
    
    Closes #24615 from tgravescs/SPARK-27488.
    
    Authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 .../org/apache/spark/ResourceDiscoverer.scala      | 88 ++++++++++++++-----
 .../main/scala/org/apache/spark/SparkConf.scala    | 60 +++++++++++++
 .../main/scala/org/apache/spark/SparkContext.scala | 47 +++++++++++
 .../executor/CoarseGrainedExecutorBackend.scala    | 64 +++-----------
 .../org/apache/spark/internal/config/package.scala |  5 +-
 .../org/apache/spark/ResourceDiscovererSuite.scala | 83 ++++++++++++++----
 .../scala/org/apache/spark/SparkConfSuite.scala    | 43 ++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala | 98 ++++++++++++++++++++++
 .../CoarseGrainedExecutorBackendSuite.scala        | 61 ++++----------
 docs/configuration.md                              | 29 +++++--
 10 files changed, 436 insertions(+), 142 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
index 1963942..d3b3860 100644
--- a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
+++ b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
@@ -29,10 +29,10 @@ import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils.executeAndGetOutput
 
 /**
- * Discovers resources (GPUs/FPGAs/etc). It currently only supports resources 
that have
- * addresses.
+ * Discovers information about resources (GPUs/FPGAs/etc). It currently only 
supports
+ * resources that have addresses.
  * This class finds resources by running and parsing the output of the user 
specified script
- * from the config 
spark.{driver/executor}.resource.{resourceType}.discoveryScript.
+ * from the config 
spark.{driver/executor}.resource.{resourceName}.discoveryScript.
  * The output of the script it runs is expected to be JSON in the format of the
  * ResourceInformation class.
  *
@@ -42,28 +42,41 @@ private[spark] object ResourceDiscoverer extends Logging {
 
   private implicit val formats = DefaultFormats
 
-  def findResources(sparkConf: SparkConf, isDriver: Boolean): Map[String, 
ResourceInformation] = {
-    val prefix = if (isDriver) {
-      SPARK_DRIVER_RESOURCE_PREFIX
-    } else {
-      SPARK_EXECUTOR_RESOURCE_PREFIX
-    }
-    // get unique resource types by grabbing first part config with multiple 
periods,
-    // ie resourceType.count, grab resourceType part
-    val resourceNames = sparkConf.getAllWithPrefix(prefix).map { case (k, _) =>
-      k.split('.').head
-    }.toSet
+  /**
+   * This function will discover information about a set of resources by using 
the
+   * user specified script 
(spark.{driver/executor}.resource.{resourceName}.discoveryScript).
+   * It optionally takes a set of resource names or if that isn't specified
+   * it uses the config prefix passed in to look at the executor or driver 
configs
+   * to get the resource names. Then for each resource it will run the 
discovery script
+   * and get the ResourceInformation about it.
+   *
+   * @param sparkConf SparkConf
+   * @param confPrefix Driver or Executor resource prefix
+   * @param resourceNamesOpt Optionally specify resource names. If not set 
uses the resource
+   *                  configs based on confPrefix passed in to get the 
resource names.
+   * @return Map of resource name to ResourceInformation
+   */
+  def discoverResourcesInformation(
+      sparkConf: SparkConf,
+      confPrefix: String,
+      resourceNamesOpt: Option[Set[String]] = None
+      ): Map[String, ResourceInformation] = {
+    val resourceNames = resourceNamesOpt.getOrElse(
+      // get unique resource names by grabbing first part config with multiple 
periods,
+      // ie resourceName.count, grab resourceName part
+      SparkConf.getBaseOfConfigs(sparkConf.getAllWithPrefix(confPrefix))
+    )
     resourceNames.map { rName => {
-      val rInfo = getResourceInfoForType(sparkConf, prefix, rName)
+      val rInfo = getResourceInfo(sparkConf, confPrefix, rName)
       (rName -> rInfo)
     }}.toMap
   }
 
-  private def getResourceInfoForType(
+  private def getResourceInfo(
       sparkConf: SparkConf,
-      prefix: String,
-      resourceType: String): ResourceInformation = {
-    val discoveryConf = prefix + resourceType + 
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX
+      confPrefix: String,
+      resourceName: String): ResourceInformation = {
+    val discoveryConf = confPrefix + resourceName + 
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX
     val script = sparkConf.getOption(discoveryConf)
     val result = if (script.nonEmpty) {
       val scriptFile = new File(script.get)
@@ -73,21 +86,50 @@ private[spark] object ResourceDiscoverer extends Logging {
           val output = executeAndGetOutput(Seq(script.get), new File("."))
           val parsedJson = parse(output)
           val name = (parsedJson \ "name").extract[String]
-          val addresses = (parsedJson \ 
"addresses").extract[Array[String]].toArray
+          val addresses = (parsedJson \ "addresses").extract[Array[String]]
+          if (name != resourceName) {
+            throw new SparkException(s"Discovery script: ${script.get} 
specified via " +
+              s"$discoveryConf returned a resource name: $name that doesn't 
match the " +
+              s"config name: $resourceName")
+          }
           new ResourceInformation(name, addresses)
         } catch {
           case e @ (_: SparkException | _: MappingException | _: 
JsonParseException) =>
             throw new SparkException(s"Error running the resource discovery 
script: $scriptFile" +
-              s" for $resourceType", e)
+              s" for $resourceName", e)
         }
       } else {
-        throw new SparkException(s"Resource script: $scriptFile to discover 
$resourceType" +
+        throw new SparkException(s"Resource script: $scriptFile to discover 
$resourceName" +
           s" doesn't exist!")
       }
     } else {
-      throw new SparkException(s"User is expecting to use $resourceType 
resources but " +
+      throw new SparkException(s"User is expecting to use $resourceName 
resources but " +
         s"didn't specify a script via conf: $discoveryConf, to find them!")
     }
     result
   }
+
+  /**
+   * Make sure the actual resources we have on startup are at least the number 
the user
+   * requested. Note that there is other code in SparkConf that makes sure we 
have executor configs
+   * for each task resource requirement and that they are large enough. This 
function
+   * is used by both driver and executors.
+   *
+   * @param requiredResources The resources that are required for us to run.
+   * @param actualResources The actual resources discovered.
+   */
+  def checkActualResourcesMeetRequirements(
+      requiredResources: Map[String, String],
+      actualResources: Map[String, ResourceInformation]): Unit = {
+    requiredResources.foreach { case (rName, reqCount) =>
+      val actualRInfo = actualResources.get(rName).getOrElse(
+        throw new SparkException(s"Resource: $rName required but wasn't 
discovered on startup"))
+
+      if (actualRInfo.addresses.size < reqCount.toLong) {
+        throw new SparkException(s"Resource: $rName, with addresses: " +
+          s"${actualRInfo.addresses.mkString(",")} " +
+          s"is less than what the user requested: $reqCount)")
+      }
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index bd2ef5b..15f1730 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -415,6 +415,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
       .map { case (k, v) => (k.substring(prefix.length), v) }
   }
 
+  /**
+   * Get all parameters that start with `prefix` and end with 'suffix'
+   */
+  def getAllWithPrefixAndSuffix(prefix: String, suffix: String): 
Array[(String, String)] = {
+    getAll.filter { case (k, v) => k.startsWith(prefix) && k.endsWith(suffix) }
+      .map { case (k, v) => (k.substring(prefix.length, (k.length - 
suffix.length)), v) }
+  }
 
   /**
    * Get a parameter as an integer, falling back to a default if not set
@@ -596,6 +603,30 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
     require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The 
value of " +
       s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be no less than 
the value of " +
       s"${EXECUTOR_HEARTBEAT_INTERVAL.key}=${executorHeartbeatIntervalMs}ms.")
+
+    // Make sure the executor resources were specified and are large enough if
+    // any task resources were specified.
+    val taskResourcesAndCount =
+    getAllWithPrefixAndSuffix(SPARK_TASK_RESOURCE_PREFIX, 
SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    val executorResourcesAndCounts =
+      getAllWithPrefixAndSuffix(SPARK_EXECUTOR_RESOURCE_PREFIX, 
SPARK_RESOURCE_COUNT_SUFFIX).toMap
+
+    taskResourcesAndCount.foreach { case (rName, taskCount) =>
+      val execCount = executorResourcesAndCounts.get(rName).getOrElse(
+        throw new SparkException(
+          s"The executor resource config: " +
+            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} " +
+            "needs to be specified since a task requirement config: " +
+            s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} was specified")
+      )
+      if (execCount.toLong < taskCount.toLong) {
+        throw new SparkException(
+          s"The executor resource config: " +
+            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} " +
+            s"= $execCount has to be >= the task config: " +
+            s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} = $taskCount")
+      }
+    }
   }
 
   /**
@@ -789,6 +820,35 @@ private[spark] object SparkConf extends Logging {
   }
 
   /**
+   * A function to help parsing configs with multiple parts where the base and
+   * suffix could be one of many options. For instance configs like:
+   * spark.executor.resource.{resourceName}.{count/addresses}
+   * This function takes an Array of configs you got from the
+   * getAllWithPrefix function, selects only those that end with the suffix
+   * passed in and returns just the base part of the config before the first
+   * '.' and its value.
+   */
+  def getConfigsWithSuffix(
+      configs: Array[(String, String)],
+      suffix: String
+      ): Array[(String, String)] = {
+    configs.filter { case (rConf, _) => rConf.endsWith(suffix)}.
+      map { case (k, v) => (k.split('.').head, v) }
+  }
+
+  /**
+   * A function to help parsing configs with multiple parts where the base and
+   * suffix could be one of many options. For instance configs like:
+   * spark.executor.resource.{resourceName}.{count/addresses}
+   * This function takes an Array of configs you got from the
+   * getAllWithPrefix function and returns the base part of the config
+   * before the first '.'.
+   */
+  def getBaseOfConfigs(configs: Array[(String, String)]): Set[String] = {
+    configs.map { case (k, _) => k.split('.').head }.toSet
+  }
+
+  /**
    * Holds information about keys that have been deprecated and do not have a 
replacement.
    *
    * @param key The deprecated key.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9979410..878010d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,6 +213,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
+  private var _resources: scala.collection.immutable.Map[String, 
ResourceInformation] = _
 
   /* 
-------------------------------------------------------------------------------------
 *
    | Accessors and public fields. These provide access to the internal state 
of the        |
@@ -227,6 +228,8 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def getConf: SparkConf = conf.clone()
 
+  def resources: Map[String, ResourceInformation] = _resources
+
   def jars: Seq[String] = _jars
   def files: Seq[String] = _files
   def master: String = _conf.get("spark.master")
@@ -360,6 +363,48 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased))
   }
 
+  /**
+   * Checks to see if any resources (GPU/FPGA/etc) are available to the driver 
by looking
+   * at and processing the spark.driver.resource.resourceName.addresses and
+   * spark.driver.resource.resourceName.discoveryScript configs. The configs 
have to be
+   * present when the driver starts, setting them after startup does not work.
+   *
+   * If any resource addresses configs were specified then assume all 
resources will be specified
+   * in that way. Otherwise use the discovery scripts to find the resources. 
Users should
+   * not really be setting the addresses config directly and should not be 
mixing methods
+   * for different types of resources since the addresses config is meant for 
Standalone mode
+   * and other cluster managers should use the discovery scripts.
+   */
+  private def setupDriverResources(): Unit = {
+    // Only call getAllWithPrefix once and filter on those since there could 
be a lot of spark
+    // configs.
+    val allDriverResourceConfs = 
_conf.getAllWithPrefix(SPARK_DRIVER_RESOURCE_PREFIX)
+    val resourcesWithAddrsInConfs =
+      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, 
SPARK_RESOURCE_ADDRESSES_SUFFIX)
+
+    _resources = if (resourcesWithAddrsInConfs.nonEmpty) {
+      resourcesWithAddrsInConfs.map { case (rName, addrString) =>
+        val addrsArray = addrString.split(",").map(_.trim())
+        (rName -> new ResourceInformation(rName, addrsArray))
+      }.toMap
+    } else {
+      // we already have the resources confs here so just pass in the unique 
resource names
+      // rather then having the resource discoverer reparse all the configs.
+      val uniqueResources = SparkConf.getBaseOfConfigs(allDriverResourceConfs)
+      ResourceDiscoverer.discoverResourcesInformation(_conf, 
SPARK_DRIVER_RESOURCE_PREFIX,
+        Some(uniqueResources))
+    }
+    // verify the resources we discovered are what the user requested
+    val driverReqResourcesAndCounts =
+      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, 
SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    
ResourceDiscoverer.checkActualResourcesMeetRequirements(driverReqResourcesAndCounts,
 _resources)
+
+    
logInfo("===============================================================================")
+    logInfo(s"Driver Resources:")
+    _resources.foreach { case (k, v) => logInfo(s"$k -> $v") }
+    
logInfo("===============================================================================")
+  }
+
   try {
     _conf = config.clone()
     _conf.validateSettings()
@@ -373,6 +418,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _driverLogger = DriverLogger(_conf)
 
+    setupDriverResources()
+
     // log out spark.app.name in the Spark driver logs
     logInfo(s"Submitted application: $appName")
 
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index af01e0b..fac4d40 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -83,51 +83,11 @@ private[spark] class CoarseGrainedExecutorBackend(
     }(ThreadUtils.sameThread)
   }
 
-  // Check that the actual resources discovered will satisfy the user specified
-  // requirements and that they match the configs specified by the user to 
catch
-  // mismatches between what the user requested and what resource manager gave 
or
-  // what the discovery script found.
-  private def checkResourcesMeetRequirements(
-      resourceConfigPrefix: String,
-      reqResourcesAndCounts: Array[(String, String)],
-      actualResources: Map[String, ResourceInformation]): Unit = {
-
-    reqResourcesAndCounts.foreach { case (rName, reqCount) =>
-      if (actualResources.contains(rName)) {
-        val resourceInfo = actualResources(rName)
-
-        if (resourceInfo.addresses.size < reqCount.toLong) {
-          throw new SparkException(s"Resource: $rName with addresses: " +
-            s"${resourceInfo.addresses.mkString(",")} doesn't meet the " +
-            s"requirements of needing $reqCount of them")
-        }
-        // also make sure the resource count on start matches the
-        // resource configs specified by user
-        val userCountConfigName =
-          resourceConfigPrefix + rName + SPARK_RESOURCE_COUNT_POSTFIX
-        val userConfigCount = env.conf.getOption(userCountConfigName).
-          getOrElse(throw new SparkException(s"Resource: $rName not specified 
" +
-            s"via config: $userCountConfigName, but required, " +
-            "please fix your configuration"))
-
-        if (userConfigCount.toLong > resourceInfo.addresses.size) {
-          throw new SparkException(s"Resource: $rName, with addresses: " +
-            s"${resourceInfo.addresses.mkString(",")} " +
-            s"is less than what the user requested for count: 
$userConfigCount, " +
-            s"via $userCountConfigName")
-        }
-      } else {
-        throw new SparkException(s"Executor resource config missing required 
task resource: $rName")
-      }
-    }
-  }
-
   // visible for testing
   def parseOrFindResources(resourcesFile: Option[String]): Map[String, 
ResourceInformation] = {
     // only parse the resources if a task requires them
-    val taskResourceConfigs = 
env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
-    val resourceInfo = if (taskResourceConfigs.nonEmpty) {
-      val execResources = resourcesFile.map { resourceFileStr => {
+    val resourceInfo = if 
(env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX).nonEmpty) {
+      val actualExecResources = resourcesFile.map { resourceFileStr => {
         val source = new BufferedInputStream(new 
FileInputStream(resourceFileStr))
         val resourceMap = try {
           val parsedJson = parse(source).asInstanceOf[JArray].arr
@@ -144,26 +104,26 @@ private[spark] class CoarseGrainedExecutorBackend(
           source.close()
         }
         resourceMap
-      }}.getOrElse(ResourceDiscoverer.findResources(env.conf, isDriver = 
false))
+      }}.getOrElse(ResourceDiscoverer.discoverResourcesInformation(env.conf,
+        SPARK_EXECUTOR_RESOURCE_PREFIX))
 
-      if (execResources.isEmpty) {
+      if (actualExecResources.isEmpty) {
         throw new SparkException("User specified resources per task via: " +
           s"$SPARK_TASK_RESOURCE_PREFIX, but can't find any resources 
available on the executor.")
       }
-      // get just the map of resource name to count
-      val resourcesAndCounts = taskResourceConfigs.
-        withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
-        map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), 
v)}
+      val execReqResourcesAndCounts =
+        env.conf.getAllWithPrefixAndSuffix(SPARK_EXECUTOR_RESOURCE_PREFIX,
+          SPARK_RESOURCE_COUNT_SUFFIX).toMap
 
-      checkResourcesMeetRequirements(SPARK_EXECUTOR_RESOURCE_PREFIX, 
resourcesAndCounts,
-        execResources)
+      
ResourceDiscoverer.checkActualResourcesMeetRequirements(execReqResourcesAndCounts,
+        actualExecResources)
 
       
logInfo("===============================================================================")
       logInfo(s"Executor $executorId Resources:")
-      execResources.foreach { case (k, v) => logInfo(s"$k -> $v") }
+      actualExecResources.foreach { case (k, v) => logInfo(s"$k -> $v") }
       
logInfo("===============================================================================")
 
-      execResources
+      actualExecResources
     } else {
       if (resourcesFile.nonEmpty) {
         logWarning(s"A resources file was specified but the application is not 
configured " +
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0aed1af..d9341bf 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -34,8 +34,9 @@ package object config {
   private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = 
"spark.executor.resource."
   private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
 
-  private[spark] val SPARK_RESOURCE_COUNT_POSTFIX = ".count"
-  private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX = 
".discoveryScript"
+  private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
+  private[spark] val SPARK_RESOURCE_ADDRESSES_SUFFIX = ".addresses"
+  private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = 
".discoveryScript"
 
   private[spark] val DRIVER_CLASS_PATH =
     
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
diff --git a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
index 77f8a0b..623cd5c 100644
--- a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
@@ -40,7 +40,8 @@ class ResourceDiscovererSuite extends SparkFunSuite
 
   test("Resource discoverer no resources") {
     val sparkconf = new SparkConf
-    val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = 
false)
+    val resources =
+      ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
     assert(resources.size === 0)
     assert(resources.get("gpu").isEmpty,
       "Should have a gpus entry that is empty")
@@ -54,8 +55,36 @@ class ResourceDiscovererSuite extends SparkFunSuite
       val scriptPath = mockDiscoveryScript(gpuFile,
         """'{"name": "gpu","addresses":["0", "1"]}'""")
       sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath)
-      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = 
false)
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
+      val resources =
+        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
+      val gpuValue = resources.get("gpu")
+      assert(gpuValue.nonEmpty, "Should have a gpu entry")
+      assert(gpuValue.get.name == "gpu", "name should be gpu")
+      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 
0,1 entries")
+    }
+  }
+
+  test("Resource discoverer passed in resources") {
+    val sparkconf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val gpuScript = mockDiscoveryScript(gpuFile,
+        """'{"name": "gpu","addresses":["0", "1"]}'""")
+      val fpgaFile = new File(dir, "fpgaDiscoverScript")
+      val fpgaScript = mockDiscoveryScript(fpgaFile,
+        """'{"name": "fpga","addresses":["f0", "f1"]}'""")
+      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuScript)
+      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaScript)
+      // it should only look at the resources passed in and ignore fpga conf
+      val resources =
+        ResourceDiscoverer.discoverResourcesInformation(sparkconf,
+          SPARK_EXECUTOR_RESOURCE_PREFIX, Some(Set("gpu")))
+      assert(resources.size === 1, "should only have the gpu resource")
       val gpuValue = resources.get("gpu")
       assert(gpuValue.nonEmpty, "Should have a gpu entry")
       assert(gpuValue.get.name == "gpu", "name should be gpu")
@@ -72,8 +101,9 @@ class ResourceDiscovererSuite extends SparkFunSuite
       val scriptPath = mockDiscoveryScript(gpuFile,
         """'{"name": "gpu"}'""")
       sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath)
-      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = 
false)
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
+      val resources =
+        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
       val gpuValue = resources.get("gpu")
       assert(gpuValue.nonEmpty, "Should have a gpu entry")
       assert(gpuValue.get.name == "gpu", "name should be gpu")
@@ -89,15 +119,16 @@ class ResourceDiscovererSuite extends SparkFunSuite
       val gpuDiscovery = mockDiscoveryScript(gpuFile,
         """'{"name": "gpu", "addresses": ["0", "1"]}'""")
       sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
 
       val fpgaFile = new File(dir, "fpgaDiscoverScript")
       val fpgaDiscovery = mockDiscoveryScript(fpgaFile,
         """'{"name": "fpga", "addresses": ["f1", "f2", "f3"]}'""")
       sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery)
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery)
 
-      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = 
false)
+      val resources =
+        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
       assert(resources.size === 2)
       val gpuValue = resources.get("gpu")
       assert(gpuValue.nonEmpty, "Should have a gpu entry")
@@ -122,11 +153,12 @@ class ResourceDiscovererSuite extends SparkFunSuite
       val gpuDiscovery = mockDiscoveryScript(gpuFile,
         """'{"name": "gpu", "addresses": ["0", "1"]}'""")
       sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
       sparkconf set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, "boguspath")
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, "boguspath")
       // make sure it reads from correct config, here it should use driver
-      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = 
true)
+      val resources =
+        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_DRIVER_RESOURCE_PREFIX)
       val gpuValue = resources.get("gpu")
       assert(gpuValue.nonEmpty, "Should have a gpu entry")
       assert(gpuValue.get.name == "gpu", "name should be gpu")
@@ -135,6 +167,23 @@ class ResourceDiscovererSuite extends SparkFunSuite
     }
   }
 
+  test("Resource discoverer script returns mismatched name") {
+    val sparkconf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val gpuDiscovery = mockDiscoveryScript(gpuFile,
+        """'{"name": "fpga", "addresses": ["0", "1"]}'""")
+      sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
+      val error = intercept[SparkException] {
+        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_DRIVER_RESOURCE_PREFIX)
+      }.getMessage()
+
+      assert(error.contains("Error running the resource discovery script"))
+    }
+  }
+
   test("Resource discoverer script returns invalid format") {
     val sparkconf = new SparkConf
     assume(!(Utils.isWindows))
@@ -143,9 +192,9 @@ class ResourceDiscovererSuite extends SparkFunSuite
       val gpuDiscovery = mockDiscoveryScript(gpuFile,
         """'{"addresses": ["0", "1"]}'""")
       sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
       val error = intercept[SparkException] {
-        ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
       }.getMessage()
 
       assert(error.contains("Error running the resource discovery"))
@@ -158,10 +207,10 @@ class ResourceDiscovererSuite extends SparkFunSuite
       val file1 = new File(dir, "bogusfilepath")
       try {
         sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
-          SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath())
+          SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, file1.getPath())
 
         val error = intercept[SparkException] {
-          ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+          ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
         }.getMessage()
 
         assert(error.contains("doesn't exist"))
@@ -174,10 +223,10 @@ class ResourceDiscovererSuite extends SparkFunSuite
   test("gpu's specified but not discovery script") {
     val sparkconf = new SparkConf
     sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
-      SPARK_RESOURCE_COUNT_POSTFIX, "2")
+      SPARK_RESOURCE_COUNT_SUFFIX, "2")
 
     val error = intercept[SparkException] {
-      ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+      ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
     }.getMessage()
 
     assert(error.contains("User is expecting to use"))
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 83a9ea3..965c7df 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -110,6 +110,49 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     assert(conf.getOption("k4") === None)
   }
 
+  test("basic getAllWithPrefixAndPostfix") {
+    val conf = new SparkConf(false)
+    conf.set("spark.prefix.main.suffix", "v1")
+    val prefix = "spark.prefix."
+    val suffix = ".suffix"
+    assert(conf.getAllWithPrefixAndSuffix(prefix, suffix).toSet ===
+      Set(("main", "v1")))
+
+    conf.set("spark.prefix.main2.suffix", "v2")
+    conf.set("spark.prefix.main3.extra1.suffix", "v3")
+    conf.set("spark.prefix.main4.extra2.nonmatchingsuffix", "v4")
+    conf.set("spark.notmatchingprefix.main4.suffix", "v5")
+
+    assert(conf.getAllWithPrefixAndSuffix(prefix, suffix).toSet ===
+      Set(("main", "v1"), ("main2", "v2"), ("main3.extra1", "v3")))
+  }
+
+  test("test prefix config parsing utilities") {
+    val conf = new SparkConf(false)
+    conf.set("spark.prefix.main.suffix", "v1")
+    val prefix = "spark.prefix."
+    val suffix = ".suffix"
+    val configsWithPrefix = conf.getAllWithPrefix(prefix)
+    assert(configsWithPrefix.toSet === Set(("main.suffix", "v1")))
+    assert(SparkConf.getBaseOfConfigs(configsWithPrefix) === Set("main"))
+    assert(SparkConf.getConfigsWithSuffix(configsWithPrefix, suffix).toSet === 
Set(("main", "v1")))
+  }
+
+  test("basic getAllWithPrefix") {
+    val prefix = "spark.prefix."
+    val conf = new SparkConf(false)
+    conf.set("spark.prefix.main.suffix", "v1")
+    assert(conf.getAllWithPrefix(prefix).toSet ===
+      Set(("main.suffix", "v1")))
+
+    conf.set("spark.prefix.main2.suffix", "v2")
+    conf.set("spark.prefix.main3.extra1.suffix", "v3")
+    conf.set("spark.notMatching.main4", "v4")
+
+    assert(conf.getAllWithPrefix(prefix).toSet ===
+      Set(("main.suffix", "v1"), ("main2.suffix", "v2"), 
("main3.extra1.suffix", "v3")))
+  }
+
   test("creating SparkContext without master and app name") {
     val conf = new SparkConf(false)
     intercept[SparkException] { sc = new SparkContext(conf) }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 1dcb2f7..abd7d8a 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark
 import java.io.File
 import java.net.{MalformedURLException, URI}
 import java.nio.charset.StandardCharsets
+import java.nio.file.{Files => JavaFiles}
+import java.nio.file.attribute.PosixFilePermission._
+import java.util.EnumSet
 import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
@@ -731,6 +734,101 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       resetSparkContext()
     }
   }
+
+  test("test gpu driver discovery under local-cluster mode") {
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val scriptPath = mockDiscoveryScript(gpuFile,
+        """'{"name": "gpu","addresses":["5", "6"]}'""")
+
+      val conf = new SparkConf()
+        .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
+          SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
+          SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
+        .setMaster("local-cluster[1, 1, 1024]")
+        .setAppName("test-cluster")
+      sc = new SparkContext(conf)
+
+      // Ensure all executors has started
+      eventually(timeout(10.seconds)) {
+        assert(sc.statusTracker.getExecutorInfos.size == 1)
+      }
+      assert(sc.resources.size === 1)
+      assert(sc.resources.get("gpu").get.addresses === Array("5", "6"))
+      assert(sc.resources.get("gpu").get.name === "gpu")
+    }
+  }
+
+  test("test gpu driver resource address and discovery under local-cluster 
mode") {
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val scriptPath = mockDiscoveryScript(gpuFile,
+        """'{"name": "gpu","addresses":["5", "6"]}'""")
+
+      val conf = new SparkConf()
+        .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
+          SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
+          SPARK_RESOURCE_ADDRESSES_SUFFIX, "0, 1, 8")
+        .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
+          SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
+        .setMaster("local-cluster[1, 1, 1024]")
+        .setAppName("test-cluster")
+      sc = new SparkContext(conf)
+
+      // Ensure all executors has started
+      eventually(timeout(10.seconds)) {
+        assert(sc.statusTracker.getExecutorInfos.size == 1)
+      }
+      // driver gpu addresses config should take precedence over the script
+      assert(sc.resources.size === 1)
+      assert(sc.resources.get("gpu").get.addresses === Array("0", "1", "8"))
+      assert(sc.resources.get("gpu").get.name === "gpu")
+    }
+  }
+
+  test("Test parsing resources task configs with missing executor config") {
+    val conf = new SparkConf()
+      .set(SPARK_TASK_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+      .setMaster("local-cluster[1, 1, 1024]")
+      .setAppName("test-cluster")
+
+    var error = intercept[SparkException] {
+      sc = new SparkContext(conf)
+    }.getMessage()
+
+    assert(error.contains("The executor resource config: 
spark.executor.resource.gpu.count " +
+      "needs to be specified since a task requirement config: 
spark.task.resource.gpu.count " +
+      "was specified"))
+  }
+
+  test("Test parsing resources executor config < task requirements") {
+    val conf = new SparkConf()
+      .set(SPARK_TASK_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_COUNT_SUFFIX, "2")
+      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+      .setMaster("local-cluster[1, 1, 1024]")
+      .setAppName("test-cluster")
+
+    var error = intercept[SparkException] {
+      sc = new SparkContext(conf)
+    }.getMessage()
+
+    assert(error.contains("The executor resource config: " +
+      "spark.executor.resource.gpu.count = 1 has to be >= the task config: " +
+      "spark.task.resource.gpu.count = 2"))
+  }
+
+  def mockDiscoveryScript(file: File, result: String): String = {
+    Files.write(s"echo $result", file, StandardCharsets.UTF_8)
+    JavaFiles.setPosixFilePermissions(file.toPath(),
+      EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
+    file.getPath()
+  }
+
 }
 
 object SparkContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index e59d1b0..43913d1 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -49,7 +49,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("parsing no resources") {
     val conf = new SparkConf
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, 
"2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
 
@@ -71,8 +71,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("parsing one resources") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, 
"2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -95,10 +95,10 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
   test("parsing multiple resources") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_POSTFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_POSTFIX, "3")
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_SUFFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, 
"2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -128,8 +128,8 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
   test("error checking parsing resources and executor and task configs") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, 
"2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -148,7 +148,8 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("doesn't meet the requirements of needing"))
+      assert(error.contains("Resource: gpu, with addresses: 0 is less than 
what the " +
+        "user requested: 2"))
     }
 
     // missing resource on the executor
@@ -163,14 +164,14 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("Executor resource config missing required task 
resource"))
+      assert(error.contains("Resource: gpu required but wasn't discovered on 
startup"))
     }
   }
 
   test("executor resource found less than required") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "4")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "1")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_SUFFIX, "4")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, 
"1")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -189,40 +190,14 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("is less than what the user requested for count"))
-    }
-  }
-
-  test("parsing resources task configs with missing executor config") {
-    val conf = new SparkConf
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + 
SPARK_RESOURCE_COUNT_POSTFIX, "2")
-    val serializer = new JavaSerializer(conf)
-    val env = createMockEnv(conf, serializer)
-    // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", 
"1", "host1",
-      4, Seq.empty[URL], env, None)
-
-    withTempDir { tmpDir =>
-      val gpuArgs =
-        ("name" -> "gpu") ~
-          ("addresses" -> Seq("0", "1"))
-      val ja = JArray(List(gpuArgs))
-      val f1 = writeFileWithJson(tmpDir, ja)
-
-      var error = intercept[SparkException] {
-        val parsedResources = backend.parseOrFindResources(Some(f1))
-      }.getMessage()
-
-      assert(error.contains("Resource: gpu not specified via config: " +
-        "spark.executor.resource.gpu.count, but required, please " +
-        "fix your configuration"))
+      assert(error.contains("gpu, with addresses: 0,1 is less than what the 
user requested: 4"))
     }
   }
 
   test("use discoverer") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_POSTFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_POSTFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + 
SPARK_RESOURCE_COUNT_SUFFIX, "3")
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")
@@ -231,7 +206,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
       JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(),
         EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
       conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery.getPath())
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery.getPath())
 
       val serializer = new JavaSerializer(conf)
       val env = createMockEnv(conf, serializer)
diff --git a/docs/configuration.md b/docs/configuration.md
index d0b2699..d20b416 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -188,6 +188,25 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
+ <td><code>spark.driver.resource.{resourceName}.count</code></td>
+  <td>0</td>
+  <td>
+    The number of a particular resource type to use on the driver.
+    If this is used, you must also specify the
+    <code>spark.driver.resource.{resourceName}.discoveryScript</code>
+    for the driver to find the resource on startup.
+  </td>
+</tr>
+<tr>
+ <td><code>spark.driver.resource.{resourceName}.discoveryScript</code></td>
+  <td>None</td>
+  <td>
+    A script for the driver to run to discover a particular resource type. 
This should
+    write to STDOUT a JSON string in the format of the ResourceInformation 
class. This has a
+    name and an array of addresses.
+  </td>
+</tr>
+<tr>
   <td><code>spark.executor.memory</code></td>
   <td>1g</td>
   <td>
@@ -222,17 +241,17 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
- <td><code>spark.executor.resource.{resourceType}.count</code></td>
+ <td><code>spark.executor.resource.{resourceName}.count</code></td>
   <td>0</td>
   <td>
     The number of a particular resource type to use per executor process.
     If this is used, you must also specify the
-    <code>spark.executor.resource.{resourceType}.discoveryScript</code>
+    <code>spark.executor.resource.{resourceName}.discoveryScript</code>
     for the executor to find the resource on startup.
   </td>
 </tr>
 <tr>
- <td><code>spark.executor.resource.{resourceType}.discoveryScript</code></td>
+ <td><code>spark.executor.resource.{resourceName}.discoveryScript</code></td>
   <td>None</td>
   <td>
     A script for the executor to run to discover a particular resource type. 
This should
@@ -1813,11 +1832,11 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.task.resource.{resourceType}.count</code></td>
+  <td><code>spark.task.resource.{resourceName}.count</code></td>
   <td>1</td>
   <td>
     Number of a particular resource type to allocate for each task. If this is 
specified
-    you must also provide the executor config 
<code>spark.executor.resource.{resourceType}.count</code>
+    you must also provide the executor config 
<code>spark.executor.resource.{resourceName}.count</code>
     and any corresponding discovery configs so that your executors are created 
with that resource type.
   </td>
 </tr>

