kiszk commented on a change in pull request #27313: [SPARK-29148][CORE] Add
stage level scheduling dynamic allocation and scheduler backend changes
URL: https://github.com/apache/spark/pull/27313#discussion_r369721010
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -648,20 +775,50 @@ private[spark] class ExecutorAllocationManager(
*
* Note: This is not thread-safe without the caller owning the
`allocationManager` lock.
*/
- def pendingTasks(): Int = {
- stageAttemptToNumTasks.map { case (stageAttempt, numTasks) =>
- numTasks -
stageAttemptToTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
+ def pendingTasksPerResourceProfile(rpId: Int): Int = {
Review comment:
Is it better to refactor code to use a common function from line 778 to line
814? The following stuff can be reused.
```
val pending = attempts.map { attempt =>
val numTotalTasks = stageAttemptToNumTasks.getOrElse(attempt, 0)
val numRunning =
stageAttemptToTaskIndices.get(attempt).map(_.size).getOrElse(0)
numTotalTasks - numRunning
}.sum
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]