Ngone51 commented on a change in pull request #25047: [WIP][SPARK-27371][CORE]
Support GPU-aware resources scheduling in Standalone
URL: https://github.com/apache/spark/pull/25047#discussion_r302882692
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
##########
@@ -70,6 +87,240 @@ private[spark] object ResourceUtils extends Logging {
   // internally we currently only support addresses, so its just an integer count
   val AMOUNT = "amount"
+  /**
+   * Assign resources to a worker/driver on a host, avoiding address conflicts with
+   * other workers/drivers on the same host.
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param resources the resources found by the worker/driver on the host
+   * @param resourceRequirements the resource requirements asked by the worker/driver
+   * @param pid the process id of the worker/driver acquiring the resources
+   * @return the resources allocated to the worker/driver, or throws an exception if the
+   *         worker/driver's requirement can't be met
+   */
+  def acquireResources(
+      conf: SparkConf,
+      componentName: String,
+      resources: Map[String, ResourceInformation],
+      resourceRequirements: Seq[ResourceRequirement],
+      pid: Int)
+    : Map[String, ResourceInformation] = {
+    if (resourceRequirements.isEmpty) {
+      return Map.empty
+    }
+    val lock = acquireLock(conf)
+    val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE)
+    val allocated = {
+      if (resourcesFile.exists()) {
+        val livingPids = livingWorkerAndDriverPids
+        val allocations = allocatedResources(resourcesFile.getPath).filter {
+          allocation => livingPids.contains(allocation.pid) || Utils.isTesting
+        }
+        allocations
+      } else {
+        Seq.empty
+      }
+    }
+
+    // combine all allocated resources into a single Array by resource type
+    val allocatedAddresses = {
+      val rNameToAddresses = new mutable.HashMap[String, mutable.ArrayBuffer[String]]()
+      allocated.map(_.allocations).foreach { allocations =>
+        allocations.foreach { allocation =>
+          val addresses = rNameToAddresses.getOrElseUpdate(allocation.id.resourceName,
+            mutable.ArrayBuffer())
+          addresses ++= allocation.addresses
+        }
+      }
+      rNameToAddresses.map(r => r._1 -> r._2.toArray)
+    }
+
+    val newAllocated = {
+      val allocations = resourceRequirements.map { req =>
+        val rName = req.resourceName
+        val amount = req.amount
+        val assigned = allocatedAddresses.getOrElse(rName, Array.empty)
+        val available = resources(rName).addresses.diff(assigned)
+        val newAssigned = {
+          if (available.length >= amount) {
+            available.take(amount)
+          } else {
+            releaseLock(lock)
+            throw new SparkException(s"No more resources available since they've already" +
+              s" been assigned to other workers/drivers.")
+          }
+        }
+        ResourceAllocation(ResourceID(componentName, rName), newAssigned)
+      }
+      StandaloneResourceAllocation(pid, allocations)
+    }
+    writeResourceAllocationJson(componentName, allocated ++ Seq(newAllocated), resourcesFile)
+    val acquired = {
+      newAllocated.allocations.map { allocation =>
+        allocation.id.resourceName -> allocation.toResourceInformation
+      }.toMap
+    }
+    logInfo("==============================================================")
+    logInfo(s"Acquired isolated resources for $componentName:\n${acquired.mkString("\n")}")
+    logInfo("==============================================================")
+    releaseLock(lock)
+    acquired
+  }
+
+  /**
+   * Free the given resources so that they become available to other workers/drivers
+   * on the same host.
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param toRelease the resources expected to be released
+   * @param pid the process id of the worker/driver releasing the resources
+   */
+  def releaseResources(
+      conf: SparkConf,
+      componentName: String,
+      toRelease: Map[String, ResourceInformation],
+      pid: Int)
+    : Unit = {
+    if (toRelease.nonEmpty) {
+      val lock = acquireLock(conf)
+      val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE)
+      if (resourcesFile.exists()) {
+        val (target, others) = allocatedResources(resourcesFile.getPath).partition(_.pid == pid)
+        if (target.nonEmpty) {
+          val rNameToAddresses = {
+            target.head.allocations.map { allocation =>
+              allocation.id.resourceName -> allocation.addresses
+            }.toMap
+          }
+          val allocations = {
+            rNameToAddresses.map { case (rName, addresses) =>
+              val retained = addresses.diff(
+                toRelease.get(rName).map(_.addresses).getOrElse(Array.empty))
+              rName -> retained
+            }
+            .filter(_._2.nonEmpty)
+            .map { case (rName, addresses) =>
+              ResourceAllocation(ResourceID(componentName, rName), addresses)
+            }.toSeq
+          }
+          if (allocations.nonEmpty) {
+            val newAllocation = StandaloneResourceAllocation(pid, allocations)
+            writeResourceAllocationJson(componentName, others ++ Seq(newAllocation), resourcesFile)
+          } else {
+            if (others.isEmpty) {
+              if (!resourcesFile.delete()) {
+                logWarning(s"Failed to delete $ALLOCATED_RESOURCES_FILE.")
+              }
+            } else {
+              writeResourceAllocationJson(componentName, others, resourcesFile)
+            }
+          }
+        }
+      }
+      releaseLock(lock)
+    }
+  }
+
+  private def acquireLock(conf: SparkConf): FileLock = {
+    val resourcesDir = getOrCreateResourcesDir(conf)
+    val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE)
+    val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel
+    var keepTry = true
+    var lock: FileLock = null
+    while (keepTry) {
+      try {
+        lock = lockFileChannel.lock()
+        logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.")
+        keepTry = false
+      } catch {
+        case e: OverlappingFileLockException =>
+          // This exception is thrown when we're in LocalSparkCluster mode. FileLock is
+          // designed to be used across JVMs, but LocalSparkCluster launches multiple
+          // workers in the same JVM. As a result, when a worker in LocalSparkCluster
+          // tries to acquire the lock on `resources.lock`, which is already held by
+          // another worker, we'll hit this exception. So we have to handle it manually.
+          keepTry = true
+          // Multiple workers may race for the lock, so sleep for a random time to
+          // reduce contention.
+          val duration = Random.nextInt(1000) + 1000
+          Thread.sleep(duration)
+      }
+    }
+    assert(lock != null, s"Acquired null lock on $RESOURCES_LOCK_FILE.")
+    lock
+  }
+
+  private def releaseLock(lock: FileLock): Unit = {
+    try {
+      lock.release()
+      lock.channel().close()
+      logInfo(s"Released lock on $RESOURCES_LOCK_FILE.")
+    } catch {
+      case e: Exception =>
+        logWarning(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e)
+    }
+  }
+
+  private def getOrCreateResourcesDir(conf: SparkConf): File = {
+    val coordinateDir = new File(conf.get(SPARK_RESOURCES_DIR).getOrElse {
+      val sparkHome = if (Utils.isTesting) {
+        assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
+        sys.props("spark.test.home")
+      } else {
+        sys.env.getOrElse("SPARK_HOME", ".")
+      }
+      sparkHome
+    })
+    val resourceDir = new File(coordinateDir, SPARK_RESOURCES_DIRECTORY)
+    if (!resourceDir.exists()) {
+      Utils.createDirectory(resourceDir)
+    }
+    resourceDir
+  }
+
+  private def livingWorkerAndDriverPids: Seq[Int] = {
+    val processIds = try {
+      Utils.executeAndGetOutput(Seq("jps"))
Review comment:
Probably we cannot rely on the pid dir to judge whether a Worker is alive, because:
1) a Worker killed by `kill [-9] pid` cannot remove its pid dir;
2) the pid dir defaults to /tmp, so it may be removed unexpectedly while the Worker is still alive.
So, I used `jps` here. WDYT? @tgravescs
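For illustration, a minimal sketch of the `jps`-based liveness check (not the exact code in this patch): it assumes `jps` is on the PATH and prints one `<pid> <MainClassShortName>` line per JVM, and the `Worker`/`DriverWrapper` class-name filter below is only an assumption about which processes matter, not what the patch actually does.

```scala
import scala.sys.process._

// Sketch: collect pids of JVMs whose main class looks like a standalone Worker
// or a DriverWrapper, by parsing `jps` output ("<pid> <MainClassShortName>").
def livingWorkerAndDriverPids(): Seq[Int] = {
  val output = Seq("jps").!!  // run `jps` and capture its stdout
  output.split("\n").toSeq
    .map(_.trim.split("\\s+"))
    .collect {
      // keep only "<pid> <class>" lines for the classes we (assume we) care about
      case Array(pid, mainClass) if mainClass == "Worker" || mainClass == "DriverWrapper" =>
        pid.toInt
    }
}
```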