This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3cb18d9 [SPARK-29151][CORE] Support fractional resources for task resource scheduling 3cb18d9 is described below commit 3cb18d90c441bbaa64c693e276793b670213e599 Author: Alessandro Bellina <abell...@nvidia.com> AuthorDate: Tue Nov 5 08:57:43 2019 -0600 [SPARK-29151][CORE] Support fractional resources for task resource scheduling ### What changes were proposed in this pull request? This PR adds the ability for tasks to request fractional resources, so that more than one task can execute per resource. For example, if an executor has 1 GPU and the task configuration is 0.5 GPU/task, the executor can schedule two tasks to run on that single GPU. ### Why are the changes needed? Currently there is no good way to share a resource such that multiple tasks can run on a single unit. This change allows multiple tasks to share an executor resource. ### Does this PR introduce any user-facing change? Yes: the configuration `spark.task.resource.[resource type].amount` can now be fractional. ### How was this patch tested? With unit tests, and manually in standalone mode and on YARN. Closes #26078 from abellina/SPARK-29151. 
Authored-by: Alessandro Bellina <abell...@nvidia.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../main/scala/org/apache/spark/SparkContext.scala | 21 ++++++-- .../apache/spark/deploy/master/WorkerInfo.scala | 1 + .../apache/spark/resource/ResourceAllocator.scala | 39 +++++++++++---- .../org/apache/spark/resource/ResourceUtils.scala | 58 ++++++++++++++++++++-- .../spark/scheduler/ExecutorResourceInfo.scala | 7 ++- .../cluster/CoarseGrainedSchedulerBackend.scala | 15 +++++- .../org/apache/spark/HeartbeatReceiverSuite.scala | 1 + .../scala/org/apache/spark/SparkConfSuite.scala | 51 +++++++++++++++++++ .../scala/org/apache/spark/SparkContextSuite.scala | 3 +- .../deploy/StandaloneDynamicAllocationSuite.scala | 1 + .../CoarseGrainedSchedulerBackendSuite.scala | 1 + .../scheduler/ExecutorResourceInfoSuite.scala | 34 +++++++++++-- docs/configuration.md | 12 +++-- 13 files changed, 214 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cad88ad..3cea2ef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2799,7 +2799,10 @@ object SparkContext extends Logging { s" = ${taskReq.amount}") } // Compare and update the max slots each executor can provide. - val resourceNumSlots = execAmount / taskReq.amount + // If the configured amount per task was < 1.0, a task is subdividing + // executor resources. If the amount per task was > 1.0, the task wants + // multiple executor resources. + val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt if (resourceNumSlots < numSlots) { numSlots = resourceNumSlots limitingResourceName = taskReq.resourceName @@ -2809,11 +2812,19 @@ object SparkContext extends Logging { // large enough if any task resources were specified. 
taskResourceRequirements.foreach { taskReq => val execAmount = executorResourcesAndAmounts(taskReq.resourceName) - if (taskReq.amount * numSlots < execAmount) { + if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) { + val taskReqStr = if (taskReq.numParts > 1) { + s"${taskReq.amount}/${taskReq.numParts}" + } else { + s"${taskReq.amount}" + } + val resourceNumSlots = Math.floor(execAmount * taskReq.numParts/taskReq.amount).toInt val message = s"The configuration of resource: ${taskReq.resourceName} " + - s"(exec = ${execAmount}, task = ${taskReq.amount}) will result in wasted " + - s"resources due to resource ${limitingResourceName} limiting the number of " + - s"runnable tasks per executor to: ${numSlots}. Please adjust your configuration." + s"(exec = ${execAmount}, task = ${taskReqStr}, " + + s"runnable tasks = ${resourceNumSlots}) will " + + s"result in wasted resources due to resource ${limitingResourceName} limiting the " + + s"number of runnable tasks per executor to: ${numSlots}. Please adjust " + + s"your configuration." 
if (Utils.isTesting) { throw new SparkException(message) } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 4845881..0137e2b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -28,6 +28,7 @@ private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String override protected def resourceName = this.name override protected def resourceAddresses = this.addresses + override protected def slotsPerAddress: Int = 1 def acquire(amount: Int): ResourceInformation = { val allocated = availableAddrs.take(amount) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala index e64fadc..22272a0 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala @@ -30,27 +30,44 @@ trait ResourceAllocator { protected def resourceName: String protected def resourceAddresses: Seq[String] + protected def slotsPerAddress: Int /** - * Map from an address to its availability, the value `true` means the address is available, - * while value `false` means the address is assigned. + * Map from an address to its availability, a value > 0 means the address is available, + * while value of 0 means the address is fully assigned. + * + * For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value + * can be a multiple, such that each address can be allocated up to [[slotsPerAddress]] + * times. + * * TODO Use [[OpenHashMap]] instead to gain better performance. 
*/ - private lazy val addressAvailabilityMap = mutable.HashMap(resourceAddresses.map(_ -> true): _*) + private lazy val addressAvailabilityMap = { + mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*) + } /** * Sequence of currently available resource addresses. + * + * With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses + * e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like + * Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one. */ - def availableAddrs: Seq[String] = addressAvailabilityMap.flatMap { case (addr, available) => - if (available) Some(addr) else None - }.toSeq + def availableAddrs: Seq[String] = addressAvailabilityMap + .flatMap { case (addr, available) => + (0 until available).map(_ => addr) + }.toSeq /** * Sequence of currently assigned resource addresses. + * + * With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses + * e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like + * Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice. */ private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap .flatMap { case (addr, available) => - if (!available) Some(addr) else None + (0 until slotsPerAddress - available).map(_ => addr) }.toSeq /** @@ -65,8 +82,8 @@ trait ResourceAllocator { s"address $address doesn't exist.") } val isAvailable = addressAvailabilityMap(address) - if (isAvailable) { - addressAvailabilityMap(address) = false + if (isAvailable > 0) { + addressAvailabilityMap(address) = addressAvailabilityMap(address) - 1 } else { throw new SparkException("Try to acquire an address that is not available. 
" + s"$resourceName address $address is not available.") @@ -86,8 +103,8 @@ trait ResourceAllocator { s"address $address doesn't exist.") } val isAvailable = addressAvailabilityMap(address) - if (!isAvailable) { - addressAvailabilityMap(address) = true + if (isAvailable < slotsPerAddress) { + addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1 } else { throw new SparkException(s"Try to release an address that is not assigned. $resourceName " + s"address $address is not assigned.") diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 150ba09..e5ae7a9 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -27,6 +27,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.SPARK_TASK_PREFIX import org.apache.spark.util.Utils.executeAndGetOutput /** @@ -41,13 +42,44 @@ private[spark] case class ResourceID(componentName: String, resourceName: String def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}" } +/** + * Case class that represents a resource request at the executor level. + * + * The class used when discovering resources (using the discovery script), + * or via the context as it is parsing configuration, for SPARK_EXECUTOR_PREFIX. + * + * @param id object identifying the resource + * @param amount integer amount for the resource. Note that for a request (executor level), + * fractional resources does not make sense, so amount is an integer. 
+ * @param discoveryScript optional discovery script file name + * @param vendor optional vendor name + */ private[spark] case class ResourceRequest( id: ResourceID, amount: Int, discoveryScript: Option[String], vendor: Option[String]) -private[spark] case class ResourceRequirement(resourceName: String, amount: Int) +/** + * Case class that represents resource requirements for a component in a + * an application (components are driver, executor or task). + * + * A configuration of spark.task.resource.[resourceName].amount = 4, equates to: + * amount = 4, and numParts = 1. + * + * A configuration of spark.task.resource.[resourceName].amount = 0.25, equates to: + * amount = 1, and numParts = 4. + * + * @param resourceName gpu, fpga, etc. + * @param amount whole units of the resource we expect (e.g. 1 gpus, 2 fpgas) + * @param numParts if not 1, the number of ways a whole resource is subdivided. + * This is always an integer greater than or equal to 1, + * where 1 is whole resource, 2 is divide a resource in two, and so on. + */ +private[spark] case class ResourceRequirement( + resourceName: String, + amount: Int, + numParts: Int = 1) /** * Case class representing allocated resource addresses for a specific resource. 
@@ -94,8 +126,28 @@ private[spark] object ResourceUtils extends Logging { def parseResourceRequirements(sparkConf: SparkConf, componentName: String) : Seq[ResourceRequirement] = { - parseAllResourceRequests(sparkConf, componentName).map { request => - ResourceRequirement(request.id.resourceName, request.amount) + listResourceIds(sparkConf, componentName).map { resourceId => + val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap + val amountDouble = settings.getOrElse(AMOUNT, + throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}") + ).toDouble + val (amount, parts) = if (componentName.equalsIgnoreCase(SPARK_TASK_PREFIX)) { + val parts = if (amountDouble <= 0.5) { + Math.floor(1.0 / amountDouble).toInt + } else if (amountDouble % 1 != 0) { + throw new SparkException( + s"The resource amount ${amountDouble} must be either <= 0.5, or a whole number.") + } else { + 1 + } + (Math.ceil(amountDouble).toInt, parts) + } else if (amountDouble % 1 != 0) { + throw new SparkException( + s"Only tasks support fractional resources, please check your $componentName settings") + } else { + (amountDouble.toInt, 1) + } + ResourceRequirement(resourceId.resourceName, amount, parts) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala index 0204760..fd04db8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala @@ -25,10 +25,15 @@ import org.apache.spark.resource.{ResourceAllocator, ResourceInformation} * information. 
* @param name Resource name * @param addresses Resource addresses provided by the executor + * @param numParts Number of ways each resource is subdivided when scheduling tasks */ -private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String]) +private[spark] class ExecutorResourceInfo( + name: String, + addresses: Seq[String], + numParts: Int) extends ResourceInformation(name, addresses.toArray) with ResourceAllocator { override protected def resourceName = this.name override protected def resourceAddresses = this.addresses + override protected def slotsPerAddress: Int = numParts } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 6e990d1..ea045e6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -33,6 +33,7 @@ import org.apache.spark.executor.ExecutorLogUrlHandler import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network._ +import org.apache.spark.resource.ResourceRequirement import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -68,6 +69,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME)) private val createTimeNs = System.nanoTime() + private val taskResourceNumParts: Map[String, Int] = + if (scheduler.resourcesReqsPerTask != null) { + scheduler.resourcesReqsPerTask.map(req => req.resourceName -> req.numParts).toMap + } else { + Map.empty + } + // Accessing `executorDataMap` in the inherited methods from ThreadSafeRpcEndpoint doesn't need // any protection. 
But accessing `executorDataMap` out of the inherited methods must be // protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only @@ -215,7 +223,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val resourcesInfo = resources.map{ case (k, v) => - (v.name, new ExecutorResourceInfo(v.name, v.addresses))} + (v.name, + new ExecutorResourceInfo(v.name, v.addresses, + // tell the executor it can schedule resources up to numParts times, + // as configured by the user, or set to 1 as that is the default (1 task/resource) + taskResourceNumParts.getOrElse(v.name, 1))) + } val data = new ExecutorData(executorRef, executorAddress, hostname, cores, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes, resourcesInfo) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 2a25171..595fc73 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -74,6 +74,7 @@ class HeartbeatReceiverSuite scheduler = mock(classOf[TaskSchedulerImpl]) when(sc.taskScheduler).thenReturn(scheduler) when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]()) + when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty) when(scheduler.sc).thenReturn(sc) heartbeatReceiverClock = new ManualClock heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock) diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 0ac6ba2..b91759c 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -456,6 +456,57 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst 
assert(taskResourceRequirement.size == 1) assert(taskResourceRequirement.get(FPGA).isEmpty) } + + test("Ensure that we can configure fractional resources for a task") { + val ratioSlots = Seq( + (0.10, 10), (0.11, 9), (0.125, 8), (0.14, 7), (0.16, 6), + (0.20, 5), (0.25, 4), (0.33, 3), (0.5, 2), (1.0, 1), + // if the amount is fractional greater than 0.5 and less than 1.0 we throw + (0.51, 1), (0.9, 1), + // if the amount is greater than one is not whole, we throw + (1.5, 0), (2.5, 0), + // it's ok if the amount is whole, and greater than 1 + // parts are 1 because we get a whole part of a resource + (2.0, 1), (3.0, 1), (4.0, 1)) + ratioSlots.foreach { + case (ratio, slots) => + val conf = new SparkConf() + conf.set(TASK_GPU_ID.amountConf, ratio.toString) + if (ratio > 0.5 && ratio % 1 != 0) { + assertThrows[SparkException] { + parseResourceRequirements(conf, SPARK_TASK_PREFIX) + } + } else { + val reqs = parseResourceRequirements(conf, SPARK_TASK_PREFIX) + assert(reqs.size == 1) + assert(reqs.head.amount == Math.ceil(ratio).toInt) + assert(reqs.head.numParts == slots) + } + } + } + + test("Non-task resources are never fractional") { + val ratioSlots = Seq( + // if the amount provided is not a whole number, we throw + (0.25, 0), (0.5, 0), (1.5, 0), + // otherwise we are successful at parsing resources + (1.0, 1), (2.0, 2), (3.0, 3)) + ratioSlots.foreach { + case (ratio, slots) => + val conf = new SparkConf() + conf.set(EXECUTOR_GPU_ID.amountConf, ratio.toString) + if (ratio % 1 != 0) { + assertThrows[SparkException] { + parseResourceRequirements(conf, SPARK_EXECUTOR_PREFIX) + } + } else { + val reqs = parseResourceRequirements(conf, SPARK_EXECUTOR_PREFIX) + assert(reqs.size == 1) + assert(reqs.head.amount == slots) + assert(reqs.head.numParts == 1) + } + } + } } class Class1 {} diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 4fd8628..712ed9b 100644 --- 
a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -842,7 +842,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu sc = new SparkContext(conf) }.getMessage() - assert(error.contains("The configuration of resource: gpu (exec = 4, task = 2) will result " + + assert(error.contains( + "The configuration of resource: gpu (exec = 4, task = 2, runnable tasks = 2) will result " + "in wasted resources due to resource CPU limiting the number of runnable tasks per " + "executor to: 1. Please adjust your configuration.")) } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 29c210f..1775878 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -506,6 +506,7 @@ class StandaloneDynamicAllocationSuite val taskScheduler = mock(classOf[TaskSchedulerImpl]) when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host")) when(taskScheduler.resourceOffers(any())).thenReturn(Nil) + when(taskScheduler.resourcesReqsPerTask).thenReturn(Seq.empty) when(taskScheduler.sc).thenReturn(sc) val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 6152214..8a16ae6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -282,6 +282,7 @@ private class CSMockExternalClusterManager extends ExternalClusterManager { 
when(ts.applicationAttemptId()).thenReturn(Some("attempt1")) when(ts.schedulingMode).thenReturn(SchedulingMode.FIFO) when(ts.nodeBlacklist()).thenReturn(Set.empty[String]) + when(ts.resourcesReqsPerTask).thenReturn(Seq.empty) ts } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index 0109d1f..388d4e2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -26,7 +26,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Track Executor Resource information") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3")) + val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3")) assert(info.assignedAddrs.isEmpty) @@ -43,7 +43,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow acquire address that is not available") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3")) + val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) // Acquire some addresses. info.acquire(Seq("0", "1")) assert(!info.availableAddrs.contains("1")) @@ -56,7 +56,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow acquire address that doesn't exist") { // Init Executor Resource. 
- val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3")) + val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) assert(!info.availableAddrs.contains("4")) // Acquire an address that doesn't exist val e = intercept[SparkException] { @@ -67,7 +67,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow release address that is not assigned") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3")) + val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) // Acquire addresses info.acquire(Array("0", "1")) assert(!info.assignedAddrs.contains("2")) @@ -80,7 +80,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow release address that doesn't exist") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3")) + val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) assert(!info.assignedAddrs.contains("4")) // Release an address that doesn't exist val e = intercept[SparkException] { @@ -88,4 +88,28 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { } assert(e.getMessage.contains("Try to release an address that doesn't exist.")) } + + test("Ensure that we can acquire the same fractions of a resource from an executor") { + val slotSeq = Seq(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + val addresses = ArrayBuffer("0", "1", "2", "3") + slotSeq.foreach { slots => + val info = new ExecutorResourceInfo(GPU, addresses, slots) + for (_ <- 0 until slots) { + addresses.foreach(addr => info.acquire(Seq(addr))) + } + + // assert that each address was assigned `slots` times + info.assignedAddrs + .groupBy(identity) + .mapValues(_.size) + .foreach(x => assert(x._2 == slots)) + + addresses.foreach { addr => + assertThrows[SparkException] { + info.acquire(Seq(addr)) + } + assert(!info.availableAddrs.contains(addr)) + } + } + } } diff --git a/docs/configuration.md 
b/docs/configuration.md index 7fdcf4a..97ea1fb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1997,9 +1997,15 @@ Apart from these, the following properties are also available, and may be useful <td><code>spark.task.resource.{resourceName}.amount</code></td> <td>1</td> <td> - Amount of a particular resource type to allocate for each task. If this is specified - you must also provide the executor config <code>spark.executor.resource.{resourceName}.amount</code> - and any corresponding discovery configs so that your executors are created with that resource type. + Amount of a particular resource type to allocate for each task, note that this can be a double. + If this is specified you must also provide the executor config + <code>spark.executor.resource.{resourceName}.amount</code> and any corresponding discovery configs + so that your executors are created with that resource type. In addition to whole amounts, + a fractional amount (for example, 0.25, which means 1/4th of a resource) may be specified. + Fractional amounts must be less than or equal to 0.5, or in other words, the minimum amount of + resource sharing is 2 tasks per resource. Additionally, fractional amounts are floored + in order to assign resource slots (e.g. a 0.2222 configuration, or 1/0.2222 slots will become + 4 tasks/resource, not 5). </td> </tr> <tr> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org