attilapiros commented on a change in pull request #35172:
URL: https://github.com/apache/spark/pull/35172#discussion_r802407515
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
##########
@@ -72,6 +82,6 @@ class ExecutorInfo(
override def hashCode(): Int = {
val state = Seq(executorHost, totalCores, logUrlMap, attributes,
resourcesInfo,
resourceProfileId)
- state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ state.filter(_ != null).map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
Review comment:
Is this change needed?
As `state` has not been touched, wouldn't this be an unrelated bugfix?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -260,9 +266,27 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
.resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
(info.name, new ExecutorResourceInfo(info.name, info.addresses,
numParts))
}
+ // If we've requested the executor figure out when we did.
+ val reqTs: Option[Long] =
CoarseGrainedSchedulerBackend.this.synchronized {
+ execRequestTimes.get(resourceProfileId).flatMap {
+ times =>
+ times.headOption.map {
+ h =>
+ // Take off the top element
+ times.dequeue()
+ // If we requested more than one exec reduce the req count by
1 and prepend it back
+ if (h._1 > 1) {
+ ((h._1 - 1, h._2)) +=: times
Review comment:
As I see it, the append operation on the queue is also amortized constant
time, and it would keep the ordering (with dequeue & prepend we are moving the
first element to the end).
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -780,15 +805,52 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
(scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
}
val response = synchronized {
+ val oldResourceProfileToNumExecutors =
requestedTotalExecutorsPerResourceProfile.map {
+ case (rp, num) =>
+ (rp.id, num)
+ }.toMap
this.requestedTotalExecutorsPerResourceProfile.clear()
this.requestedTotalExecutorsPerResourceProfile ++=
resourceProfileToNumExecutors
this.numLocalityAwareTasksPerResourceProfileId =
numLocalityAwareTasksPerResourceProfileId
this.rpHostToLocalTaskCount = hostToLocalTaskCount
+ updateExecRequestTimes(oldResourceProfileToNumExecutors,
resourceProfileIdToNumExecutors)
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
}
defaultAskTimeout.awaitResult(response)
}
+ private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile:
Map[Int, Int]): Unit = {
+ newProfile.map {
+ case (k, v) =>
+ val delta = v - oldProfile.getOrElse(k, 0)
+ if (delta != 0) {
+ updateExecRequestTime(k, delta)
+ }
+ }
+ }
+
+ private def updateExecRequestTime(profileId: Int, delta: Int) = {
+ val times = execRequestTimes.getOrElseUpdate(profileId, Queue[(Int,
Long)]())
+ if (delta > 0) {
+ // Add the request to the end, constant time op
+ times += ((delta, System.currentTimeMillis()))
Review comment:
This prepend is fine for the ordering (as earlier-requested executors
will be at the beginning of the queue).
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -780,15 +805,52 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
(scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
}
val response = synchronized {
+ val oldResourceProfileToNumExecutors =
requestedTotalExecutorsPerResourceProfile.map {
+ case (rp, num) =>
+ (rp.id, num)
+ }.toMap
this.requestedTotalExecutorsPerResourceProfile.clear()
this.requestedTotalExecutorsPerResourceProfile ++=
resourceProfileToNumExecutors
this.numLocalityAwareTasksPerResourceProfileId =
numLocalityAwareTasksPerResourceProfileId
this.rpHostToLocalTaskCount = hostToLocalTaskCount
+ updateExecRequestTimes(oldResourceProfileToNumExecutors,
resourceProfileIdToNumExecutors)
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
}
defaultAskTimeout.awaitResult(response)
}
+ private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile:
Map[Int, Int]): Unit = {
+ newProfile.map {
+ case (k, v) =>
+ val delta = v - oldProfile.getOrElse(k, 0)
+ if (delta != 0) {
+ updateExecRequestTime(k, delta)
+ }
+ }
+ }
+
+ private def updateExecRequestTime(profileId: Int, delta: Int) = {
+ val times = execRequestTimes.getOrElseUpdate(profileId, Queue[(Int,
Long)]())
+ if (delta > 0) {
+ // Add the request to the end, constant time op
+ times += ((delta, System.currentTimeMillis()))
+ } else if (delta < 0) {
+ // Consume as if |delta| had been allocated
+ var c = -delta
+ // Note: it's possible that something else allocated an executor and we
have
+ // a negative delta, we can just avoid mutating the queue.
+ while (c > 0 && !times.isEmpty) {
+ val h = times.dequeue
+ if (h._1 > c) {
+ // Prepend updated first req to times, constant time op
+ ((h._1 - c, h._2)) +=: times
Review comment:
To keep the ordering, we can use the append here too (instead of the
prepend).
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -780,15 +805,52 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
(scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
}
val response = synchronized {
+ val oldResourceProfileToNumExecutors =
requestedTotalExecutorsPerResourceProfile.map {
+ case (rp, num) =>
+ (rp.id, num)
+ }.toMap
this.requestedTotalExecutorsPerResourceProfile.clear()
this.requestedTotalExecutorsPerResourceProfile ++=
resourceProfileToNumExecutors
this.numLocalityAwareTasksPerResourceProfileId =
numLocalityAwareTasksPerResourceProfileId
this.rpHostToLocalTaskCount = hostToLocalTaskCount
+ updateExecRequestTimes(oldResourceProfileToNumExecutors,
resourceProfileIdToNumExecutors)
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
}
defaultAskTimeout.awaitResult(response)
}
+ private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile:
Map[Int, Int]): Unit = {
+ newProfile.map {
+ case (k, v) =>
+ val delta = v - oldProfile.getOrElse(k, 0)
+ if (delta != 0) {
+ updateExecRequestTime(k, delta)
+ }
+ }
+ }
+
+ private def updateExecRequestTime(profileId: Int, delta: Int) = {
+ val times = execRequestTimes.getOrElseUpdate(profileId, Queue[(Int,
Long)]())
+ if (delta > 0) {
+ // Add the request to the end, constant time op
+ times += ((delta, System.currentTimeMillis()))
+ } else if (delta < 0) {
+ // Consume as if |delta| had been allocated
+ var c = -delta
+ // Note: it's possible that something else allocated an executor and we
have
+ // a negative delta, we can just avoid mutating the queue.
+ while (c > 0 && !times.isEmpty) {
+ val h = times.dequeue
+ if (h._1 > c) {
+ // Prepend updated first req to times, constant time op
+ ((h._1 - c, h._2)) +=: times
Review comment:
Shouldn't we set `c` to 0 (to stop the loop, since we have processed all
of the delta here)?
It would be nice to test this case too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]