[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r343381352

## File path: core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala

```diff
@@ -86,8 +103,8 @@ trait ResourceAllocator {
         s"address $address doesn't exist.")
     }
     val isAvailable = addressAvailabilityMap(address)
-    if (!isAvailable) {
-      addressAvailabilityMap(address) = true
+    if (isAvailable < slotsPerAddress) {
+      addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1
```

Review comment: https://issues.apache.org/jira/browse/SPARK-29780

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
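The diff above replaces the per-address boolean with a slot counter: `release` now increments the count back up toward `slotsPerAddress` instead of flipping a flag. A minimal standalone sketch of that counting scheme follows; the class and method names here are illustrative, not the actual `ResourceAllocator` code.

```scala
import scala.collection.mutable

// Illustrative sketch: each address holds `slotsPerAddress` slots.
// acquire() takes one slot; release() gives one back, capped at slotsPerAddress.
class SlotCounter(addresses: Seq[String], slotsPerAddress: Int) {
  private val available = mutable.HashMap(addresses.map(_ -> slotsPerAddress): _*)

  def acquire(address: String): Unit = {
    val avail = available.getOrElse(address,
      throw new IllegalArgumentException(s"address $address doesn't exist."))
    require(avail > 0, s"address $address has no free slots")
    available(address) = avail - 1
  }

  def release(address: String): Unit = {
    val avail = available.getOrElse(address,
      throw new IllegalArgumentException(s"address $address doesn't exist."))
    // Mirrors the reviewed diff: only bump the count while below slotsPerAddress.
    require(avail < slotsPerAddress, s"address $address is already fully available")
    available(address) = avail + 1
  }

  def availableSlots(address: String): Int = available(address)
}

val counter = new SlotCounter(Seq("gpu0"), slotsPerAddress = 2)
counter.acquire("gpu0")
counter.acquire("gpu0")
println(counter.availableSlots("gpu0")) // 0
counter.release("gpu0")
println(counter.availableSlots("gpu0")) // 1
```

With `slotsPerAddress = 1` this degenerates to the old boolean behavior, which is why the same structure can serve both whole and fractional task resource requests.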
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r341745050

## File path: core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala

```diff
@@ -86,8 +103,8 @@ trait ResourceAllocator {
         s"address $address doesn't exist.")
     }
     val isAvailable = addressAvailabilityMap(address)
-    if (!isAvailable) {
-      addressAvailabilityMap(address) = true
+    if (isAvailable < slotsPerAddress) {
+      addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1
```

Review comment: `ResourceAllocator` _should_ only be used from a single thread (it is documented that way). For its public interface, here is the access pattern:

- The `CoarseGrainedSchedulerBackend` and `Master` event loops call into `acquire` and `release`. They also use `availableAddrs`, and call `toBuffer` on it before handing off that state. This is a single-thread pattern.
- A class extending `ResourceAllocator` (`WorkerResourceInfo`) has some potential issues: the `WorkerInfo` class calls into `availableAddrs` and `assignedAddrs`, but those calls appear to come from the UI (looking at the `resourcesInfo*` functions), e.g. `JsonProtocol` and `MasterPage` call this. There are other calls to the `resourceInfo*` functions, but those are from the event loop.

Is this what you were worried about @jiangxb1987? Note this issue is not directly related to this PR, as this PR didn't change that access pattern or the data structure. That said, I do believe that a request from the UI could result in at least an exception (if not worse). Let me know and I can handle it here, or open another PR that would address it.
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r341699372

## File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

```diff
@@ -68,6 +69,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
 
   private val createTimeNs = System.nanoTime()
 
+  private val taskResources: Map[String, ResourceRequirement] =
```

Review comment: Good suggestion, making this change.
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r341697161

## File path: core/src/main/scala/org/apache/spark/SparkContext.scala

```diff
@@ -2790,7 +2790,10 @@ object SparkContext extends Logging {
           s" = ${taskReq.amount}")
       }
       // Compare and update the max slots each executor can provide.
-      val resourceNumSlots = execAmount / taskReq.amount
+      // If the configured amount per task was < 1.0, a task is subdividing
+      // executor resources. If the amount per task was > 1.0, the task wants
+      // multiple executor resources.
+      val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
```

Review comment: I think that you could go either way. If you feel strongly that this is confusing, I can try to change the case class. I like the formula here because it shows you explicitly how slots are computed.
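To make the slot formula from that diff concrete, here is a small standalone sketch with a few worked cases. The function name and parameters are illustrative stand-ins for Spark's actual variables (`execAmount`, `taskReq.amount`, `taskReq.numParts`).

```scala
// Illustrative only: how many task slots an executor can offer for one resource.
// (taskAmount, numParts) encode the per-task requirement, e.g. 0.25 -> (1, 4).
def resourceNumSlots(execAmount: Long, taskAmount: Int, numParts: Int): Int =
  Math.floor(execAmount.toDouble * numParts / taskAmount).toInt

println(resourceNumSlots(execAmount = 4, taskAmount = 1, numParts = 1)) // 4  (1 GPU per task)
println(resourceNumSlots(execAmount = 4, taskAmount = 1, numParts = 4)) // 16 (0.25 GPU per task)
println(resourceNumSlots(execAmount = 4, taskAmount = 2, numParts = 1)) // 2  (2 GPUs per task)
```

The `floor` matters for the whole-resource case: with 3 resources and 2 required per task, only one task fits, not 1.5.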
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r341696045

## File path: core/src/main/scala/org/apache/spark/SparkContext.scala

```diff
@@ -2800,11 +2803,19 @@ object SparkContext extends Logging {
       // large enough if any task resources were specified.
       taskResourceRequirements.foreach { taskReq =>
         val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
-        if (taskReq.amount * numSlots < execAmount) {
+        if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
+          val taskReqStr = if (taskReq.numParts > 1) {
+            s"${taskReq.amount}/${taskReq.numParts}"
```

Review comment: Yes, that is correct. Though I am not sure this hurts anything: I like the idea of seeing it spelled out in the text if it is misconfigured, but let me know if you want that changed.
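The check in that diff warns when the executor advertises more of a resource than the configured number of task slots can ever consume. A hedged sketch of the condition, with an illustrative function name (not Spark's actual code):

```scala
// Illustrative: true when executor resources exceed what numSlots
// concurrent tasks can use, i.e. some resources would sit idle.
def resourcesWasted(numSlots: Int, taskAmount: Int, numParts: Int,
    execAmount: Long): Boolean =
  (numSlots.toDouble * taskAmount / numParts) < execAmount

// 2 CPU-limited slots, each task needs 0.5 GPU (amount = 1, numParts = 2),
// but the executor has 4 GPUs: 2 * 0.5 = 1 GPU used, 3 wasted.
println(resourcesWasted(numSlots = 2, taskAmount = 1, numParts = 2, execAmount = 4)) // true

// 4 slots, 1 GPU per task, 4 GPUs: everything is consumed, no warning.
println(resourcesWasted(numSlots = 4, taskAmount = 1, numParts = 1, execAmount = 4)) // false
```

This is also where the `"${amount}/${numParts}"` string from the diff pays off: a warning that says the task requirement is `1/2` is easier to act on than one that prints the internal pair of integers.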
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r341695371

## File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala

```diff
@@ -94,8 +112,28 @@ private[spark] object ResourceUtils extends Logging {
   def parseResourceRequirements(sparkConf: SparkConf, componentName: String)
     : Seq[ResourceRequirement] = {
-    parseAllResourceRequests(sparkConf, componentName).map { request =>
-      ResourceRequirement(request.id.resourceName, request.amount)
+    listResourceIds(sparkConf, componentName).map { resourceId =>
+      val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
+      val amountDouble = settings.getOrElse(AMOUNT,
+        throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
+      ).toDouble
```

Review comment: I am adding a header comment to the case class `ResourceRequirement` to help clear this up. `AMOUNT` only makes sense as floating point when working with task resource requests. `ResourceRequirement`s are at the executor level, and it doesn't make sense to have a fractional resource there, so those should remain integers. Perhaps `AMOUNT` is a bit too general, but I am not sure we want to change that at this point.
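The comment draws a line between task-level amounts (fractional allowed) and executor-level amounts (whole only). A hedged sketch of how a fractional configured amount could be split into a whole amount plus a part count; this is an illustration of the idea, not the PR's exact parsing code, and the function name is hypothetical.

```scala
// Illustrative: turn a configured amount into (wholeAmount, numParts).
// 0 < amount < 1 means tasks subdivide one resource; amount >= 1 must be whole.
def parseAmount(resourceName: String, amountDouble: Double): (Int, Int) = {
  if (amountDouble <= 0) {
    throw new IllegalArgumentException(
      s"Amount for $resourceName must be > 0, got $amountDouble")
  } else if (amountDouble < 1.0) {
    // e.g. 0.25 -> amount = 1, numParts = 4 (four tasks share one resource)
    (1, Math.floor(1.0 / amountDouble).toInt)
  } else {
    (Math.floor(amountDouble).toInt, 1)
  }
}

println(parseAmount("gpu", 0.25)) // (1,4)
println(parseAmount("gpu", 2.0))  // (2,1)
```

Keeping the stored `amount` an integer, with `numParts` carrying the fractional information, is what lets executor-level requirements reuse the same case class without ever holding a fraction.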
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r337521957

## File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala

```diff
@@ -47,7 +48,25 @@ private[spark] case class ResourceRequest(
     discoveryScript: Option[String],
     vendor: Option[String])
 
-private[spark] case class ResourceRequirement(resourceName: String, amount: Int)
+/**
+ * Case class that represents a user's resource requirement as given by configuration
+ * (e.g spark.task.resource.[resource type].amount = 4)
+ *
+ * A configuration of spark.task.resource.[resource type].amount = 4, equates to:
+ * amount = 4, and numParts = 1.
+ *
+ * A configuration of spark.task.resource.[resource type].amount = 0.25, equates to:
+ * amount = 1, and numParts = 4.
+ *
+ * @param resourceName gpu, fpga, etc.
+ * @param amount whole units of the resource we expect (e.g. 1 gpus, 2 fpgas)
+ * @param numParts if not 1, the number of ways a whole resource is subdivided.
+ */
+private[spark] case class ResourceRequirement(
+    resourceName: String,
+    amount: Int,
+    numParts: Int = 1) {
```

Review comment: not sure how that got there, but fixed.
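For readers following the config-to-fields mapping documented in that scaladoc, here is the case class instantiated for both examples. The class body is reproduced from the diff above so the snippet is standalone.

```scala
// Reproduced from the diff for a self-contained example.
case class ResourceRequirement(resourceName: String, amount: Int, numParts: Int = 1)

// spark.task.resource.gpu.amount = 4    -> amount = 4, numParts = 1
val wholeReq = ResourceRequirement("gpu", amount = 4)

// spark.task.resource.gpu.amount = 0.25 -> amount = 1, numParts = 4
val fractionalReq = ResourceRequirement("gpu", amount = 1, numParts = 4)

println(wholeReq)      // ResourceRequirement(gpu,4,1)
println(fractionalReq) // ResourceRequirement(gpu,1,4)
```

The default `numParts = 1` is what keeps existing whole-resource call sites unchanged: only fractional task configurations need to pass the third field.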
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r333704705

## File path: core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala

```diff
@@ -30,27 +30,35 @@ trait ResourceAllocator {
   protected def resourceName: String
   protected def resourceAddresses: Seq[String]
+  protected def resourcesPerAddress: Int
 
   /**
-   * Map from an address to its availability, the value `true` means the address is available,
-   * while value `false` means the address is assigned.
+   * Map from an address to its availability, a value > 0 means the address is available,
+   * while value of 0 means the address is fully assigned.
+   *
+   * For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value
+   * can be fractional.
```

Review comment: good catch.
[GitHub] [spark] abellina commented on a change in pull request #26078: [SPARK-29151][CORE] Support fractional resources for task resource scheduling
URL: https://github.com/apache/spark/pull/26078#discussion_r333704288

## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala

```diff
@@ -25,10 +25,13 @@ import org.apache.spark.resource.{ResourceAllocator, ResourceInformation}
  * information.
  * @param name Resource name
  * @param addresses Resource addresses provided by the executor
+ * @param numParts Number of ways each resource is subdivided when scheduling tasks
  */
-private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String])
+private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String],
+    numParts: Int)
   extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
+  override protected def resourcesPerAddress = numParts
```

Review comment: One issue here is that this would be part of the `ResourceAllocator` interface, which has two implementations: `WorkerResourceInfo`, which sets `resourcesPerAddress` to 1, and `ExecutorResourceInfo`, which is the variable one. Could we call it something like `slotsPerAddress`?