mridulm commented on a change in pull request #35172:
URL: https://github.com/apache/spark/pull/35172#discussion_r803948435
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -780,15 +805,52 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       (scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
     }
     val response = synchronized {
+      val oldResourceProfileToNumExecutors = requestedTotalExecutorsPerResourceProfile.map {
+        case (rp, num) =>
+          (rp.id, num)
+      }.toMap
       this.requestedTotalExecutorsPerResourceProfile.clear()
       this.requestedTotalExecutorsPerResourceProfile ++= resourceProfileToNumExecutors
       this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
       this.rpHostToLocalTaskCount = hostToLocalTaskCount
+      updateExecRequestTimes(oldResourceProfileToNumExecutors, resourceProfileIdToNumExecutors)
       doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }
     defaultAskTimeout.awaitResult(response)
   }
+
+  private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile: Map[Int, Int]): Unit = {
+    newProfile.map {
+      case (k, v) =>
+        val delta = v - oldProfile.getOrElse(k, 0)
+        if (delta != 0) {
+          updateExecRequestTime(k, delta)
+        }
+    }
Review comment:
Looks like if we drop keys from `newProfile`, `requestedTotalExecutorsPerResourceProfile` silently forgets about them, and the backend continues to request the earlier count?
I should revisit this later (to clarify, this is not an issue with this PR btw).
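To make the scenario concrete, here is a minimal standalone sketch (not the PR's code; the profile ids and counts are invented): the tracked map is rebuilt from `newProfile`, so a dropped profile id simply vanishes, and a delta loop over `newProfile` (as in `updateExecRequestTimes` above) never visits it.

```scala
import scala.collection.mutable

// Standalone sketch of the dropped-key scenario (not the PR's code; ids and counts are invented).
object DroppedKeySketch {
  def main(args: Array[String]): Unit = {
    val oldProfile = Map(0 -> 4, 1 -> 2) // previously requested: rp 0 -> 4 execs, rp 1 -> 2 execs
    val newProfile = Map(0 -> 4)         // rp 1 is dropped from the new request

    // The tracked map is cleared and repopulated from newProfile, so rp 1 simply vanishes.
    val requested = mutable.Map.empty[Int, Int]
    requested ++= newProfile

    // A delta loop over newProfile (as in updateExecRequestTimes) never visits rp 1,
    // so no adjustment is recorded for the executors previously requested for it.
    val deltas = newProfile.map { case (k, v) => k -> (v - oldProfile.getOrElse(k, 0)) }

    println(requested) // Map(0 -> 4)
    println(deltas)    // Map(0 -> 0)
  }
}
```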
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -260,9 +266,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
                 .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
               (info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
             }
+          // If we've requested the executor figure out when we did.
+          val reqTs: Option[Long] = CoarseGrainedSchedulerBackend.this.synchronized {
+            execRequestTimes.get(resourceProfileId).flatMap {
+              times =>
+                times.headOption.map {
+                  h =>
+                    // Take off the top element
+                    times.dequeue()
+                    // If we requested more than one exec reduce the req count by 1 and prepend it back
+                    if (h._1 > 1) {
+                      ((h._1 - 1, h._2)) +=: times
Review comment:
I will let @holdenk elaborate better.
If we are considering an executor allocation to satisfy the oldest pending request, this should be fine, right?
Requests:
queue += (t=1, num=2)
queue += (t=2, num=1)
registration of exec1 at t=3:
queue.dequeue -> update num to 1 and prepend back to queue, so that (t=1, num=1) remains head.
(exec1 will have requestTs = 1, and a subsequent exec2 will also have requestTs = 1)
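As a standalone illustration of that walkthrough (not the PR's code; assumes Scala 2.13, where `mutable.Queue` is a `Buffer` so `+=:` prepends to the head), the queue of `(outstanding count, request timestamp)` pairs behaves like this:

```scala
import scala.collection.mutable

// Standalone walkthrough of the dequeue/prepend bookkeeping (not the PR's code).
// Assumes Scala 2.13, where mutable.Queue is a Buffer, so `+=:` prepends to the head.
// The queue holds (outstanding request count, request timestamp) pairs.
object RequestTimeWalkthrough {
  def main(args: Array[String]): Unit = {
    val times = mutable.Queue.empty[(Int, Long)]
    times.enqueue((2, 1L)) // t=1: two executors requested
    times.enqueue((1, 2L)) // t=2: one more executor requested

    // exec1 registers at t=3 and is matched against the oldest pending request.
    val reqTs: Option[Long] = times.headOption.map { h =>
      times.dequeue() // take off the head
      if (h._1 > 1) {
        // More than one executor was requested at that time:
        // put the remainder back at the head of the queue.
        (h._1 - 1, h._2) +=: times
      }
      h._2
    }

    println(reqTs)      // Some(1): exec1's request time is t=1
    println(times.head) // (1,1): (t=1, num=1) is still the head for the next registration
  }
}
```

A subsequent exec2 would then also resolve to requestTs = 1, since (t=1, num=1) is still at the head of the queue.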