tgravescs commented on a change in pull request #32526:
URL: https://github.com/apache/spark/pull/32526#discussion_r638018207
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -859,6 +847,29 @@ private[spark] class ExecutorAllocationManager(
allocationManager.synchronized {
// Clear unschedulableTaskSets since atleast one task becomes
schedulable now
unschedulableTaskSets.remove(stageAttempt)
+ removeStageFromResourceProfileIfUnused(stageAttempt)
+ }
+ }
+
+ def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt):
Unit = {
+ if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
+ !stageAttemptToNumTasks.contains(stageAttempt) &&
+ !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
+ !stageAttemptToTaskIndices.contains(stageAttempt) &&
+ !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt) &&
+ !unschedulableTaskSets.contains(stageAttempt)
Review comment:
we don't need to check for unschedulableTaskSets. if a attempt gets into
there it should have stageAttemptToNumTasks . If its been stage completed then
stageAttemptToNumTasks would be removed but we shouldn't still be trying to
schedule it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]