tgravescs commented on a change in pull request #25047: 
[WIP][SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
URL: https://github.com/apache/spark/pull/25047#discussion_r303999029
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -70,6 +86,258 @@ private[spark] object ResourceUtils extends Logging {
   // internally we currently only support addresses, so its just an integer 
count
   val AMOUNT = "amount"
 
+  /**
+   * Assign resources to workers from the same host to avoid address conflict.
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param resources the resources found by worker/driver on the host
+   * @param resourceRequirements the resource requirements asked by the 
worker/driver
+   * @param pid the process id of worker/driver to acquire resources.
+   * @return allocated resources for the worker/driver or throws exception if 
can't
+   *         meet worker/driver's requirement
+   */
+  def acquireResources(
+      conf: SparkConf,
+      componentName: String,
+      resources: Map[String, ResourceInformation],
+      resourceRequirements: Seq[ResourceRequirement],
+      pid: Int)
+    : Map[String, ResourceInformation] = {
+    if (resourceRequirements.isEmpty) {
+      return Map.empty
+    }
+    val lock = acquireLock(conf)
+    val resourcesFile = new File(getOrCreateResourcesDir(conf), 
ALLOCATED_RESOURCES_FILE)
+    var origAllocation = Seq.empty[StandaloneResourceAllocation]
+    var allocated = {
+      if (resourcesFile.exists()) {
+        origAllocation = allocatedResources(resourcesFile.getPath)
+        val allocations = origAllocation.map { resource =>
+          val thePid = resource.pid
+          val resourceMap = {
+            resource.allocations.map { allocation =>
+              allocation.id.resourceName -> allocation.addresses.toArray
+            }.toMap
+          }
+          thePid -> resourceMap
+        }.toMap
+        allocations
+      } else {
+        Map.empty[Int, Map[String, Array[String]]]
+      }
+    }
+
+    var newAssignments: Map[String, Array[String]] = null
+    // Whether we've checked process status and we'll only do the check at 
most once.
+    var checked = false
+    // Whether we need to keep allocating for the worker/driver and we'll only 
go through
+    // the loop at most twice.
+    var keepAllocating = true
+    while(keepAllocating) {
 
 Review comment:
   add space after while

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to