tgravescs commented on a change in pull request #25047:
[WIP][SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
URL: https://github.com/apache/spark/pull/25047#discussion_r301149575
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
##########
@@ -70,6 +79,186 @@ private[spark] object ResourceUtils extends Logging {
// internally we currently only support addresses, so its just an integer
count
val AMOUNT = "amount"
/**
 * Assign resources to workers/drivers launched on the same host, avoiding address
 * conflicts between them. Already-assigned addresses are tracked in
 * ALLOCATED_RESOURCES_FILE, and all reads/writes of that file are serialized by an
 * inter-process file lock (see acquireLock/releaseLock).
 * @param componentName spark.driver / spark.worker
 * @param resources the resources discovered by the worker/driver on the host
 * @param resourceRequests maps resource name to the amount requested by the
 *                         worker/driver
 * @return the resources allocated to this worker/driver
 * @throws SparkException if the request cannot be satisfied from the addresses
 *                        not yet assigned to other workers/drivers
 */
def acquireResources(
    componentName: String,
    resources: Map[String, ResourceInformation],
    resourceRequests: Map[String, Int])
  : Map[String, ResourceInformation] = {
  if (resourceRequests.isEmpty) {
    return Map.empty
  }
  val lock = acquireLock()
  try {
    val resourcesFile = new File(getOrCreateResourcesDir(), ALLOCATED_RESOURCES_FILE)
    // Addresses already handed out to other workers/drivers on this host.
    val allocated: Map[String, Array[String]] = if (resourcesFile.exists()) {
      parseAllocatedFromJsonFile(resourcesFile.getPath)
        .map(allocation => allocation.id.resourceName -> allocation.addresses.toArray)
        .toMap
    } else {
      Map.empty
    }

    val newAssigned = resourceRequests.map { case (rName, amount) =>
      val assigned = allocated.getOrElse(rName, Array.empty[String])
      val available = resources(rName).addresses.diff(assigned)
      if (available.length < amount) {
        throw new SparkException(s"No more resources available since they've already" +
          s" been assigned to other workers/drivers.")
      }
      rName -> new ResourceInformation(rName, available.take(amount))
    }

    // Merge the new assignment into the existing allocations and persist the union.
    val newAllocated = (allocated.keySet ++ newAssigned.keySet).map { rName =>
      val oldAddrs = allocated.getOrElse(rName, Array.empty[String])
      val newAddrs = newAssigned.get(rName).map(_.addresses).getOrElse(Array.empty[String])
      rName -> new ResourceInformation(rName, Array.concat(oldAddrs, newAddrs))
    }.toMap
    writeResourceAllocationJson(SPARK_WORKER_PREFIX, newAllocated, resourcesFile)
    logInfo("==============================================================")
    logInfo(s"Acquired isolated resources for " +
      s"$componentName:\n${newAssigned.mkString("\n")}")
    logInfo("==============================================================")
    newAssigned
  } finally {
    // Always release the inter-process lock, even when parsing or writing fails;
    // otherwise every other worker/driver on this host would block forever.
    releaseLock(lock)
  }
}
+
/**
 * Free the indicated resources so they become available again to other
 * workers/drivers on the same host. Updates ALLOCATED_RESOURCES_FILE (or deletes
 * it when nothing remains allocated) under the inter-process file lock.
 * @param toRelease the resources expected to be released
 */
def releaseResources(toRelease: Map[String, ResourceInformation]): Unit = {
  if (toRelease.nonEmpty) {
    val lock = acquireLock()
    try {
      val resourcesFile = new File(getOrCreateResourcesDir(), ALLOCATED_RESOURCES_FILE)
      if (resourcesFile.exists()) {
        val allocated = parseAllocatedFromJsonFile(resourcesFile.getPath)
          .map(allocation => allocation.id.resourceName -> allocation.addresses.toArray)
          .toMap
        // Remove the released addresses; keep only resources that still have
        // addresses assigned.
        val newAllocated = allocated.map { case (rName, addresses) =>
          val releasedAddrs =
            toRelease.get(rName).map(_.addresses).getOrElse(Array.empty[String])
          rName -> new ResourceInformation(rName, addresses.diff(releasedAddrs))
        }.filter(_._2.addresses.nonEmpty)
        if (newAllocated.nonEmpty) {
          writeResourceAllocationJson(SPARK_WORKER_PREFIX, newAllocated, resourcesFile)
        } else if (!resourcesFile.delete()) {
          logWarning(s"Failed to delete $ALLOCATED_RESOURCES_FILE.")
        }
      }
    } finally {
      // Always release the lock, even when parsing or writing fails.
      releaseLock(lock)
    }
  }
}
+
/**
 * Acquire an exclusive inter-process lock on RESOURCES_LOCK_FILE, used to
 * serialize updates to the allocated-resources file between workers/drivers
 * running on the same host. Blocks (with randomized backoff) until the lock
 * is obtained.
 * @return the held [[FileLock]]; callers must pass it to releaseLock
 */
private def acquireLock(): FileLock = {
  val lockFile = new File(getOrCreateResourcesDir(), RESOURCES_LOCK_FILE)
  val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel
  var lock: FileLock = null
  try {
    while (lock == null) {
      try {
        lock = lockFileChannel.lock()
        logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.")
      } catch {
        case _: OverlappingFileLockException =>
          // Thrown in LocalSparkCluster mode: FileLock is designed to coordinate
          // across JVMs, but LocalSparkCluster launches multiple workers in the
          // same JVM, so a second worker locking `resources.lock` inside the same
          // JVM hits this exception. Handle it manually by retrying.
          // Multiple workers may race for the lock, so sleep for a random time
          // to reduce the chance of repeated collisions.
          Thread.sleep(Random.nextInt(1000) + 1000)
      }
    }
  } finally {
    // If locking failed with any other exception (e.g. an IOException from
    // lock()), close the channel so the file handle is not leaked.
    if (lock == null) {
      lockFileChannel.close()
    }
  }
  lock
}
+
/**
 * Release a lock obtained from acquireLock and close its underlying channel.
 * Failures are logged rather than propagated, since releasing happens on
 * cleanup paths.
 * @param lock the lock returned by acquireLock
 */
private def releaseLock(lock: FileLock): Unit = {
  try {
    try {
      lock.release()
      logInfo(s"Released lock on $RESOURCES_LOCK_FILE.")
    } finally {
      // Close the channel even if release() throws, so the file handle
      // backing the lock file is never leaked.
      lock.channel().close()
    }
  } catch {
    case e: Exception =>
      logWarning(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e)
  }
}
+
+ private def getOrCreateResourcesDir(): File = {
+ val sparkHome = if (Utils.isTesting) {
+ assert(sys.props.contains("spark.test.home"), "spark.test.home is not
set!")
+ new File(sys.props("spark.test.home"))
+ } else {
+ new File(sys.env.getOrElse("SPARK_HOME", "."))
+ }
+ val resourceDir = new File(sparkHome, SPARK_RESOURCES_DIRECTORY)
Review comment:
how do you know the user has permission to write to sparkHome? If it's not set,
it falls back to `.`, and you don't know whether that directory is shared by
multiple users — which would defeat the purpose. Seems like this location should
at least be configurable.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]