This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 474b1bb [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling 474b1bb is described below commit 474b1bb5c2bce2f83c4dd8e19b9b7c5b3aebd6c4 Author: Thomas Graves <tgra...@nvidia.com> AuthorDate: Thu Mar 26 09:46:36 2020 -0500 [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling ### What changes were proposed in this pull request? These are the core scheduler changes to support Stage level scheduling. The main changes here include modification to the DAGScheduler to look at the ResourceProfiles associated with an RDD and have those applied inside the scheduler. Currently if multiple RDDs in a stage have conflicting ResourceProfiles we throw an error. Logic to allow this will happen in SPARK-29153. I added the interfaces to RDD to add and get the ResourceProfile so that I could add unit tests for the scheduler. These are marked as private for now until we finish the feature and will be exposed in SPARK-29150. If you think this is confusing I can remove those and remove the tests and add them back later. I modified the task scheduler to make sure to only schedule on executors that exactly match the resource profile. It will then check those executors to make sure the current resources meet the task needs before assigning it. In here I changed the way we do the custom resource assignment. Other changes here include having the cpus per task passed around so that we can properly account for them. Previously we just used the one global config, but now it can change based on the ResourceProfile. I removed the exceptions that require the cores to be the limiting resource. With this change all the places I found that used executor cores / task cpus as slots have been updated to use the ResourceProfile logic and look to see what resource is limiting. ### Why are the changes needed? Stage level scheduling feature ### Does this PR introduce any user-facing change? 
No ### How was this patch tested? unit tests and lots of manual testing Closes #27773 from tgravescs/SPARK-29154. Lead-authored-by: Thomas Graves <tgra...@nvidia.com> Co-authored-by: Thomas Graves <tgra...@apache.org> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../main/scala/org/apache/spark/SparkContext.scala | 27 +-- .../org/apache/spark/internal/config/Tests.scala | 9 + core/src/main/scala/org/apache/spark/rdd/RDD.scala | 27 +++ .../apache/spark/resource/ResourceProfile.scala | 42 +++-- .../spark/resource/ResourceProfileManager.scala | 11 +- .../org/apache/spark/resource/ResourceUtils.scala | 13 +- .../spark/resource/TaskResourceRequests.scala | 2 +- .../org/apache/spark/scheduler/DAGScheduler.scala | 70 +++++--- .../apache/spark/scheduler/SchedulerBackend.scala | 8 +- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 178 ++++++++++++++----- .../scala/org/apache/spark/scheduler/TaskSet.scala | 3 +- .../apache/spark/scheduler/TaskSetManager.scala | 32 ++-- .../org/apache/spark/scheduler/WorkerOffer.scala | 5 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 24 ++- .../scheduler/local/LocalSchedulerBackend.scala | 9 +- .../deploy/StandaloneDynamicAllocationSuite.scala | 1 - .../CoarseGrainedExecutorBackendSuite.scala | 4 +- .../CoarseGrainedSchedulerBackendSuite.scala | 13 +- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 +++++++++++- .../scheduler/ExternalClusterManagerSuite.scala | 3 +- .../org/apache/spark/scheduler/FakeTask.scala | 31 +++- .../org/apache/spark/scheduler/PoolSuite.scala | 4 +- .../scheduler/SchedulerIntegrationSuite.scala | 5 +- .../spark/scheduler/TaskSchedulerImplSuite.scala | 192 ++++++++++++++++++++- .../spark/scheduler/TaskSetManagerSuite.scala | 91 ++++++---- .../mesos/MesosFineGrainedSchedulerBackend.scala | 3 +- 26 files changed, 704 insertions(+), 218 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 
cdb98db..588e7dc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1597,13 +1597,17 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Get the max number of tasks that can be concurrent launched currently. + * Get the max number of tasks that can be concurrent launched based on the ResourceProfile + * being used. * Note that please don't cache the value returned by this method, because the number can change * due to add/remove executors. * + * @param rp ResourceProfile which to use to calculate max concurrent tasks. * @return The max number of tasks that can be concurrent launched currently. */ - private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks() + private[spark] def maxNumConcurrentTasks(rp: ResourceProfile): Int = { + schedulerBackend.maxNumConcurrentTasks(rp) + } /** * Update the cluster manager on our scheduling needs. Three bits of information are included @@ -2764,23 +2768,10 @@ object SparkContext extends Logging { // others its checked in ResourceProfile. def checkResourcesPerTask(executorCores: Int): Unit = { val taskCores = sc.conf.get(CPUS_PER_TASK) - validateTaskCpusLargeEnough(executorCores, taskCores) - val defaultProf = sc.resourceProfileManager.defaultResourceProfile - // TODO - this is temporary until all of stage level scheduling feature is integrated, - // fail if any other resource limiting due to dynamic allocation and scheduler using - // slots based on cores - val cpuSlots = executorCores/taskCores - val limitingResource = defaultProf.limitingResource(sc.conf) - if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS) && - defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) { - throw new IllegalArgumentException("The number of slots on an executor has to be " + - "limited by the number of cores, otherwise you waste resources and " + - "some scheduling doesn't work properly. 
Your configuration has " + - s"core/task cpu slots = ${cpuSlots} and " + - s"${limitingResource} = " + - s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your configuration " + - "so that all resources require same number of executor slots.") + if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) { + validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores) } + val defaultProf = sc.resourceProfileManager.defaultResourceProfile ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores)) } diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala index 15610e8..33dac04 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala @@ -73,4 +73,13 @@ private[spark] object Tests { .booleanConf .createWithDefault(false) + // This configuration is used for unit tests to allow skipping the task cpus to cores validation + // to allow emulating standalone mode behavior while running in local mode. Standalone mode + // by default doesn't specify a number of executor cores, it just uses all the ones available + // on the host. 
+ val SKIP_VALIDATE_CORES_TESTING = + ConfigBuilder("spark.testing.skipValidateCores") + .booleanConf + .createWithDefault(false) + } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a26b579..f59c587 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -42,6 +42,7 @@ import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult +import org.apache.spark.resource.ResourceProfile import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, @@ -1716,11 +1717,37 @@ abstract class RDD[T: ClassTag]( @Since("2.4.0") def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) + /** + * Specify a ResourceProfile to use when calculating this RDD. This is only supported on + * certain cluster managers and currently requires dynamic allocation to be enabled. + * It will result in new executors with the resources specified being acquired to + * calculate the RDD. + */ + // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150 + @Experimental + @Since("3.0.0") + private[spark] def withResources(rp: ResourceProfile): this.type = { + resourceProfile = Option(rp) + sc.resourceProfileManager.addResourceProfile(resourceProfile.get) + this + } + + /** + * Get the ResourceProfile specified with this RDD or null if it wasn't specified. 
+ * @return the user specified ResourceProfile or null (for Java compatibility) if + * none was specified + */ + // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150 + @Experimental + @Since("3.0.0") + private[spark] def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null) + // ======================================================================= // Other internal methods and fields // ======================================================================= private var storageLevel: StorageLevel = StorageLevel.NONE + private var resourceProfile: Option[ResourceProfile] = None /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ @transient private[spark] val creationSite = sc.getCallSite() diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 844026d..96c456e 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -76,6 +76,21 @@ class ResourceProfile( taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt) } + /* + * This function takes into account fractional amounts for the task resource requirement. + * Spark only supports fractional amounts < 1 to basically allow for multiple tasks + * to use the same resource address. + * The way the scheduler handles this is it adds the same address the number of slots per + * address times and then the amount becomes 1. This way it re-uses the same address + * the correct number of times. ie task requirement amount=0.25 -> addrs["0", "0", "0", "0"] + * and scheduler task amount=1. See ResourceAllocator.slotsPerAddress. 
+ */ + private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = { + val taskAmount = taskResources.getOrElse(resource, + throw new SparkException(s"Resource $resource doesn't exist in profile id: $id")) + if (taskAmount.amount < 1) 1 else taskAmount.amount.toInt + } + private[spark] def getNumSlotsPerAddress(resource: String, sparkConf: SparkConf): Int = { _executorResourceSlotsPerAddr.getOrElse { calculateTasksAndLimitingResource(sparkConf) @@ -137,7 +152,7 @@ class ResourceProfile( assert(cpusPerTask > 0, "CPUs per task configuration has to be > 0") val coresPerExecutor = getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES)) _coresLimitKnown = true - ResourceUtils.validateTaskCpusLargeEnough(coresPerExecutor, cpusPerTask) + ResourceUtils.validateTaskCpusLargeEnough(sparkConf, coresPerExecutor, cpusPerTask) val tasksBasedOnCores = coresPerExecutor / cpusPerTask // Note that if the cores per executor aren't set properly this calculation could be off, // we default it to just be 1 in order to allow checking of the rest of the custom @@ -163,17 +178,6 @@ class ResourceProfile( numPartsPerResourceMap(rName) = parts val numTasks = ((execReq.amount * parts) / numPerTask).toInt if (taskLimit == -1 || numTasks < taskLimit) { - if (shouldCheckExecCores) { - // TODO - until resource profiles full implemented we need to error if cores not - // limiting resource because the scheduler code uses that for slots - throw new IllegalArgumentException("The number of slots on an executor has to be " + - "limited by the number of cores, otherwise you waste resources and " + - "some scheduling doesn't work properly. Your configuration has " + - s"core/task cpu slots = ${taskLimit} and " + - s"${execReq.resourceName} = ${numTasks}. 
" + - "Please adjust your configuration so that all resources require same number " + - "of executor slots.") - } limitingResource = rName taskLimit = numTasks } @@ -183,12 +187,6 @@ class ResourceProfile( "no corresponding task resource request was specified.") } } - if(!shouldCheckExecCores && execResourceToCheck.nonEmpty) { - // if we can't rely on the executor cores config throw a warning for user - logWarning("Please ensure that the number of slots available on your " + - "executors is limited by the number of cores to task cpus and not another " + - "custom resource.") - } if (taskResourcesToCheck.nonEmpty) { throw new SparkException("No executor resource configs were not specified for the " + s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}") @@ -319,4 +317,12 @@ object ResourceProfile extends Logging { rp: ResourceProfile): Map[String, ExecutorResourceRequest] = { rp.executorResources.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)) } + + /* + * Get the number of cpus per task if its set in the profile, otherwise return the + * cpus per task for the default profile. 
+ */ + private[spark] def getTaskCpusOrDefaultForProfile(rp: ResourceProfile, conf: SparkConf): Int = { + rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK)) + } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index 06db946..fabc0bd 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -41,7 +41,6 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin def defaultResourceProfile: ResourceProfile = defaultProfile - private val taskCpusDefaultProfile = defaultProfile.getTaskCpus.get private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf) private val master = sparkConf.getOption("spark.master") private val isNotYarn = master.isDefined && !master.get.equals("yarn") @@ -64,8 +63,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin isSupported(rp) // force the computation of maxTasks and limitingResource now so we don't have cost later rp.limitingResource(sparkConf) - logInfo(s"Adding ResourceProfile id: ${rp.id}") - resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp) + val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp) + if (res == null) { + logInfo(s"Added ResourceProfile id: ${rp.id}") + } } /* @@ -79,8 +80,4 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin } rp } - - def taskCpusForProfileId(rpId: Int): Int = { - resourceProfileFromId(rpId).getTaskCpus.getOrElse(taskCpusDefaultProfile) - } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 2227255..36ef906 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ 
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.resource.ResourceDiscoveryPlugin import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{CPUS_PER_TASK, EXECUTOR_CORES, RESOURCES_DISCOVERY_PLUGIN, SPARK_TASK_PREFIX} -import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING} +import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING, SKIP_VALIDATE_CORES_TESTING} import org.apache.spark.util.Utils /** @@ -392,7 +392,7 @@ private[spark] object ResourceUtils extends Logging { s"${resourceRequest.id.resourceName}") } - def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = { + def validateTaskCpusLargeEnough(sparkConf: SparkConf, execCores: Int, taskCpus: Int): Boolean = { // Number of cores per executor must meet at least one task requirement. if (execCores < taskCpus) { throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " + @@ -414,7 +414,7 @@ private[spark] object ResourceUtils extends Logging { val coresKnown = rp.isCoresLimitKnown var limitingResource = rp.limitingResource(sparkConf) var maxTaskPerExec = rp.maxTasksPerExecutor(sparkConf) - val taskCpus = rp.getTaskCpus.getOrElse(sparkConf.get(CPUS_PER_TASK)) + val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, sparkConf) val cores = if (execCores.isDefined) { execCores.get } else if (coresKnown) { @@ -455,11 +455,12 @@ private[spark] object ResourceUtils extends Logging { taskReq.foreach { case (rName, treq) => val execAmount = execReq(rName).amount + // handles fractional + val taskAmount = rp.getSchedulerTaskResourceAmount(rName) val numParts = rp.getNumSlotsPerAddress(rName, sparkConf) - // handle fractional - val taskAmount = if (numParts > 1) 1 else treq.amount if (maxTaskPerExec < (execAmount * numParts / taskAmount)) { - val taskReqStr = s"${taskAmount}/${numParts}" + val 
origTaskAmount = treq.amount + val taskReqStr = s"${origTaskAmount}/${numParts}" val resourceNumSlots = Math.floor(execAmount * numParts / taskAmount).toInt val message = s"The configuration of resource: ${treq.resourceName} " + s"(exec = ${execAmount}, task = ${taskReqStr}, " + diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala index 9624b51..9a5114f 100644 --- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala +++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.resource.ResourceProfile._ /** - * A set of task resource requests. This is used in conjuntion with the ResourceProfile to + * A set of task resource requests. This is used in conjunction with the ResourceProfile to * programmatically specify the resources needed for an RDD that will be applied at the * stage level. 
* diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a226b65..079cf11 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -385,15 +385,17 @@ private[spark] class DAGScheduler( def createShuffleMapStage[K, V, C]( shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd + val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd) + val resourceProfile = mergeResourceProfilesForStage(resourceProfiles) checkBarrierStageWithDynamicAllocation(rdd) - checkBarrierStageWithNumSlots(rdd) + checkBarrierStageWithNumSlots(rdd, resourceProfile) checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length - val parents = getOrCreateParentStages(rdd, jobId) + val parents = getOrCreateParentStages(shuffleDeps, jobId) val id = nextStageId.getAndIncrement() val stage = new ShuffleMapStage( id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker, - ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + resourceProfile.id) stageIdToStage(id) = stage shuffleIdToMapStage(shuffleDep.shuffleId) = stage @@ -433,14 +435,32 @@ private[spark] class DAGScheduler( * the check fails consecutively beyond a configured number for a job, then fail current job * submission. 
*/ - private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = { + private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = { val numPartitions = rdd.getNumPartitions - val maxNumConcurrentTasks = sc.maxNumConcurrentTasks + val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp) if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) { throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks) } } + private[scheduler] def mergeResourceProfilesForStage( + stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = { + logDebug(s"Merging stage rdd profiles: $stageResourceProfiles") + val resourceProfile = if (stageResourceProfiles.size > 1) { + // add option later to actually merge profiles - SPARK-29153 + throw new IllegalArgumentException("Multiple ResourceProfile's specified in the RDDs for " + + "this stage, please resolve the conflicting ResourceProfile's as Spark doesn't" + + "currently support merging them.") + } else { + if (stageResourceProfiles.size == 1) { + stageResourceProfiles.head + } else { + sc.resourceProfileManager.defaultResourceProfile + } + } + resourceProfile + } + /** * Create a ResultStage associated with the provided jobId. 
*/ @@ -450,24 +470,27 @@ private[spark] class DAGScheduler( partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { + val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd) + val resourceProfile = mergeResourceProfilesForStage(resourceProfiles) checkBarrierStageWithDynamicAllocation(rdd) - checkBarrierStageWithNumSlots(rdd) + checkBarrierStageWithNumSlots(rdd, resourceProfile) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) - val parents = getOrCreateParentStages(rdd, jobId) + val parents = getOrCreateParentStages(shuffleDeps, jobId) val id = nextStageId.getAndIncrement() - val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite, - ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, + callSite, resourceProfile.id) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage } /** - * Get or create the list of parent stages for a given RDD. The new Stages will be created with - * the provided firstJobId. + * Get or create the list of parent stages for the given shuffle dependencies. The new + * Stages will be created with the provided firstJobId. 
*/ - private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { - getShuffleDependencies(rdd).map { shuffleDep => + private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]], + firstJobId: Int): List[Stage] = { + shuffleDeps.map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList } @@ -485,7 +508,8 @@ private[spark] class DAGScheduler( val toVisit = waitingForVisit.remove(0) if (!visited(toVisit)) { visited += toVisit - getShuffleDependencies(toVisit).foreach { shuffleDep => + val (shuffleDeps, _) = getShuffleDependenciesAndResourceProfiles(toVisit) + shuffleDeps.foreach { shuffleDep => if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) { ancestors.prepend(shuffleDep) waitingForVisit.prepend(shuffleDep.rdd) @@ -497,10 +521,11 @@ private[spark] class DAGScheduler( } /** - * Returns shuffle dependencies that are immediate parents of the given RDD. + * Returns shuffle dependencies that are immediate parents of the given RDD and the + * ResourceProfiles associated with the RDDs for this stage. * - * This function will not return more distant ancestors. For example, if C has a shuffle - * dependency on B which has a shuffle dependency on A: + * This function will not return more distant ancestors for shuffle dependencies. For example, + * if C has a shuffle dependency on B which has a shuffle dependency on A: * * A <-- B <-- C * @@ -508,9 +533,10 @@ private[spark] class DAGScheduler( * * This function is scheduler-visible for the purpose of unit testing. 
*/ - private[scheduler] def getShuffleDependencies( - rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { + private[scheduler] def getShuffleDependenciesAndResourceProfiles( + rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = { val parents = new HashSet[ShuffleDependency[_, _, _]] + val resourceProfiles = new HashSet[ResourceProfile] val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] waitingForVisit += rdd @@ -518,6 +544,7 @@ private[spark] class DAGScheduler( val toVisit = waitingForVisit.remove(0) if (!visited(toVisit)) { visited += toVisit + Option(toVisit.getResourceProfile).foreach(resourceProfiles += _) toVisit.dependencies.foreach { case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep @@ -526,7 +553,7 @@ private[spark] class DAGScheduler( } } } - parents + (parents, resourceProfiles) } /** @@ -1253,7 +1280,8 @@ private[spark] class DAGScheduler( logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( - tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) + tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties, + stage.resourceProfileId)) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 4752353..a5bba64 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import org.apache.spark.resource.ResourceProfile + /** * A backend interface for scheduling systems that allows plugging in different ones 
under * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as @@ -80,12 +82,14 @@ private[spark] trait SchedulerBackend { def getDriverAttributes: Option[Map[String, String]] = None /** - * Get the max number of tasks that can be concurrent launched currently. + * Get the max number of tasks that can be concurrent launched based on the ResourceProfile + * being used. * Note that please don't cache the value returned by this method, because the number can change * due to add/remove executors. * + * @param rp ResourceProfile which to use to calculate max concurrent tasks. * @return The max number of tasks that can be concurrent launched currently. */ - def maxNumConcurrentTasks(): Int + def maxNumConcurrentTasks(rp: ResourceProfile): Int } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 1b197c4..7e2fbb4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -31,7 +31,7 @@ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ -import org.apache.spark.resource.ResourceUtils +import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.rpc.RpcEndpoint import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality @@ -93,9 +93,6 @@ private[spark] class TaskSchedulerImpl( // CPUs to request per task val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK) - // Resources to request per task - val resourcesReqsPerTask = ResourceUtils.parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX) - // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. 
Protected by `this` private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] @@ -209,7 +206,8 @@ private[spark] class TaskSchedulerImpl( override def submitTasks(taskSet: TaskSet): Unit = { val tasks = taskSet.tasks - logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") + logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks " + + "resource profile " + taskSet.resourceProfileId) this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId @@ -340,39 +338,49 @@ private[spark] class TaskSchedulerImpl( for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK && - resourcesMeetTaskRequirements(availableResources(i))) { - try { - for (task <- taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))) { - tasks(i) += task - val tid = task.taskId - taskIdToTaskSetManager.put(tid, taskSet) - taskIdToExecutorId(tid) = execId - executorIdToRunningTaskIds(execId).add(tid) - availableCpus(i) -= CPUS_PER_TASK - assert(availableCpus(i) >= 0) - task.resources.foreach { case (rName, rInfo) => - // Remove the first n elements from availableResources addresses, these removed - // addresses are the same as that we allocated in taskSet.resourceOffer() since it's - // synchronized. We don't remove the exact addresses allocated because the current - // approach produces the identical result with less time complexity. - availableResources(i).getOrElse(rName, - throw new SparkException(s"Try to acquire resource $rName that doesn't exist.")) - .remove(0, rInfo.addresses.size) - } - // Only update hosts for a barrier task. - if (taskSet.isBarrier) { - // The executor address is expected to be non empty. 
- addressesWithDescs += (shuffledOffers(i).address.get -> task) + val taskSetRpID = taskSet.taskSet.resourceProfileId + // make the resource profile id a hard requirement for now - ie only put tasksets + // on executors where resource profile exactly matches. + if (taskSetRpID == shuffledOffers(i).resourceProfileId) { + val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, availableCpus(i), + availableResources(i)) + taskResAssignmentsOpt.foreach { taskResAssignments => + try { + val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID) + val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) + val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality, + taskResAssignments) + for (task <- taskDescOption) { + tasks(i) += task + val tid = task.taskId + taskIdToTaskSetManager.put(tid, taskSet) + taskIdToExecutorId(tid) = execId + executorIdToRunningTaskIds(execId).add(tid) + availableCpus(i) -= taskCpus + assert(availableCpus(i) >= 0) + task.resources.foreach { case (rName, rInfo) => + // Remove the first n elements from availableResources addresses, these removed + // addresses are the same as that we allocated in taskResourceAssignments since it's + // synchronized. We don't remove the exact addresses allocated because the current + // approach produces the identical result with less time complexity. + availableResources(i).getOrElse(rName, + throw new SparkException(s"Try to acquire resource $rName that doesn't exist.")) + .remove(0, rInfo.addresses.size) + } + // Only update hosts for a barrier task. + if (taskSet.isBarrier) { + // The executor address is expected to be non empty. 
+ addressesWithDescs += (shuffledOffers(i).address.get -> task) + } + launchedTask = true } - launchedTask = true + } catch { + case e: TaskNotSerializableException => + logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") + // Do not offer resources for this task, but don't throw an error to allow other + // task sets to be submitted. + return launchedTask } - } catch { - case e: TaskNotSerializableException => - logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") - // Do not offer resources for this task, but don't throw an error to allow other - // task sets to be submitted. - return launchedTask } } } @@ -381,12 +389,81 @@ private[spark] class TaskSchedulerImpl( /** * Check whether the resources from the WorkerOffer are enough to run at least one task. + * Returns None if the resources don't meet the task requirements, otherwise returns + * the task resource assignments to give to the next task. Note that the assignments maybe + * be empty if no custom resources are used. 
*/ - private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = { - val resourcesFree = resources.map(r => r._1 -> r._2.length) - val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask) - logDebug(s"Resources meet task requirements is: $meetsReqs") - meetsReqs + private def resourcesMeetTaskRequirements( + taskSet: TaskSetManager, + availCpus: Int, + availWorkerResources: Map[String, Buffer[String]] + ): Option[Map[String, ResourceInformation]] = { + val rpId = taskSet.taskSet.resourceProfileId + val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId) + val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, conf) + // check if the ResourceProfile has cpus first since that is common case + if (availCpus < taskCpus) return None + // only look at the resources other than cpus + val tsResources = ResourceProfile.getCustomTaskResources(taskSetProf) + if (tsResources.isEmpty) return Some(Map.empty) + val localTaskReqAssign = HashMap[String, ResourceInformation]() + // we go through all resources here so that we can make sure they match and also get what the + // assignments are for the next task + for ((rName, taskReqs) <- tsResources) { + val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName) + availWorkerResources.get(rName) match { + case Some(workerRes) => + if (workerRes.size >= taskAmount) { + localTaskReqAssign.put(rName, new ResourceInformation(rName, + workerRes.take(taskAmount).toArray)) + } else { + return None + } + case None => return None + } + } + Some(localTaskReqAssign.toMap) + } + + // Use the resource that the resourceProfile has as the limiting resource to calculate the + // total number of slots available based on the current offers. 
+ private def calculateAvailableSlots( + resourceProfileIds: Array[Int], + availableCpus: Array[Int], + availableResources: Array[Map[String, Buffer[String]]], + rpId: Int): Int = { + val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId) + val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) => + (id == resourceProfile.id) + } + val coresKnown = resourceProfile.isCoresLimitKnown + var limitingResource = resourceProfile.limitingResource(conf) + val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf) + + offersForResourceProfile.map { case (o, index) => + val numTasksPerExecCores = availableCpus(index) / taskCpus + // if limiting resource is empty then we have no other resources, so it has to be CPU + if (limitingResource == ResourceProfile.CPUS || limitingResource.isEmpty) { + numTasksPerExecCores + } else { + val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount) + .getOrElse(throw new SparkException("limitingResource returns from ResourceProfile" + + s" $resourceProfile doesn't actually contain that task resource!") + ) + // available addresses already takes into account if there are fractional + // task resource requests + val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0) + val resourceLimit = (availAddrs / taskLimit).toInt + if (!coresKnown) { + // when executor cores config isn't set, we can't calculate the real limiting resource + // and number of tasks per executor ahead of time, so calculate it now based on what + // is available. + if (numTasksPerExecCores <= resourceLimit) numTasksPerExecCores else resourceLimit + } else { + resourceLimit + } + } + }.sum } /** @@ -429,9 +506,12 @@ private[spark] class TaskSchedulerImpl( val shuffledOffers = shuffleOffers(filteredOffers) // Build a list of tasks to assign to each worker. 
+ // Note the size estimate here might be off with different ResourceProfiles but should be + // close estimate val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) val availableResources = shuffledOffers.map(_.resources).toArray val availableCpus = shuffledOffers.map(o => o.cores).toArray + val resourceProfileIds = shuffledOffers.map(o => o.resourceProfileId).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie) for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( @@ -441,19 +521,27 @@ private[spark] class TaskSchedulerImpl( } } - // Take each TaskSet in our scheduling order, and then offer it each node in increasing order + // Take each TaskSet in our scheduling order, and then offer it to each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum + // we only need to calculate available slots if using barrier scheduling, otherwise the + // value is -1 + val numBarrierSlotsAvailable = if (taskSet.isBarrier) { + val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources, + taskSet.taskSet.resourceProfileId) + slots + } else { + -1 + } // Skip the barrier taskSet if the available slots are less than the number of pending tasks. - if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { + if (taskSet.isBarrier && numBarrierSlotsAvailable < taskSet.numTasks) { // Skip the launch process. // TODO SPARK-24819 If the job requires more slots than available (both busy and free // slots), fail the job on submit. 
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + - s"number of available slots is $availableSlots.") + s"number of available slots is $numBarrierSlotsAvailable.") } else { var launchedAnyTask = false // Record all the executor IDs assigned barrier tasks on. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala index 517c899..7a8ed16 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala @@ -28,7 +28,8 @@ private[spark] class TaskSet( val stageId: Int, val stageAttemptId: Int, val priority: Int, - val properties: Properties) { + val properties: Properties, + val resourceProfileId: Int) { val id: String = stageId + "." + stageAttemptId override def toString: String = "TaskSet " + id diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 18684ee..2c79233 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -90,10 +90,18 @@ private[spark] class TaskSetManager( // SPARK-30417: #cores per executor might not be set in spark conf for standalone mode, then // the value of the conf would 1 by default. However, the executor would use all the cores on // the worker. Therefore, CPUS_PER_TASK is okay to be greater than 1 without setting #cores. - // To handle this case, we assume the minimum number of slots is 1. + // To handle this case, we set slots to 1 when we don't know the executor cores. // TODO: use the actual number of slots for standalone mode. 
- val speculationTasksLessEqToSlots = - numTasks <= Math.max(conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK, 1) + val speculationTasksLessEqToSlots = { + val rpId = taskSet.resourceProfileId + val resourceProfile = sched.sc.resourceProfileManager.resourceProfileFromId(rpId) + val slots = if (!resourceProfile.isCoresLimitKnown) { + 1 + } else { + resourceProfile.maxTasksPerExecutor(conf) + } + numTasks <= slots + } // For each task, tracks whether a copy of the task has succeeded. A task will also be // marked as "succeeded" if it failed with a fetch failure, in which case it should not @@ -394,7 +402,7 @@ private[spark] class TaskSetManager( execId: String, host: String, maxLocality: TaskLocality.TaskLocality, - availableResources: Map[String, Seq[String]] = Map.empty) + taskResourceAssignments: Map[String, ResourceInformation] = Map.empty) : Option[TaskDescription] = { val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist => @@ -457,18 +465,8 @@ private[spark] class TaskSetManager( // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + - s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)") - - val extraResources = sched.resourcesReqsPerTask.map { taskReq => - val rName = taskReq.resourceName - val count = taskReq.amount - val rAddresses = availableResources.getOrElse(rName, Seq.empty) - assert(rAddresses.size >= count, s"Required $count $rName addresses, but only " + - s"${rAddresses.size} available.") - // We'll drop the allocated addresses later inside TaskSchedulerImpl. 
- val allocatedAddresses = rAddresses.take(count) - (rName, new ResourceInformation(rName, allocatedAddresses.toArray)) - }.toMap + s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes) " + + s"taskResourceAssignments ${taskResourceAssignments}") sched.dagScheduler.taskStarted(task, info) new TaskDescription( @@ -481,7 +479,7 @@ private[spark] class TaskSetManager( addedFiles, addedJars, task.localProperties, - extraResources, + taskResourceAssignments, serializedTask) } } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index 522dbfa..92a12f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -19,6 +19,8 @@ package org.apache.spark.scheduler import scala.collection.mutable.Buffer +import org.apache.spark.resource.ResourceProfile + /** * Represents free resources available on an executor. */ @@ -30,4 +32,5 @@ case class WorkerOffer( // `address` is an optional hostPort string, it provide more useful information than `host` // when multiple executors are launched on the same host. 
address: Option[String] = None, - resources: Map[String, Buffer[String]] = Map.empty) + resources: Map[String, Buffer[String]] = Map.empty, + resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 6e1efda..cca8e86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -145,7 +145,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => - executorInfo.freeCores += scheduler.CPUS_PER_TASK + val rpId = executorInfo.resourceProfileId + val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) + executorInfo.freeCores += taskCpus resources.foreach { case (k, v) => executorInfo.resourcesInfo.get(k).foreach { r => r.release(v.addresses) @@ -231,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val resourcesInfo = resources.map { case (rName, info) => - // tell the executor it can schedule resources up to numParts times, + // tell the executor it can schedule resources up to numSlotsPerAddress times, // as configured by the user, or set to 1 as that is the default (1 task/resource) val numParts = scheduler.sc.resourceProfileManager .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf) @@ -298,7 +301,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Some(executorData.executorAddress.hostPort), executorData.resourcesInfo.map 
{ case (rName, rInfo) => (rName, rInfo.availableAddrs.toBuffer) - }) + }, executorData.resourceProfileId) }.toIndexedSeq scheduler.resourceOffers(workOffers) } @@ -327,7 +330,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Some(executorData.executorAddress.hostPort), executorData.resourcesInfo.map { case (rName, rInfo) => (rName, rInfo.availableAddrs.toBuffer) - })) + }, executorData.resourceProfileId)) scheduler.resourceOffers(workOffers) } else { Seq.empty @@ -359,7 +362,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val executorData = executorDataMap(task.executorId) // Do resources allocation here. The allocated resources will get released after the task // finishes. - executorData.freeCores -= scheduler.CPUS_PER_TASK + val rpId = executorData.resourceProfileId + val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) + executorData.freeCores -= taskCpus task.resources.foreach { case (rName, rInfo) => assert(executorData.resourcesInfo.contains(rName)) executorData.resourcesInfo(rName).acquire(rInfo.addresses) @@ -606,10 +612,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } - override def maxNumConcurrentTasks(): Int = synchronized { - executorDataMap.values.map { executor => - executor.totalCores / scheduler.CPUS_PER_TASK - }.sum + override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized { + val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf) + val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id) + executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum } // this function is for testing only diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 42a5afe..e2b1198 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -26,7 +26,7 @@ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} import org.apache.spark.internal.{config, Logging} import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} -import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo @@ -162,7 +162,12 @@ private[spark] class LocalSchedulerBackend( override def applicationId(): String = appId - override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK + // Doesn't support different ResourceProfiles yet + // so we expect all executors to be of same ResourceProfile + override def maxNumConcurrentTasks(rp: ResourceProfile): Int = { + val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf) + totalCores / cpusPerTask + } private def stop(finalState: SparkAppHandle.State): Unit = { localEndpoint.ask(StopExecutor) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index f8b9930..0b2a58d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -511,7 +511,6 @@ class StandaloneDynamicAllocationSuite val taskScheduler = mock(classOf[TaskSchedulerImpl]) when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host")) 
when(taskScheduler.resourceOffers(any())).thenReturn(Nil) - when(taskScheduler.resourcesReqsPerTask).thenReturn(Seq.empty) when(taskScheduler.sc).thenReturn(sc) val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager) diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 3134a73..e0b5860 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -301,8 +301,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val taskId = 1000000 // We don't really verify the data, just pass it around. val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) - val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", - 19, 1, mutable.Map.empty, mutable.Map.empty, new Properties, + val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, + 1, mutable.Map.empty, mutable.Map.empty, new Properties, Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) val serializedTaskDescription = TaskDescription.encode(taskDescription) backend.executor = mock[Executor] diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 7666c6c..f4745db 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -72,7 +72,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo // Ensure all executors have been launched. 
assert(sc.getExecutorIds().length == 4) } - assert(sc.maxNumConcurrentTasks() == 12) + assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 12) } test("compute max number of concurrent tasks can be launched when spark.task.cpus > 1") { @@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo assert(sc.getExecutorIds().length == 4) } // Each executor can only launch one task since `spark.task.cpus` is 2. - assert(sc.maxNumConcurrentTasks() == 4) + assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 4) } test("compute max number of concurrent tasks can be launched when some executors are busy") { @@ -126,7 +126,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo assert(taskStarted.get()) assert(taskEnded.get() == false) // Assert we count in slots on both busy and free executors. - assert(sc.maxNumConcurrentTasks() == 4) + assert( + sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 4) } } finally { sc.removeSparkListener(listener) @@ -187,8 +188,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo } test("extra resources from executor") { + import TestUtils._ + + val execCores = 3 val conf = new SparkConf() - .set(EXECUTOR_CORES, 1) + .set(EXECUTOR_CORES, execCores) .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations .setMaster( @@ -294,7 +298,6 @@ private class CSMockExternalClusterManager extends ExternalClusterManager { when(ts.applicationAttemptId()).thenReturn(Some("attempt1")) when(ts.schedulingMode).thenReturn(SchedulingMode.FIFO) when(ts.nodeBlacklist()).thenReturn(Set.empty[String]) - when(ts.resourcesReqsPerTask).thenReturn(Seq.empty) ts } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4486389..33a14ce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.rdd.{DeterministicLevel, RDD} +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} @@ -2547,9 +2548,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi /** * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that - * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular - * RDD. The test creates the following RDD graph (where n denotes a narrow dependency and s - * denotes a shuffle dependency): + * getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies + * of a particular RDD. The test creates the following RDD graph (where n denotes a narrow + * dependency and s denotes a shuffle dependency): * * A <------------s---------, * \ @@ -2558,7 +2559,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C. 
*/ - test("getShuffleDependencies correctly returns only direct shuffle parents") { + test("getShuffleDependenciesAndResourceProfiles correctly returns only direct shuffle parents") { val rddA = new MyRDD(sc, 2, Nil) val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) val rddB = new MyRDD(sc, 2, Nil) @@ -2569,11 +2570,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val narrowDepD = new OneToOneDependency(rddD) val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker) - assert(scheduler.getShuffleDependencies(rddA) === Set()) - assert(scheduler.getShuffleDependencies(rddB) === Set()) - assert(scheduler.getShuffleDependencies(rddC) === Set(shuffleDepB)) - assert(scheduler.getShuffleDependencies(rddD) === Set(shuffleDepC)) - assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) + val (shuffleDepsA, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddA) + assert(shuffleDepsA === Set()) + val (shuffleDepsB, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddB) + assert(shuffleDepsB === Set()) + val (shuffleDepsC, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddC) + assert(shuffleDepsC === Set(shuffleDepB)) + val (shuffleDepsD, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddD) + assert(shuffleDepsD === Set(shuffleDepC)) + val (shuffleDepsE, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddE) + assert(shuffleDepsE === Set(shuffleDepA, shuffleDepC)) } test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" + @@ -3141,6 +3147,97 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assertDataStructuresEmpty() } + test("test default resource profile") { + val rdd = sc.parallelize(1 to 10).map(x => (x, x)) + val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd) + val rp = 
scheduler.mergeResourceProfilesForStage(resourceprofiles) + assert(rp.id == scheduler.sc.resourceProfileManager.defaultResourceProfile.id) + } + + test("test 1 resource profile") { + val ereqs = new ExecutorResourceRequests().cores(4) + val treqs = new TaskResourceRequests().cpus(1) + val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build + + val rdd = sc.parallelize(1 to 10).map(x => (x, x)).withResources(rp1) + val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd) + val rpMerged = scheduler.mergeResourceProfilesForStage(resourceprofiles) + val expectedid = Option(rdd.getResourceProfile).map(_.id) + assert(expectedid.isDefined) + assert(expectedid.get != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + assert(rpMerged.id == expectedid.get) + } + + test("test 2 resource profiles errors by default") { + import org.apache.spark.resource._ + val ereqs = new ExecutorResourceRequests().cores(4) + val treqs = new TaskResourceRequests().cpus(1) + val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build + + val ereqs2 = new ExecutorResourceRequests().cores(2) + val treqs2 = new TaskResourceRequests().cpus(2) + val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build + + val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2) + val error = intercept[IllegalArgumentException] { + val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd) + scheduler.mergeResourceProfilesForStage(resourceprofiles) + }.getMessage() + + assert(error.contains("Multiple ResourceProfile's specified in the RDDs")) + } + + /** + * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that + * getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies + * of a particular RDD. 
The test creates the following RDD graph (where n denotes a narrow + * dependency and s denotes a shuffle dependency): + * + * A <------------s---------, + * \ + * B <--s-- C <--s-- D <--n------ E + * + * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct + * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C. + */ + test("getShuffleDependenciesAndResourceProfiles returns deps and profiles correctly") { + import org.apache.spark.resource._ + val ereqs = new ExecutorResourceRequests().cores(4) + val treqs = new TaskResourceRequests().cpus(1) + val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build + val ereqs2 = new ExecutorResourceRequests().cores(6) + val treqs2 = new TaskResourceRequests().cpus(2) + val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build + + val rddWithRp = new MyRDD(sc, 2, Nil).withResources(rp1) + val rddA = new MyRDD(sc, 2, Nil).withResources(rp1) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) + val rddB = new MyRDD(sc, 2, Nil) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1)) + val rddWithRpDep = new OneToOneDependency(rddWithRp) + val rddC = new MyRDD(sc, 1, List(rddWithRpDep, shuffleDepB)).withResources(rp2) + val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1)) + val rddD = new MyRDD(sc, 1, List(shuffleDepC)) + val narrowDepD = new OneToOneDependency(rddD) + val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker) + + val (shuffleDepsA, rprofsA) = scheduler.getShuffleDependenciesAndResourceProfiles(rddA) + assert(shuffleDepsA === Set()) + assert(rprofsA === Set(rp1)) + val (shuffleDepsB, rprofsB) = scheduler.getShuffleDependenciesAndResourceProfiles(rddB) + assert(shuffleDepsB === Set()) + assert(rprofsB === Set()) + val (shuffleDepsC, rprofsC) = scheduler.getShuffleDependenciesAndResourceProfiles(rddC) + assert(shuffleDepsC === 
Set(shuffleDepB)) + assert(rprofsC === Set(rp1, rp2)) + val (shuffleDepsD, rprofsD) = scheduler.getShuffleDependenciesAndResourceProfiles(rddD) + assert(shuffleDepsD === Set(shuffleDepC)) + assert(rprofsD === Set()) + val (shuffleDepsE, rprofsE) = scheduler.getShuffleDependenciesAndResourceProfiles(rddE) + assert(shuffleDepsE === Set(shuffleDepA, shuffleDepC)) + assert(rprofsE === Set()) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 9f593e0..7ead51b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.Map import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AccumulatorV2 @@ -71,7 +72,7 @@ private class DummySchedulerBackend extends SchedulerBackend { def stop(): Unit = {} def reviveOffers(): Unit = {} def defaultParallelism(): Int = 1 - def maxNumConcurrentTasks(): Int = 0 + def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0 } private class DummyTaskScheduler extends TaskScheduler { diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 8cb6268..9ec088a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -21,6 +21,7 @@ import 
java.util.Properties import org.apache.spark.{Partition, SparkEnv, TaskContext} import org.apache.spark.executor.TaskMetrics +import org.apache.spark.resource.ResourceProfile class FakeTask( stageId: Int, @@ -42,7 +43,12 @@ object FakeTask { * locations for each task (given as varargs) if this sequence is not empty. */ def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { - createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*) + createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*) + } + + def createTaskSet(numTasks: Int, rpId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { + createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, rpId, prefLocs: _*) } def createTaskSet( @@ -50,7 +56,8 @@ object FakeTask { stageId: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { - createTaskSet(numTasks, stageId, stageAttemptId, priority = 0, prefLocs: _*) + createTaskSet(numTasks, stageId, stageAttemptId, priority = 0, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*) } def createTaskSet( @@ -58,6 +65,7 @@ object FakeTask { stageId: Int, stageAttemptId: Int, priority: Int, + rpId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { if (prefLocs.size != 0 && prefLocs.size != numTasks) { throw new IllegalArgumentException("Wrong number of task locations") @@ -65,7 +73,7 @@ object FakeTask { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil) } - new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null) + new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId) } def createShuffleMapTaskSet( @@ -91,11 +99,21 @@ object FakeTask { }, prefLocs(i), new Properties, SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array()) } - new TaskSet(tasks, stageId, stageAttemptId, priority = 
priority, null) + new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) } def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { - createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*) + createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, + rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*) + } + + def createBarrierTaskSet( + numTasks: Int, + rpId: Int, + prefLocs: Seq[TaskLocation]*): TaskSet = { + createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, + rpId = rpId, prefLocs: _*) } def createBarrierTaskSet( @@ -103,6 +121,7 @@ object FakeTask { stageId: Int, stageAttemptId: Int, priority: Int, + rpId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { if (prefLocs.size != 0 && prefLocs.size != numTasks) { throw new IllegalArgumentException("Wrong number of task locations") @@ -110,6 +129,6 @@ object FakeTask { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil, isBarrier = true) } - new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null) + new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index b953add..d9de976 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -22,6 +22,7 @@ import java.util.Properties import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE +import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.SchedulingMode._ /** @@ -39,7 +40,8 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, Nil) } - new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0) + new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), 0) } def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index dff8975..0874163 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.TaskState._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.SCHEDULER_REVIVE_INTERVAL import org.apache.spark.rdd.RDD +import org.apache.spark.resource.ResourceProfile import org.apache.spark.util.{CallSite, ThreadUtils, Utils} /** @@ -385,7 +386,7 @@ private[spark] abstract class MockBackend( }.toIndexedSeq } - override def maxNumConcurrentTasks(): Int = 0 + override def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0 /** * This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks @@ -406,9 +407,9 @@ private[spark] abstract class MockBackend( (taskDescription, task) } newTasks.foreach { case (taskDescription, _) => + freeCores -= taskScheduler.CPUS_PER_TASK executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK } - freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK assignedTasksWaitingToRun ++= newTasks } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index e7ecf84..9ee84a8 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -31,6 +31,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.util.ManualClock @@ -40,7 +41,7 @@ class FakeSchedulerBackend extends SchedulerBackend { def stop(): Unit = {} def reviveOffers(): Unit = {} def defaultParallelism(): Int = 1 - def maxNumConcurrentTasks(): Int = 0 + def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0 } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach @@ -202,7 +203,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B config.CPUS_PER_TASK.key -> taskCpus.toString) val numFreeCores = 1 val taskSet = new TaskSet( - Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) + Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), + 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus), new WorkerOffer("executor1", "host1", numFreeCores)) taskScheduler.submitTasks(taskSet) @@ -216,7 +218,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // still be processed without error taskScheduler.submitTasks(FakeTask.createTaskSet(1)) val taskSet2 = new TaskSet( - Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 1, 0, 0, null) + Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), + 1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) 
taskScheduler.submitTasks(taskSet2) taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten assert(taskDescriptions.map(_.executorId) === Seq("executor0")) @@ -1135,6 +1138,96 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(0 === taskDescriptions.length) } + test("don't schedule for a barrier taskSet if available slots are less than " + + "pending tasks gpus limiting") { + val taskCpus = 1 + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString, + "spark.executor.resource.gpu.amount" -> "1", "spark.task.resource.gpu.amount" -> "1") + + val numFreeCores = 3 + val workerOffers = IndexedSeq( + new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"), + Map("gpu" -> Seq("0").toBuffer)), + new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"), + Map("gpu" -> Seq("0").toBuffer))) + val attempt1 = FakeTask.createBarrierTaskSet(3) + + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(0 === taskDescriptions.length) + } + + test("schedule tasks for a barrier taskSet if all tasks can be launched together gpus") { + val taskCpus = 1 + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString, + "spark.executor.resource.gpu.amount" -> "1", "spark.task.resource.gpu.amount" -> "1") + + val numFreeCores = 3 + val workerOffers = IndexedSeq( + new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"), + Map("gpu" -> Seq("0").toBuffer)), + new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"), + Map("gpu" -> Seq("0").toBuffer)), + new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629"), + Map("gpu" -> Seq("0").toBuffer))) + val attempt1 = FakeTask.createBarrierTaskSet(3) + + taskScheduler.submitTasks(attempt1) + 
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(3 === taskDescriptions.length) + } + + // barrier scheduling doesn't yet work with dynamic allocation but test it with another + // ResourceProfile anyway to make sure code path works when it is supported + test("schedule tasks for a barrier taskSet if all tasks can be launched together " + + "diff ResourceProfile") { + val taskCpus = 1 + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString) + val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2) + val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1) + val rp = new ResourceProfile(execReqs.requests, taskReqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val numFreeCores = 2 + val workerOffers = IndexedSeq( + new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"), + Map("gpu" -> Seq("0", "1").toBuffer), rp.id), + new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"), + Map("gpu" -> Seq("0", "1").toBuffer), rp.id)) + val attempt1 = FakeTask.createBarrierTaskSet(3, rpId = rp.id) + + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(3 === taskDescriptions.length) + } + + test("schedule tasks for a barrier taskSet if all tasks can be launched together " + + "diff ResourceProfile, but not enough gpus") { + val taskCpus = 1 + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString) + val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2) + val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1) + val rp = new ResourceProfile(execReqs.requests, taskReqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val numFreeCores = 2 + // make each of the worker offers only have 1 GPU, 
thus making it not enough + val workerOffers = IndexedSeq( + new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"), + Map("gpu" -> Seq("0").toBuffer), rp.id), + new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"), + Map("gpu" -> Seq("0").toBuffer), rp.id)) + val attempt1 = FakeTask.createBarrierTaskSet(3, rpId = rp.id) + + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(0 === taskDescriptions.length) + } + test("schedule tasks for a barrier taskSet if all tasks can be launched together") { val taskCpus = 2 val taskScheduler = setupSchedulerWithMaster( @@ -1165,8 +1258,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")), new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")), new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629"))) - val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId = 0, priority = 1) - val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0, priority = 0) + val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId = 0, priority = 1, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0, priority = 0, + rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) // submit highPrio and barrier taskSet taskScheduler.submitTasks(highPrio) @@ -1289,6 +1384,93 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.addresses) } + test("Scheduler correctly accounts for GPUs per task with fractional amount") { + val taskCpus = 1 + val taskGpus = 0.33 + val executorGpus = 1 + val executorCpus = 4 + + val taskScheduler = setupScheduler(numCores = 
executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString) + val taskSet = FakeTask.createTaskSet(5) + + val numFreeCores = 4 + val resources = Map(GPU -> ArrayBuffer("0", "0", "0")) + val singleCoreWorkerOffers = + IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, resources)) + + taskScheduler.submitTasks(taskSet) + // Launch tasks on executor that satisfies resource requirements. + var taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten + assert(3 === taskDescriptions.length) + assert(!failedTaskSet) + assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses) + assert(ArrayBuffer("0") === taskDescriptions(1).resources.get(GPU).get.addresses) + assert(ArrayBuffer("0") === taskDescriptions(2).resources.get(GPU).get.addresses) + } + + test("Scheduler works with multiple ResourceProfiles and gpus") { + val taskCpus = 1 + val taskGpus = 1 + val executorGpus = 4 + val executorCpus = 4 + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString) + + val ereqs = new ExecutorResourceRequests().cores(6).resource(GPU, 6) + val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2) + val rp = new ResourceProfile(ereqs.requests, treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + val taskSet = FakeTask.createTaskSet(3) + val rpTaskSet = FakeTask.createTaskSet(5, stageId = 1, stageAttemptId = 0, + priority = 0, rpId = rp.id) + + val resourcesDefaultProf = Map(GPU -> ArrayBuffer("0", "1", "2", "3")) + val resources = Map(GPU -> ArrayBuffer("4", "5", "6", "7", "8", "9")) + + val workerOffers = + IndexedSeq(new 
WorkerOffer("executor0", "host0", 2, None, resourcesDefaultProf), + new WorkerOffer("executor1", "host1", 6, None, resources, rp.id)) + taskScheduler.submitTasks(taskSet) + taskScheduler.submitTasks(rpTaskSet) + // should have 2 for default profile and 3 for additional resource profile + var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(5 === taskDescriptions.length) + var has2Gpus = 0 + var has1Gpu = 0 + for (tDesc <- taskDescriptions) { + assert(tDesc.resources.contains(GPU)) + if (tDesc.resources(GPU).addresses.size == 2) { + has2Gpus += 1 + } + if (tDesc.resources(GPU).addresses.size == 1) { + has1Gpu += 1 + } + } + assert(has2Gpus == 3) + assert(has1Gpu == 2) + + val resources3 = Map(GPU -> ArrayBuffer("14", "15", "16", "17", "18", "19")) + + // clear the first 2 worker offers so they don't have any room and add a third + // for the resource profile + val workerOffers3 = IndexedSeq( + new WorkerOffer("executor0", "host0", 0, None, Map.empty), + new WorkerOffer("executor1", "host1", 0, None, Map.empty, rp.id), + new WorkerOffer("executor2", "host2", 6, None, resources3, rp.id)) + taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten + assert(2 === taskDescriptions.length) + assert(taskDescriptions.head.resources.contains(GPU)) + assert(2 == taskDescriptions.head.resources(GPU).addresses.size) + } + /** * Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure * that all the state is updated when this method returns. 
Otherwise, there's no way to know when diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index b740e35..4566e3c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -34,6 +34,8 @@ import org.scalatest.concurrent.Eventually import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config +import org.apache.spark.internal.config.Tests.SKIP_VALIDATE_CORES_TESTING +import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -213,7 +215,6 @@ class TaskSetManagerSuite super.afterEach() } - test("TaskSet with no preferences") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1")) @@ -657,7 +658,8 @@ class TaskSetManagerSuite sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1")) - val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null) + val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, + null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) assert(!manager.emittedTaskSizeWarning) @@ -672,7 +674,8 @@ class TaskSetManagerSuite sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = new TaskSet( - Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) + Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), + 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) intercept[TaskNotSerializableException] { @@ -743,7 +746,8 @@ 
class TaskSetManagerSuite val singleTask = new ShuffleMapTask(0, 0, null, new Partition { override def index: Int = 0 }, Seq(TaskLocation("host1", "execA")), new Properties, null) - val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null) + val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, + null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) // Offer host1, which should be accepted as a PROCESS_LOCAL location @@ -1053,10 +1057,10 @@ class TaskSetManagerSuite } // Offer resources for 4 tasks to start for ((k, v) <- List( - "exec1" -> "host1", - "exec1" -> "host1", - "exec2" -> "host2", - "exec2" -> "host2")) { + "exec1" -> "host1", + "exec1" -> "host1", + "exec2" -> "host2", + "exec2" -> "host2")) { val taskOption = manager.resourceOffer(k, v, NO_PREF) assert(taskOption.isDefined) val task = taskOption.get @@ -1480,10 +1484,10 @@ class TaskSetManagerSuite } // Offer resources for 4 tasks to start for ((exec, host) <- Seq( - "exec1" -> "host1", - "exec1" -> "host1", - "exec3" -> "host3", - "exec2" -> "host2")) { + "exec1" -> "host1", + "exec1" -> "host1", + "exec3" -> "host3", + "exec2" -> "host2")) { val taskOption = manager.resourceOffer(exec, host, NO_PREF) assert(taskOption.isDefined) val task = taskOption.get @@ -1552,10 +1556,10 @@ class TaskSetManagerSuite } // Offer resources for 4 tasks to start for ((k, v) <- List( - "exec1" -> "host1", - "exec1" -> "host1", - "exec2" -> "host2", - "exec2" -> "host2")) { + "exec1" -> "host1", + "exec1" -> "host1", + "exec2" -> "host2", + "exec2" -> "host2")) { val taskOption = manager.resourceOffer(k, v, NO_PREF) assert(taskOption.isDefined) val task = taskOption.get @@ -1655,7 +1659,7 @@ class TaskSetManagerSuite assert(FakeRackUtil.numBatchInvocation === 1) } - test("TaskSetManager allocate resource addresses from available resources") { + test("TaskSetManager passes task resource along") { import TestUtils._ sc = new SparkContext("local", "test") 
@@ -1664,15 +1668,13 @@ class TaskSetManagerSuite val taskSet = FakeTask.createTaskSet(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - val availableResources = Map(GPU -> ArrayBuffer("0", "1", "2", "3")) - val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF, availableResources) + val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))) + val taskOption = + manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments) assert(taskOption.isDefined) val allocatedResources = taskOption.get.resources assert(allocatedResources.size == 1) assert(allocatedResources(GPU).addresses sameElements Array("0", "1")) - // Allocated resource addresses should still present in `availableResources`, they will only - // get removed inside TaskSchedulerImpl later. - assert(availableResources(GPU) sameElements Array("0", "1", "2", "3")) } test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") { @@ -1793,15 +1795,16 @@ class TaskSetManagerSuite numTasks: Int, numExecutorCores: Int, numCoresPerTask: Int): (TaskSetManager, ManualClock) = { - sc = new SparkContext("local", "test") - sc.conf.set(config.SPECULATION_ENABLED, true) - sc.conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString) + val conf = new SparkConf() + conf.set(config.SPECULATION_ENABLED, true) + conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString) // Set the number of slots per executor - sc.conf.set(config.EXECUTOR_CORES.key, numExecutorCores.toString) - sc.conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString) + conf.set(config.EXECUTOR_CORES.key, numExecutorCores.toString) + conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString) if (speculationThresholdOpt.isDefined) { - sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get) + conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get) } + sc = new 
SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) // Create a task set with the given number of tasks val taskSet = FakeTask.createTaskSet(numTasks) @@ -1890,15 +1893,28 @@ class TaskSetManagerSuite test("SPARK-30417 when spark.task.cpus is greater than spark.executor.cores due to " + "standalone settings, speculate if there is only one task in the stage") { - val (manager, clock) = testSpeculationDurationSetup( - Some("60min"), - // Set the quantile to be 1.0 so that regular speculation would not be triggered - speculationQuantile = 1.0, - numTasks = 1, - numExecutorCores = 1, - numCoresPerTask = 2 - ) + val numTasks = 1 + val numCoresPerTask = 2 + val conf = new SparkConf() + // skip throwing exception when cores per task > cores per executor to emulate standalone mode + conf.set(SKIP_VALIDATE_CORES_TESTING, true) + conf.set(config.SPECULATION_ENABLED, true) + conf.set(config.SPECULATION_QUANTILE.key, "1.0") + // Skip setting cores per executor to emulate standalone default mode + conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString) + conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, "60min") + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + // Create a task set with the given number of tasks + val taskSet = FakeTask.createTaskSet(numTasks) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + manager.isZombie = false + // Offer resources for the task to start + for (i <- 1 to numTasks) { + manager.resourceOffer(s"exec$i", s"host$i", NO_PREF) + } clock.advance(1000*60*60) assert(!manager.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.size == 0) @@ -1942,7 +1958,8 @@ class TaskSetManagerSuite TestUtils.waitUntilExecutorsUp(sc, 2, 60000) val tasks = Array.tabulate[Task[_]](2)(partition => new FakeLongTasks(stageId = 0, partition)) - val 
taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0, priority = 0, null) + val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0, priority = 0, null, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val stageId = taskSet.stageId val stageAttemptId = taskSet.stageAttemptId sched.submitTasks(taskSet) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index e2a9914..f1e3fca 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -31,6 +31,7 @@ import org.apache.spark.{SparkContext, SparkException, TaskState} import org.apache.spark.deploy.mesos.{config => mesosConfig} import org.apache.spark.executor.MesosExecutorBackend import org.apache.spark.internal.config +import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.Utils @@ -457,7 +458,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( super.applicationId } - override def maxNumConcurrentTasks(): Int = { + override def maxNumConcurrentTasks(rp: ResourceProfile): Int = { // TODO SPARK-25074 support this method for MesosFineGrainedSchedulerBackend 0 } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org