tgravescs commented on a change in pull request #25047: 
[WIP][SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
URL: https://github.com/apache/spark/pull/25047#discussion_r304007299
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -70,6 +86,258 @@ private[spark] object ResourceUtils extends Logging {
   // internally we currently only support addresses, so it's just an integer count
   val AMOUNT = "amount"
 
+  /**
+   * Assign resources to workers from the same host to avoid address conflicts.
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param resources the resources found by the worker/driver on the host
+   * @param resourceRequirements the resource requirements asked by the worker/driver
+   * @param pid the process id of the worker/driver acquiring resources
+   * @return allocated resources for the worker/driver, or throws an exception if the
+   *         worker's/driver's requirement can't be met
+   */
+  def acquireResources(
+      conf: SparkConf,
+      componentName: String,
+      resources: Map[String, ResourceInformation],
+      resourceRequirements: Seq[ResourceRequirement],
+      pid: Int)
+    : Map[String, ResourceInformation] = {
+    if (resourceRequirements.isEmpty) {
+      return Map.empty
+    }
+    val lock = acquireLock(conf)
+    val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE)
+    var origAllocation = Seq.empty[StandaloneResourceAllocation]
+    var allocated = {
+      if (resourcesFile.exists()) {
+        origAllocation = allocatedResources(resourcesFile.getPath)
+        val allocations = origAllocation.map { resource =>
+          val thePid = resource.pid
+          val resourceMap = {
+            resource.allocations.map { allocation =>
+              allocation.id.resourceName -> allocation.addresses.toArray
+            }.toMap
+          }
+          thePid -> resourceMap
+        }.toMap
+        allocations
+      } else {
+        Map.empty[Int, Map[String, Array[String]]]
+      }
+    }
+
+    var newAssignments: Map[String, Array[String]] = null
+    // Whether we've checked process status; we'll only do the check at most once.
+    var checked = false
+    // Whether we need to keep allocating for the worker/driver; we'll only go through
+    // the loop at most twice.
+    var keepAllocating = true
+    var keepAllocating = true
+    while (keepAllocating) {
+      keepAllocating = false
+      val pidsToCheck = mutable.Set[Int]()
+      newAssignments = resourceRequirements.map { req =>
+        val rName = req.resourceName
+        val amount = req.amount
+        // initially, we must have available.length >= amount as we've already done the pre-check
+        var available = resources(rName).addresses
+        allocated.foreach { a =>
+          val thePid = a._1
+          val resourceMap = a._2
+          val assigned = resourceMap.getOrElse(rName, Array.empty)
+          val retained = available.diff(assigned)
+          if (retained.length < available.length && !checked) {
+            pidsToCheck += thePid
+          }
+          if (retained.length >= amount) {
+            available = retained
+          } else if (checked) {
+            keepAllocating = false
+            releaseLock(lock)
+            throw new SparkException("No more resources available since they've already been" +
+              " assigned to other workers/drivers.")
+          } else {
+            keepAllocating = true
+          }
+        }
+        val assigned = {
+          if (keepAllocating) {
+            val (invalid, valid) = allocated.partition { a =>
+              pidsToCheck(a._1) && !(Utils.isTesting || Utils.isProcessRunning(a._1))
+            }
+            allocated = valid
+            origAllocation = origAllocation.filter(allocation => !invalid.contains(allocation.pid))
+            checked = true
+            // note that this is a meaningless return value, just to avoid creating any new object
+            available
+          } else {
+            available.take(amount)
+          }
+        }
+        rName -> assigned
+      }.toMap
+
+    }
+    val newAllocation = {
+      val allocations = newAssignments.map{ case (rName, addresses) =>
 
 Review comment:
   space after map
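   
   A minimal sketch of the suggested style, reusing names from the snippet above (the ResourceAllocation and ResourceID constructors are assumed from earlier in this file, so the exact call may differ from the PR):
   
       val allocations = newAssignments.map { case (rName, addresses) =>
         // build one ResourceAllocation per resource name from the assigned addresses
         ResourceAllocation(ResourceID(componentName, rName), addresses.toSeq)
       }.toSeq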
