Ngone51 commented on a change in pull request #27773: [SPARK-29154][CORE]
Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389531492
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -340,39 +338,48 @@ private[spark] class TaskSchedulerImpl(
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
- if (availableCpus(i) >= CPUS_PER_TASK &&
- resourcesMeetTaskRequirements(availableResources(i))) {
- try {
- for (task <- taskSet.resourceOffer(execId, host, maxLocality,
availableResources(i))) {
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetManager.put(tid, taskSet)
- taskIdToExecutorId(tid) = execId
- executorIdToRunningTaskIds(execId).add(tid)
- availableCpus(i) -= CPUS_PER_TASK
- assert(availableCpus(i) >= 0)
- task.resources.foreach { case (rName, rInfo) =>
- // Remove the first n elements from availableResources
addresses, these removed
- // addresses are the same as that we allocated in
taskSet.resourceOffer() since it's
- // synchronized. We don't remove the exact addresses allocated
because the current
- // approach produces the identical result with less time
complexity.
- availableResources(i).getOrElse(rName,
- throw new SparkException(s"Try to acquire resource $rName that
doesn't exist."))
- .remove(0, rInfo.addresses.size)
- }
- // Only update hosts for a barrier task.
- if (taskSet.isBarrier) {
- // The executor address is expected to be non empty.
- addressesWithDescs += (shuffledOffers(i).address.get -> task)
+ val taskSetRpID = taskSet.taskSet.resourceProfileId
+ // make the resource profile id a hard requirement for now - ie only put
tasksets
+ // on executors where resource profile exactly matches.
+ if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+ val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet,
availableCpus(i),
+ availableResources(i))
+ taskResAssignmentsOpt.foreach { taskResAssignments =>
+ try {
+ val taskCpus =
sc.resourceProfileManager.taskCpusForProfileId(taskSetRpID)
+ val taskDescOption = taskSet.resourceOffer(execId, host,
maxLocality, taskCpus,
+ taskResAssignments)
+ for (task <- taskDescOption) {
+ tasks(i) += task
+ val tid = task.taskId
+ taskIdToTaskSetManager.put(tid, taskSet)
+ taskIdToExecutorId(tid) = execId
+ executorIdToRunningTaskIds(execId).add(tid)
+ availableCpus(i) -= task.cpus
+ assert(availableCpus(i) >= 0)
Review comment:
This assert seems unnecessary now, since we've already checked the available
CPUs in `resourcesMeetTaskRequirements`. I'm OK with keeping it, though.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]