mengxr commented on a change in pull request #24374: [WIP][SPARK-27366][CORE] Support GPU Resources in Spark job scheduling
URL: https://github.com/apache/spark/pull/24374#discussion_r281855721
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/SparkConf.scala
 ##########
 @@ -491,6 +491,101 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
     }
   }
 
+  /**
+   * Set available resources on the driver/executor.
+   */
+  private[spark] def setResources(
+      resourceName: String,
+      resourceCount: Int,
+      resourceAddresses: Option[Seq[String]],
+      isDriver: Boolean): SparkConf = {
+    if (resourceAddresses.isDefined && resourceAddresses.get.size != resourceCount) {
+      val nodeType = if (isDriver) {
+        "driver"
+      } else {
+        "executor"
+      }
+      throw new SparkException(s"Specified $resourceCount $resourceName(s) on $nodeType, while " +
+        s"the size of device addresses is ${resourceAddresses.get.size}.")
+    }
+    val prefix = if (isDriver) {
+      s"$SPARK_DRIVER_RESOURCE_PREFIX.$resourceName"
+    } else {
+      s"$SPARK_EXECUTOR_RESOURCE_PREFIX.$resourceName"
+    }
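+    // The keys written below have the form "<node prefix>.<resourceName>.count" and
+    // "<node prefix>.<resourceName>.addresses" (exact prefix values assumed, e.g.
+    // "spark.executor.resource" for executors).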
+    set(s"$prefix.$SPARK_RESOURCE_COUNT", resourceCount.toString)
+    if (resourceAddresses.isDefined) {
+      set(s"$prefix.$SPARK_RESOURCE_ADDRESSES", 
resourceAddresses.get.mkString(","))
+    }
+    this
+  }
+
+  /**
+   * Set task resource requirement.
+   */
+  private[spark] def setResourceRequirement(
+      resourceName: String, resourceCount: Int): SparkConf = {
+    val key = s"$SPARK_TASK_RESOURCE_PREFIX.$resourceName.$SPARK_RESOURCE_COUNT"
+    set(key, resourceCount.toString)
+    this
+  }
+
+  /**
+   * Get available resources on the driver/executor.
+   */
+  private[spark] def getResources(isDriver: Boolean): Map[String, ResourceInformation] = {
+    val prefix = if (isDriver) {
+      s"$SPARK_DRIVER_RESOURCE_PREFIX."
+    } else {
+      s"$SPARK_EXECUTOR_RESOURCE_PREFIX."
+    }
+
+    val resourceCountMap = new HashMap[String, Int]
+    val resourceAddressMap = new HashMap[String, Array[String]]
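+    // Keys under the prefix have the form "<resourceName>.count" or
+    // "<resourceName>.addresses"; split on '.' to recover the name and the field.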
+    getAllWithPrefix(prefix).foreach { case (key, value) =>
+      val keys = key.split("\\.")
+      if (keys.size == 2) {
+        val resourceName = keys.head
+        if (keys.last.equals(SPARK_RESOURCE_COUNT)) {
+          resourceCountMap.put(resourceName, value.toInt)
+        } else if (keys.last.equals(SPARK_RESOURCE_ADDRESSES)) {
+          resourceAddressMap.put(resourceName, value.split(",").map(_.trim()))
+        }
+      }
+    }
+
+    resourceCountMap.map { case (resourceName, resourceCount) =>
+      val resourceAddresses = resourceAddressMap.getOrElse(resourceName, Array.empty[String])
+      (resourceName, new ResourceInformation(resourceName, "", resourceCount, resourceAddresses))
+    }.toMap
+  }
+
+  /**
+   * Get task resource requirements.
+   */
+  private[spark] def getResourceRequirements(): Map[String, Int] = {
 
 Review comment:
   `getTaskResourceRequirements`
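
   For context, a minimal usage sketch of how these helpers compose (hypothetical, not part of the PR; it assumes the prefix constants resolve to keys such as `spark.executor.resource.gpu.count`):

   ```scala
   val conf = new SparkConf()
     // Advertise 2 GPUs, addressed "0" and "1", on each executor.
     .setResources("gpu", 2, Some(Seq("0", "1")), isDriver = false)
     // Require 1 GPU per task.
     .setResourceRequirement("gpu", 1)

   // Reads back Map("gpu" -> ResourceInformation("gpu", "", 2, Array("0", "1"))).
   val executorResources = conf.getResources(isDriver = false)
   ```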
