tgravescs commented on a change in pull request #25047: 
[WIP][SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
URL: https://github.com/apache/spark/pull/25047#discussion_r301149575
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -70,6 +79,186 @@ private[spark] object ResourceUtils extends Logging {
   // internally we currently only support addresses, so its just an integer 
count
   val AMOUNT = "amount"
 
+  /**
+   * Assign resources to workers from the same host to avoid address conflict.
+   *
+   * The whole read-modify-write cycle on the allocation file runs while holding the lock
+   * returned by `acquireLock()`. The lock is released in a `finally` block, so a failure
+   * anywhere in the cycle (an unsatisfiable request, a requested resource name missing from
+   * `resources`, an error while parsing or writing the allocation file) cannot leave it held.
+   *
+   * @param componentName spark.driver / spark.worker
+   * @param resources the resources found by worker on the host
+   * @param resourceRequests map resource name to the request amount by the worker
+   * @return allocated resources for the worker/driver or throws exception if can't
+   *         meet worker/driver's requirement
+   */
+  def acquireResources(
+      componentName: String,
+      resources: Map[String, ResourceInformation],
+      resourceRequests: Map[String, Int])
+    : Map[String, ResourceInformation] = {
+    if (resourceRequests.isEmpty) {
+      // Nothing requested: do not touch the lock or the allocation file at all.
+      Map.empty
+    } else {
+      val lock = acquireLock()
+      try {
+        val resourcesFile = new File(getOrCreateResourcesDir(), ALLOCATED_RESOURCES_FILE)
+        // Addresses already recorded in the allocation file, keyed by resource name.
+        val allocated = {
+          if (resourcesFile.exists()) {
+            parseAllocatedFromJsonFile(resourcesFile.getPath).map { allocation =>
+              allocation.id.resourceName -> allocation.addresses.toArray
+            }
+          } else {
+            Map.empty
+          }
+        }.toMap
+
+        // For every request, pick `amount` addresses that are not recorded as allocated yet.
+        val newAssigned = resourceRequests.map { case (rName, amount) =>
+          val assigned = allocated.getOrElse(rName, Array.empty)
+          val available = resources(rName).addresses.diff(assigned)
+          if (available.length < amount) {
+            throw new SparkException(s"No more resources available since they've already" +
+              s" assigned to other workers/drivers.")
+          }
+          rName -> new ResourceInformation(rName, available.take(amount))
+        }
+
+        // Merge the fresh assignment into the previously recorded allocation.
+        val newAllocated = {
+          val newResources = newAssigned.keySet.filterNot(allocated.keySet.contains)
+          val allResources = newResources ++ allocated.keys
+          allResources.map { rName =>
+            val oldAddrs = allocated.getOrElse(rName, Array.empty)
+            val newAddrs = newAssigned.get(rName).map(_.addresses).getOrElse(Array.empty)
+            rName -> new ResourceInformation(rName, Array.concat(oldAddrs, newAddrs))
+          }.toMap
+        }
+        writeResourceAllocationJson(SPARK_WORKER_PREFIX, newAllocated, resourcesFile)
+        logInfo("==============================================================")
+        logInfo(s"Acquired isolated resources for $componentName:\n${newAssigned.mkString("\n")}")
+        logInfo("==============================================================")
+        newAssigned
+      } finally {
+        // Runs on success and on every failure path, so the lock is never leaked.
+        releaseLock(lock)
+      }
+    }
+  }
+
+  /**
+   * Free the indicated resources to make those resources be available for other
+   * workers on the same host.
+   *
+   * Does nothing when `toRelease` is empty. Otherwise the allocation file is rewritten
+   * without the released addresses, or deleted once no address remains allocated. The update
+   * runs while holding the lock returned by `acquireLock()`, which is released in a `finally`
+   * block so that a failure while parsing or writing the file cannot leave it held.
+   *
+   * @param toRelease the resources expected to release
+   */
+  def releaseResources(toRelease: Map[String, ResourceInformation]): Unit = {
+    if (toRelease.nonEmpty) {
+      val lock = acquireLock()
+      try {
+        val resourcesFile = new File(getOrCreateResourcesDir(), ALLOCATED_RESOURCES_FILE)
+        if (resourcesFile.exists()) {
+          val allocated = parseAllocatedFromJsonFile(resourcesFile.getPath).map { allocation =>
+            allocation.id.resourceName -> allocation.addresses.toArray
+          }.toMap
+          // Drop the released addresses; resources left with no address are removed entirely.
+          val newAllocated = allocated.map { case (rName, addresses) =>
+            val retained = addresses.diff(
+              toRelease.get(rName).map(_.addresses).getOrElse(Array.empty))
+            rName -> new ResourceInformation(rName, retained)
+          }.filter(_._2.addresses.nonEmpty)
+          if (newAllocated.nonEmpty) {
+            writeResourceAllocationJson(SPARK_WORKER_PREFIX, newAllocated, resourcesFile)
+          } else if (!resourcesFile.delete()) {
+            logWarning(s"Failed to delete $ALLOCATED_RESOURCES_FILE.")
+          }
+        }
+      } finally {
+        // Runs on success and on every failure path, so the lock is never leaked.
+        releaseLock(lock)
+      }
+    }
+  }
+
+  /**
+   * Acquire an exclusive lock on `RESOURCES_LOCK_FILE` inside the resources directory,
+   * retrying until it is obtained.
+   *
+   * The channel backing the lock stays open for as long as the lock is held and is closed by
+   * `releaseLock`. If this method exits without a lock (for example `lock()` fails with an
+   * I/O error, or the retry sleep is interrupted) the channel is closed here so that it does
+   * not leak.
+   *
+   * @return the lock held on `RESOURCES_LOCK_FILE`
+   */
+  private def acquireLock(): FileLock = {
+    val resourcesDir = getOrCreateResourcesDir()
+    val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE)
+    val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel
+    var lock: FileLock = null
+    try {
+      while (lock == null) {
+        try {
+          lock = lockFileChannel.lock()
+          logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.")
+        } catch {
+          case _: OverlappingFileLockException =>
+            // This exception throws when we're in LocalSparkCluster mode. FileLock is designed
+            // to be used across JVMs, but our LocalSparkCluster is designed to launch multiple
+            // workers in the same JVM. As a result, when a worker in LocalSparkCluster tries to
+            // acquire the lock on `resources.lock` which is already locked by another worker,
+            // we'll hit this exception. So, we should manually control it.
+            // There may be multiple workers racing for the lock, so sleep for a random time
+            // to avoid possible conflict before trying again.
+            val duration = Random.nextInt(1000) + 1000
+            Thread.sleep(duration)
+        }
+      }
+    } finally {
+      // No lock means we are leaving through an exception: nobody else will close the channel.
+      if (lock == null) {
+        lockFileChannel.close()
+      }
+    }
+    lock
+  }
+
+  /**
+   * Release the given lock and close the channel it was acquired on.
+   *
+   * Failures are logged as warnings and not propagated. The channel is closed even when
+   * `lock.release()` itself throws, so the underlying file handle is not leaked.
+   *
+   * @param lock the lock previously returned by `acquireLock()`
+   */
+  private def releaseLock(lock: FileLock): Unit = {
+    try {
+      try {
+        lock.release()
+      } finally {
+        // Must run even if release() failed, otherwise the channel stays open.
+        lock.channel().close()
+      }
+      logInfo(s"Released lock on $RESOURCES_LOCK_FILE.")
+    } catch {
+      case e: Exception =>
+        logWarning(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e)
+    }
+  }
+
+  private def getOrCreateResourcesDir(): File = {
+    val sparkHome = if (Utils.isTesting) {
+      assert(sys.props.contains("spark.test.home"), "spark.test.home is not 
set!")
+      new File(sys.props("spark.test.home"))
+    } else {
+      new File(sys.env.getOrElse("SPARK_HOME", "."))
+    }
+    val resourceDir = new File(sparkHome, SPARK_RESOURCES_DIRECTORY)
 
 Review comment:
   How do you know the user has permission to write to sparkHome? If it's not
set, this falls back to `.`, and you don't know whether that directory is going
to be shared by multiple users, which defeats the purpose. It seems like this
should at least be configurable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to