mengxr commented on issue #24615: [SPARK-27488][CORE] Driver interface to support GPU resources
URL: https://github.com/apache/spark/pull/24615#issuecomment-494580663

@tgravescs Regarding the conf utilities, I think the current approach hurts readability because we don't have an abstraction at the resource level; we manipulate confs directly instead. I have a proposal (not tested against your code) below. Let me know what you think of it. We can also merge the PR in its current form and do the refactoring in a follow-up PR.

~~~scala
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.SparkConf

/**
 * @param componentName spark.driver / spark.executor / spark.task
 * @param resourceName gpu, fpga
 */
private[spark] case class ResourceID(componentName: String, resourceName: String) {
  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
}

private[spark] case class ResourceRequest(id: ResourceID, count: Double, discoveryScript: String)

// We can define toResourceInfo() to map it to a public ResourceInformation instance.
private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[String])

object ResourceUtils {

  val COUNT = "count"
  val DISCOVERY_SCRIPT = "discoveryScript"
  val ADDRESSES = "addresses"

  def parseRequest(sparkConf: SparkConf, resourceId: ResourceID): ResourceRequest = {
    val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
    val quantity = settings(COUNT).toDouble
    val discoveryScript = settings(DISCOVERY_SCRIPT)
    ResourceRequest(resourceId, quantity, discoveryScript)
  }

  def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
    sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) =>
      // Keys look like "gpu.count"; take the resource name before the first dot.
      key.substring(0, key.indexOf('.'))
    }.toSet.toSeq.map(name => ResourceID(componentName, name))
  }

  def parseAllRequests(
      sparkConf: SparkConf,
      componentName: String): Seq[ResourceRequest] = {
    listResourceIds(sparkConf, componentName).map { id =>
      parseRequest(sparkConf, id)
    }
  }

  def parseResourceAllocationJson(resources: String): Seq[ResourceAllocation] = {
    implicit val formats = DefaultFormats
    parse(resources).extract[Seq[ResourceAllocation]]
  }

  def discoverResource(resourceRequest: ResourceRequest): ResourceAllocation = ???

  def parseAllocatedOrDiscoverResources(
      sparkConf: SparkConf,
      componentName: String,
      resourcesOpt: Option[String]): Seq[ResourceAllocation] = {
    // Option.flatMap expects an Option-valued function, so flatten via toSeq instead.
    val allocated = resourcesOpt.toSeq
      .flatMap(parseResourceAllocationJson)
      .filter(_.id.componentName == componentName)
    val otherResourceIds = listResourceIds(sparkConf, componentName).diff(allocated.map(_.id))
    allocated ++ otherResourceIds.map { id =>
      val request = parseRequest(sparkConf, id)
      discoverResource(request)
    }
  }

  def assertResourceAllocationMeetsRequest(
      allocation: ResourceAllocation,
      request: ResourceRequest): Unit = {
    require(allocation.id == request.id && allocation.addresses.size >= request.count)
  }

  def assertAllResourceAllocationsMeetRequests(
      allocations: Seq[ResourceAllocation],
      requests: Seq[ResourceRequest]): Unit = {
    val allocated = allocations.map(x => x.id -> x).toMap
    requests.foreach { r => assertResourceAllocationMeetsRequest(allocated(r.id), r) }
  }

  // Also add some methods to convert requests to Spark conf entries to simplify tests.
}
~~~
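For context, here is a minimal usage sketch of how the pieces above would fit together. The conf keys (`spark.executor.resource.gpu.count`, `.discoveryScript`), the script path, and the JSON payload are illustrative assumptions derived from the `confPrefix` convention and the `ResourceAllocation` shape in the proposal, not settled names:

~~~scala
// Hypothetical usage, assuming the ResourceUtils proposal above compiles as sketched.
val conf = new SparkConf()
  .set("spark.executor.resource.gpu.count", "2")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/bin/getGpus.sh")

// One ResourceRequest per declared resource, e.g.
// ResourceRequest(ResourceID("spark.executor", "gpu"), 2.0, "/opt/spark/bin/getGpus.sh").
val requests = ResourceUtils.parseAllRequests(conf, "spark.executor")

// Allocations handed to the executor as JSON, matching the ResourceAllocation shape.
val json =
  """[{"id": {"componentName": "spark.executor", "resourceName": "gpu"}, "addresses": ["0", "1"]}]"""
val allocations = ResourceUtils.parseResourceAllocationJson(json)

// Passes: ids match and 2 allocated addresses satisfy the requested count of 2.
ResourceUtils.assertAllResourceAllocationsMeetRequests(allocations, requests)
~~~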
